Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # Utility functions to serialize R objects so they can be read in Java.
0019 
0020 # nolint start
0021 # Type mapping from R to Java
0022 #
0023 # NULL -> Void
0024 # integer -> Int
0025 # character -> String
0026 # logical -> Boolean
0027 # double, numeric -> Double
0028 # raw -> Array[Byte]
0029 # Date -> Date
0030 # POSIXct,POSIXlt -> Time
0031 #
0032 # list[T] -> Array[T], where T is one of above mentioned types
0033 # Multi-element vector of any of the above (except raw) -> Array[T]
0034 # environment -> Map[String, T], where T is a native type
0035 # jobj -> Object, where jobj is an object created in the backend
0036 # nolint end
0037 
# Determine the serialization type tag for an R object.
#
# object - any R object.
#
# Returns a single string:
#   - "array" for an atomic, non-raw vector with more than one element, and
#     for a list whose elements all map to the same serde type (this includes
#     the empty list);
#   - "list" for a list whose elements map to more than one serde type;
#   - otherwise the first entry of class(object), e.g. "integer" or "Date".
getSerdeType <- function(object) {
  type <- class(object)[[1]]
  # All three operands are scalars, so use the short-circuiting `&&` rather
  # than the elementwise `&`.
  if (is.atomic(object) && !is.raw(object) && length(object) > 1) {
    "array"
  } else if (type != "list") {
    type
  } else {
    # Check if all elements are of same type. vapply() always yields a
    # character vector (also for an empty list), unlike sapply().
    elemTypes <- unique(vapply(object, getSerdeType, character(1)))
    if (length(elemTypes) <= 1) {
      "array"
    } else {
      "list"
    }
  }
}
0054 
# Serialize one R object to a connection, optionally preceded by its type tag.
#
# con       - connection the bytes are written to.
# object    - the R object to serialize.
# writeType - when TRUE (the default) the one-character type tag is written
#             before the value; writeArray() passes FALSE because it writes a
#             single tag for all elements.
#
# A value of a basic type whose first element is NA is serialized as NULL.
# Stops with "Unsupported type for serialization" for a type that has no
# writer in the switch below.
writeObject <- function(con, object, writeType = TRUE) {
  # NOTE: In R vectors have same type as objects
  type <- class(object)[[1]]  # class of POSIXlt is c("POSIXlt", "POSIXt")
  # Checking types is needed here, since 'is.na' only handles atomic vectors,
  # lists and pairlists
  if (type %in% c("integer", "character", "logical", "double", "numeric")) {
    # Only the first element is tested. `is.na(object)` has one value per
    # element, and R >= 4.2.0 raises an error for an `if` condition longer
    # than one; earlier versions silently used the first element, which is
    # the behavior kept here. A zero-length vector still fails with an error.
    if (is.na(object[[1]])) {
      object <- NULL
      type <- "NULL"
    }
  }

  serdeType <- getSerdeType(object)
  if (writeType) {
    # The logical argument `writeType` shadows the function of the same name;
    # R skips non-function bindings when resolving the callee of a call.
    writeType(con, serdeType)
  }
  switch(serdeType,
         NULL = writeVoid(con),
         integer = writeInt(con, object),
         character = writeString(con, object),
         logical = writeBoolean(con, object),
         double = writeDouble(con, object),
         numeric = writeDouble(con, object),
         raw = writeRaw(con, object),
         array = writeArray(con, object),
         list = writeList(con, object),
         struct = writeList(con, object),
         jobj = writeJobj(con, object),
         environment = writeEnv(con, object),
         Date = writeDate(con, object),
         POSIXlt = writeTime(con, object),
         POSIXct = writeTime(con, object),
         stop("Unsupported type for serialization ", type))
}
0089 
# Writer for NULL: a NULL carries no payload, so nothing is written to `con`.
# Returns NULL.
writeVoid <- function(con) {
  NULL
}
0093 
# Write a backend object reference: only its `id` is sent, as a string.
#
# con   - connection to write to.
# value - the jobj; rejected with an error when isValidJobj() returns FALSE.
writeJobj <- function(con, value) {
  if (isValidJobj(value)) {
    writeString(con, value$id)
  } else {
    stop("invalid jobj ", value$id)
  }
}
0100 
# Write a string as <byte length as int> followed by its UTF-8 bytes.
#
# con   - connection to write to.
# value - character value; converted to UTF-8 before it is measured and sent.
writeString <- function(con, value) {
  encoded <- enc2utf8(value)
  # The extra byte presumably covers the terminating nul that writeBin()
  # emits after a character string (see ?writeBin).
  byteCount <- nchar(encoded, type = "bytes") + 1
  writeInt(con, as.integer(byteCount))
  writeBin(encoded, con, endian = "big", useBytes = TRUE)
}
0106 
# Write `value`, coerced to integer, to `con` in big-endian byte order.
writeInt <- function(con, value) {
  intValue <- as.integer(value)
  writeBin(intValue, con, endian = "big")
}
0110 
# Write `value` to `con` in big-endian byte order, without any coercion.
writeDouble <- function(con, value) {
  writeBin(object = value, con = con, endian = "big")
}
0114 
# Write a logical as an integer: TRUE is sent as 1 and FALSE as 0.
writeBoolean <- function(con, value) {
  flag <- as.integer(value)
  writeInt(con, flag)
}
0119 
# Serialize `batch` with R's binary serialize() format and send the resulting
# raw vector through writeRaw().
#
# outputCon - connection to write to.
# batch     - the R object to serialize.
writeRawSerialize <- function(outputCon, batch) {
  writeRaw(outputCon, serialize(batch, connection = NULL, ascii = FALSE))
}
0124 
# Write every row of `rows` to `outputCon`: each row is turned into bytes by
# serializeRow() and sent through writeRaw(). The per-row results are returned
# invisibly.
writeRowSerialize <- function(outputCon, rows) {
  writeOneRow <- function(row) {
    writeRaw(outputCon, serializeRow(row))
  }
  invisible(lapply(rows, writeOneRow))
}
0131 
# Serialize one row with writeList() into an in-memory buffer and return the
# bytes as a raw vector.
serializeRow <- function(row) {
  buffer <- rawConnection(raw(0), open = "wb")
  # Release the in-memory connection on any exit path.
  on.exit(close(buffer))
  writeList(buffer, row)
  rawConnectionValue(buffer)
}
0138 
# Write `batch` as <length as int> followed by its content.
#
# con   - connection to write to.
# batch - the payload (a raw vector in the callers visible in this file).
writeRaw <- function(con, batch) {
  payloadLength <- length(batch)
  writeInt(con, payloadLength)
  writeBin(batch, con, endian = "big")
}
0143 
# Write the one-character wire tag for a serde type name.
#
# con   - connection to write to.
# class - serde type name, e.g. "integer", "array" or "Date".
#
# Stops with "Unsupported type for serialization" for an unknown name.
writeType <- function(con, class) {
  # An empty alternative falls through to the next one, so type names that
  # share a tag are grouped together.
  code <- switch(
    class,
    NULL = "n",
    integer = "i",
    character = "c",
    logical = "b",
    double = ,
    numeric = "d",
    raw = "r",
    array = "a",
    list = "l",
    struct = "s",
    jobj = "j",
    environment = "e",
    Date = "D",
    POSIXlt = ,
    POSIXct = "t",
    stop("Unsupported type for serialization ", class)
  )
  writeBin(charToRaw(code), con)
}
0164 
# Used to pass arrays where all the elements are of the same type.
#
# Written as <element type tag> <length as int> <elements>, where every
# element is written without its own type tag.
#
# con - connection to write to.
# arr - list or vector; its first element determines the element type tag.
writeArray <- function(con, arr) {
  # TODO: Empty lists are given type "character" right now.
  # This may not work if the Java side expects array of any other type.
  if (length(arr) == 0) {
    elemType <- "character"
  } else {
    elemType <- getSerdeType(arr[[1]])
  }

  writeType(con, elemType)
  writeInt(con, length(arr))

  # Index with `[[` instead of iterating with `for (a in arr)`: a for loop
  # over an atomic vector yields bare values with attributes stripped, so the
  # elements of e.g. a Date vector would be written as doubles even though
  # the tag written above (taken from arr[[1]]) says "Date".
  for (i in seq_along(arr)) {
    writeObject(con, arr[[i]], FALSE)
  }
}
0184 
# Used to pass arrays where the elements can be of different types.
#
# Written as <length as int> followed by every element, each one preceded by
# its own type tag (writeObject() is called with its default writeType).
writeList <- function(con, list) {
  elemCount <- length(list)
  writeInt(con, elemCount)
  for (item in list) {
    writeObject(con, item)
  }
}
0192 
# Used to pass in hash maps required on Java side.
#
# Written as <length(env) as int>; when that is positive it is followed by
# the names from ls(env) as an array and the matching values as a list.
writeEnv <- function(con, env) {
  len <- length(env)
  writeInt(con, len)

  if (len > 0) {
    keys <- ls(env)
    writeArray(con, as.list(keys))
    values <- lapply(keys, function(key) env[[key]])
    writeList(con, as.list(values))
  }
}
0204 
# Write a date as the string produced by as.character().
writeDate <- function(con, date) {
  dateText <- as.character(date)
  writeString(con, dateText)
}
0208 
# Write a time value as the double produced by as.double().
writeTime <- function(con, time) {
  timeValue <- as.double(time)
  writeDouble(con, timeValue)
}
0212 
# Used to serialize in a list of objects where each
# object can be of a different type. Serialization format is
# <object type> <object> for each object
#
# Nothing is written when `args` is empty.
writeArgs <- function(con, args) {
  if (length(args) == 0) {
    return(invisible(NULL))
  }
  for (arg in args) {
    writeObject(con, arg)
  }
}
0223 
# Serialize `df` with the 'arrow' package and send the bytes through
# writeRaw().
#
# conn - connection to write to.
# df   - the data to serialize.
#
# Stops when the 'arrow' package is not installed.
writeSerializeInArrow <- function(conn, df) {
  if (!requireNamespace("arrow", quietly = TRUE)) {
    stop("'arrow' package should be installed.")
  }
  # There looks no way to send each batch in streaming format via socket
  # connection. See ARROW-4512.
  # So, it writes the whole Arrow streaming-formatted binary at once for now.
  # NOTE(review): arrow::write_arrow() may be deprecated or removed in newer
  # 'arrow' releases - confirm against the supported arrow version.
  writeRaw(conn, arrow::write_arrow(df, raw()))
}