0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # Utility functions to serialize R objects so they can be read in Java.
0019
0020 # nolint start
0021 # Type mapping from R to Java
0022 #
0023 # NULL -> Void
0024 # integer -> Int
0025 # character -> String
0026 # logical -> Boolean
0027 # double, numeric -> Double
0028 # raw -> Array[Byte]
0029 # Date -> Date
0030 # POSIXct,POSIXlt -> Time
0031 #
# list[T] -> Array[T], where T is one of the above-mentioned types
0033 # Multi-element vector of any of the above (except raw) -> Array[T]
0034 # environment -> Map[String, T], where T is a native type
0035 # jobj -> Object, where jobj is an object created in the backend
0036 # nolint end
0037
# Determines the serde type name used to tag `object` on the wire.
#
# Returns "array" for multi-element atomic (non-raw) vectors and for lists
# whose elements all share a single serde type (including empty lists),
# "list" for lists whose elements have mixed serde types, and otherwise the
# object's first class (e.g. "integer", "character", "Date").
getSerdeType <- function(object) {
  type <- class(object)[[1]]
  # Scalar condition: use short-circuiting && so later tests are skipped
  # once one fails.
  if (is.atomic(object) && !is.raw(object) && length(object) > 1) {
    "array"
  } else if (type != "list") {
    type
  } else {
    # A list is a homogeneous array only if every element maps to the same
    # serde type. vapply() keeps the result a character vector, even for an
    # empty list.
    elemTypes <- vapply(object, getSerdeType, character(1))
    if (length(unique(elemTypes)) <= 1) "array" else "list"
  }
}
0054
# Serializes `object` to `con`, preceded by its one-byte type tag unless
# `writeType` is FALSE (as for the elements of a typed array).
#
# A length-1 atomic NA is sent as NULL. Multi-element atomic vectors are
# sent as arrays (see getSerdeType()). Errors for unsupported classes.
writeObject <- function(con, object, writeType = TRUE) {
  # NOTE: In R vectors have same type as objects
  type <- class(object)[[1]]  # class of POSIXlt is c("POSIXlt", "POSIXt")
  # Checking types is needed here, since 'is.na' only handles atomic vectors,
  # lists and pairlists. Only length-1 values are tested: `if (is.na(x))` on
  # a longer vector is an error since R 4.2.0 (and before that it silently
  # looked only at the first element). Longer vectors are arrays anyway.
  if (type %in% c("integer", "character", "logical", "double", "numeric") &&
      length(object) == 1L && is.na(object)) {
    if (!writeType) {
      # Array elements carry no type tag and writeVoid() writes no bytes,
      # so a missing element would silently vanish from the array payload.
      stop("NA elements are not supported in serialized arrays")
    }
    object <- NULL
    type <- "NULL"
  }

  serdeType <- getSerdeType(object)
  if (writeType) {
    writeType(con, serdeType)
  }
  switch(serdeType,
         NULL = writeVoid(con),
         integer = writeInt(con, object),
         character = writeString(con, object),
         logical = writeBoolean(con, object),
         double = writeDouble(con, object),
         numeric = writeDouble(con, object),
         raw = writeRaw(con, object),
         array = writeArray(con, object),
         list = writeList(con, object),
         struct = writeList(con, object),
         jobj = writeJobj(con, object),
         environment = writeEnv(con, object),
         Date = writeDate(con, object),
         POSIXlt = writeTime(con, object),
         POSIXct = writeTime(con, object),
         stop("Unsupported type for serialization ", type))
}
0089
# NULL has no payload: writeObject() emits at most its type tag, so nothing
# is written here.
writeVoid <- function(con) {
  NULL
}
0093
# Serializes a backend object reference as its id string.
# Errors with "invalid jobj <id>" when isValidJobj() rejects the reference.
writeJobj <- function(con, value) {
  if (isValidJobj(value)) {
    return(writeString(con, value$id))
  }
  stop("invalid jobj ", value$id)
}
0100
# Writes a string as a 4-byte big-endian length followed by its UTF-8 bytes.
# The length counts the trailing NUL that writeBin() appends to character
# data, hence the + 1.
writeString <- function(con, value) {
  encoded <- enc2utf8(value)
  byteCount <- nchar(encoded, type = "bytes") + 1
  writeBin(as.integer(byteCount), con, endian = "big")
  writeBin(encoded, con, endian = "big", useBytes = TRUE)
}
0106
# Writes `value`, coerced to integer, as 4-byte big-endian ints.
writeInt <- function(con, value) {
  asInt <- as.integer(value)
  writeBin(object = asInt, con = con, endian = "big")
}
0110
# Writes `value` as 8-byte big-endian doubles.
# Coerces with as.double() first, mirroring writeInt()'s as.integer():
# writeBin() writes integer input as 4-byte ints, which would not match the
# 8-byte double the type tag promises.
writeDouble <- function(con, value) {
  writeBin(as.double(value), con, endian = "big")
}
0114
# Booleans are sent as 4-byte big-endian ints: TRUE -> 1, FALSE -> 0.
writeBoolean <- function(con, value) {
  flag <- as.integer(value)
  writeBin(flag, con, endian = "big")
}
0119
# Serializes `batch` with R's native binary (non-ASCII) format into an
# in-memory raw vector and sends it as a length-prefixed blob.
writeRawSerialize <- function(outputCon, batch) {
  writeRaw(outputCon, serialize(batch, connection = NULL, ascii = FALSE))
}
0124
# Sends every element of `rows` as its own length-prefixed blob, each
# encoded by serializeRow(). Invisibly returns the per-row writeRaw() results.
writeRowSerialize <- function(outputCon, rows) {
  emitRow <- function(row) {
    writeRaw(outputCon, serializeRow(row))
  }
  invisible(lapply(rows, emitRow))
}
0131
# Encodes one row with writeList() and returns the resulting bytes as a raw
# vector. The in-memory connection is closed on exit.
serializeRow <- function(row) {
  buf <- rawConnection(raw(0), open = "wb")
  on.exit(close(buf), add = TRUE)
  writeList(buf, row)
  rawConnectionValue(buf)
}
0138
# Writes a length-prefixed blob: the element count as a 4-byte big-endian
# int, followed by the contents of `batch`.
writeRaw <- function(con, batch) {
  byteLength <- as.integer(length(batch))
  writeBin(byteLength, con, endian = "big")
  writeBin(batch, con, endian = "big")
}
0143
# Writes the single-character type tag for serde type name `class`.
# Errors with "Unsupported type for serialization <class>" when the name has
# no tag.
writeType <- function(con, class) {
  tags <- c(
    "NULL" = "n",
    integer = "i",
    character = "c",
    logical = "b",
    double = "d",
    numeric = "d",
    raw = "r",
    array = "a",
    list = "l",
    struct = "s",
    jobj = "j",
    environment = "e",
    Date = "D",
    POSIXlt = "t",
    POSIXct = "t"
  )
  tag <- tags[class]
  if (is.na(tag)) {
    stop("Unsupported type for serialization ", class)
  }
  writeBin(charToRaw(unname(tag)), con)
}
0164
# Used to pass arrays where all the elements are of the same type.
# Wire format: element type tag, element count, then each element written
# without its own type tag.
writeArray <- function(con, arr) {
  # TODO: Empty arrays are given type "character" right now.
  # This may not work if the Java side expects array of any other type.
  elemType <- if (length(arr) == 0) "character" else getSerdeType(arr[[1]])

  writeType(con, elemType)
  writeInt(con, length(arr))

  # Index with [[ rather than `for (a in arr)`: a for loop over an atomic
  # vector drops attributes, so elements of a Date vector would arrive as
  # bare numbers and be written as doubles under a "Date" element tag.
  for (i in seq_along(arr)) {
    writeObject(con, arr[[i]], FALSE)
  }
}
0184
# Used to pass arrays whose elements may be of different types.
# Wire format: element count, then each element preceded by its own type tag.
writeList <- function(con, list) {
  writeInt(con, length(list))
  for (item in list) writeObject(con, item, writeType = TRUE)
}
0192
# Used to pass in hash maps required on the Java side.
# Wire format: entry count, then (if non-empty) the keys as a string array
# followed by the values as a heterogeneous list, in matching order.
writeEnv <- function(con, env) {
  # all.names = TRUE so the keys agree with length(env), which counts every
  # binding. Plain ls() hides names starting with ".", which made the count
  # disagree with the keys sent and silently dropped those entries.
  keys <- ls(env, all.names = TRUE)
  len <- length(keys)

  writeInt(con, len)
  if (len > 0) {
    writeArray(con, as.list(keys))
    vals <- lapply(keys, function(key) env[[key]])
    writeList(con, vals)
  }
}
0204
# Dates are sent as the string produced by as.character().
writeDate <- function(con, date) {
  dateText <- as.character(date)
  writeString(con, dateText)
}
0208
# Times are sent as their numeric value (as.double(), i.e. seconds since the
# epoch for POSIXct), written as an 8-byte big-endian double.
writeTime <- function(con, time) {
  secs <- as.double(time)
  writeBin(secs, con, endian = "big")
}
0212
# Serializes a list of arguments whose elements may differ in type.
# Each argument is written as <type tag> <value>; an empty list writes nothing.
writeArgs <- function(con, args) {
  for (arg in args) {
    writeObject(con, arg)
  }
}
0223
# Serializes `df` into Arrow IPC streaming format and sends the bytes as one
# length-prefixed blob. Errors if the optional 'arrow' package is missing.
writeSerializeInArrow <- function(conn, df) {
  if (!requireNamespace("arrow", quietly = TRUE)) {
    stop("'arrow' package should be installed.")
  }
  # There looks no way to send each batch in streaming format via socket
  # connection. See ARROW-4512.
  # So, it writes the whole Arrow streaming-formatted binary at once for now.
  # write_arrow() is deprecated in arrow; prefer write_to_raw() and fall back
  # to write_arrow() only for old arrow versions that lack it.
  payload <- if ("write_to_raw" %in% getNamespaceExports("arrow")) {
    arrow::write_to_raw(df, format = "stream")
  } else {
    getExportedValue("arrow", "write_arrow")(df, raw())
  }
  writeRaw(conn, payload)
}