#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Utility functions to deserialize objects from Java.

# nolint start
# Type mapping from Java to R
#
# void -> NULL
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
# Date -> Date
# Time -> POSIXct
#
# Array[T] -> list()
# Object -> jobj
#
# nolint end

readObject <- function(con) {
  # Read type first
  type <- readType(con)
  readTypedObject(con, type)
}
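
# Illustrative sketch (hypothetical helper, not part of the original file):
# every value on the wire is a one-byte type tag followed by a big-endian
# payload, so an integer 42L arrives as the tag "i" plus four bytes. This
# only demonstrates how readObject() decodes such a stream.
exampleReadScalar <- function() {
  con <- rawConnection(c(charToRaw("i"), writeBin(42L, raw(), endian = "big")), "r")
  on.exit(close(con))
  readObject(con)  # returns 42L
}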

readTypedObject <- function(con, type) {
  switch(type,
    "i" = readInt(con),
    "c" = readString(con),
    "b" = readBoolean(con),
    "d" = readDouble(con),
    "r" = readRaw(con),
    "D" = readDate(con),
    "t" = readTime(con),
    "a" = readArray(con),
    "l" = readList(con),
    "e" = readEnv(con),
    "s" = readStruct(con),
    "n" = NULL,
    "j" = getJobj(readString(con)),
    stop("Unsupported type for deserialization ", type))
}

readStringData <- function(con, len) {
  raw <- readBin(con, raw(), len, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  string
}

readString <- function(con) {
  stringLen <- readInt(con)
  readStringData(con, stringLen)
}

readInt <- function(con) {
  readBin(con, integer(), n = 1, endian = "big")
}

readDouble <- function(con) {
  readBin(con, double(), n = 1, endian = "big")
}

readBoolean <- function(con) {
  as.logical(readInt(con))
}

readType <- function(con) {
  rawToChar(readBin(con, "raw", n = 1L))
}

readDate <- function(con) {
  as.Date(readString(con))
}

readTime <- function(con) {
  t <- readDouble(con)
  as.POSIXct(t, origin = "1970-01-01")
}

readArray <- function(con) {
  type <- readType(con)
  len <- readInt(con)
  if (len > 0) {
    l <- vector("list", len)
    for (i in 1:len) {
      l[[i]] <- readTypedObject(con, type)
    }
    l
  } else {
    list()
  }
}
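
# Illustrative sketch (hypothetical helper, not part of the original file): an
# array is one element-type tag, a length, and then untagged elements, so a
# two-element integer array arrives as "a" "i" <2> <1> <2>.
exampleReadArray <- function() {
  payload <- c(charToRaw("a"), charToRaw("i"),
               writeBin(2L, raw(), endian = "big"),
               writeBin(1L, raw(), endian = "big"),
               writeBin(2L, raw(), endian = "big"))
  con <- rawConnection(payload, "r")
  on.exit(close(con))
  readObject(con)  # list(1L, 2L)
}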

# Read a list. The types of the elements may differ from one another.
# Null objects are read as NA.
readList <- function(con) {
  len <- readInt(con)
  if (len > 0) {
    l <- vector("list", len)
    for (i in 1:len) {
      elem <- readObject(con)
      if (is.null(elem)) {
        elem <- NA
      }
      l[[i]] <- elem
    }
    l
  } else {
    list()
  }
}

readEnv <- function(con) {
  env <- new.env()
  len <- readInt(con)
  if (len > 0) {
    for (i in 1:len) {
      key <- readString(con)
      value <- readObject(con)
      env[[key]] <- value
    }
  }
  env
}
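
# Illustrative sketch (hypothetical helper, not part of the original file): an
# environment is a length followed by pairs of a length-prefixed key string
# and a tagged value object.
exampleReadEnv <- function() {
  key <- charToRaw("answer")
  payload <- c(charToRaw("e"),
               writeBin(1L, raw(), endian = "big"),                   # one entry
               writeBin(length(key), raw(), endian = "big"), key,     # key bytes
               charToRaw("i"), writeBin(42L, raw(), endian = "big"))  # tagged value
  con <- rawConnection(payload, "r")
  on.exit(close(con))
  readObject(con)$answer  # 42L
}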

# Read a field of StructType from SparkDataFrame
# into a named list in R whose class is "struct"
readStruct <- function(con) {
  names <- readObject(con)
  fields <- readObject(con)
  names(fields) <- names
  listToStruct(fields)
}

readRaw <- function(con) {
  dataLen <- readInt(con)
  readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readRawLen <- function(con, dataLen) {
  readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readDeserialize <- function(con) {
  # Two cases are possible. In the first, the entire partition is encoded as a
  # single byte array, so there is only one value to read and firstData is
  # returned as-is.
  dataLen <- readInt(con)
  firstData <- unserialize(
      readBin(con, raw(), as.integer(dataLen), endian = "big"))

  # Otherwise, keep reading length-prefixed values into a list until the
  # stream is exhausted.
  dataLen <- readInt(con)
  if (length(dataLen) > 0 && dataLen > 0) {
    data <- list(firstData)
    while (length(dataLen) > 0 && dataLen > 0) {
      data[[length(data) + 1L]] <- unserialize(
          readBin(con, raw(), as.integer(dataLen), endian = "big"))
      dataLen <- readInt(con)
    }
    unlist(data, recursive = FALSE)
  } else {
    firstData
  }
}
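
# Illustrative sketch (hypothetical helper, not part of the original file):
# the stream handled by readDeserialize() is one or more blobs produced by
# R's serialize(), each preceded by a 4-byte big-endian length.
exampleReadDeserialize <- function() {
  blob <- serialize(list(1, 2, 3), connection = NULL)
  payload <- c(writeBin(length(blob), raw(), endian = "big"), blob)
  con <- rawConnection(payload, "r")
  on.exit(close(con))
  readDeserialize(con)  # list(1, 2, 3)
}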

readMultipleObjects <- function(inputCon) {
  # readMultipleObjects reads multiple consecutive objects from a
  # DataOutputStream. There is no preceding field giving the count of
  # objects, so the number of objects is not known in advance; we keep
  # reading objects in a loop until the end of the stream.
  data <- list()
  while (TRUE) {
    # If the end of the stream has been reached, the type returned is "".
    type <- readType(inputCon)
    if (type == "") {
      break
    }
    data[[length(data) + 1L]] <- readTypedObject(inputCon, type)
  }
  data # this is a list of named lists now
}
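
# Illustrative sketch (hypothetical helper, not part of the original file):
# the stream is a plain sequence of tagged objects with no count or
# terminator; reading stops when readType() returns "".
exampleReadMultipleObjects <- function() {
  msg <- charToRaw("hi")
  payload <- c(charToRaw("i"), writeBin(7L, raw(), endian = "big"),
               charToRaw("c"), writeBin(length(msg), raw(), endian = "big"), msg)
  con <- rawConnection(payload, "r")
  on.exit(close(con))
  readMultipleObjects(con)  # list(7L, "hi")
}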

readMultipleObjectsWithKeys <- function(inputCon) {
  # readMultipleObjectsWithKeys reads multiple consecutive objects from a
  # DataOutputStream. There is no preceding field giving the count of
  # objects, so the number of objects is not known in advance; we keep
  # reading objects in a loop until the end of the stream. This function
  # is for use by gapply: each group of rows is followed by the grouping
  # key for that group, which is then followed by the next group.
  keys <- list()
  data <- list()
  subData <- list()
  while (TRUE) {
    # If the end of the stream has been reached, the type returned is "".
    type <- readType(inputCon)
    if (type == "") {
      break
    } else if (type == "r") {
      type <- readType(inputCon)
      # A grouping boundary was detected
      key <- readTypedObject(inputCon, type)
      index <- length(data) + 1L
      data[[index]] <- subData
      keys[[index]] <- key
      subData <- list()
    } else {
      subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
    }
  }
  list(keys = keys, data = data) # this is a list of keys and corresponding data
}
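
# Illustrative sketch (hypothetical helper, not part of the original file):
# rows for a group arrive as tagged objects, then an "r" marker introduces
# the tagged grouping key that closes the group.
exampleReadGroupedObjects <- function() {
  key <- charToRaw("g1")
  payload <- c(charToRaw("i"), writeBin(1L, raw(), endian = "big"),
               charToRaw("i"), writeBin(2L, raw(), endian = "big"),
               charToRaw("r"),                                      # grouping boundary
               charToRaw("c"), writeBin(length(key), raw(), endian = "big"), key)
  con <- rawConnection(payload, "r")
  on.exit(close(con))
  readMultipleObjectsWithKeys(con)  # list(keys = list("g1"), data = list(list(1L, 2L)))
}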

readDeserializeInArrow <- function(inputCon) {
  if (requireNamespace("arrow", quietly = TRUE)) {
    # The arrow package drops `as_tibble` as of 0.14.0; see ARROW-5190.
    useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))

    # Currently there appears to be no way to read batch by batch over a socket
    # connection on the R side (see ARROW-4512), so for now the whole Arrow
    # streaming-format binary is read at once.
    dataLen <- readInt(inputCon)
    arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
    batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()

    if (useAsTibble) {
      as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
      # Read all grouped batches. Tibble -> data.frame conversion is cheap.
      lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
    } else {
      lapply(batches, function(batch) as.data.frame(batch))
    }
  } else {
    stop("'arrow' package should be installed.")
  }
}

readDeserializeWithKeysInArrow <- function(inputCon) {
  data <- readDeserializeInArrow(inputCon)

  # Read the keys so they can be mapped to each grouped batch later.
  keys <- readMultipleObjects(inputCon)

  list(keys = keys, data = data)
}

readRowList <- function(obj) {
  # readRowList is meant for use inside an lapply. As a result, it is
  # necessary to open a standalone connection for the row and consume
  # the numCols bytes inside the read function in order to correctly
  # deserialize the row.
  rawObj <- rawConnection(obj, "r+")
  on.exit(close(rawObj))
  readObject(rawObj)
}
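
# Illustrative sketch (hypothetical helper, not part of the original file):
# readRowList() accepts a raw vector in the same tagged wire format that
# readObject() consumes from a connection.
exampleReadRowList <- function() {
  rowBytes <- c(charToRaw("i"), writeBin(5L, raw(), endian = "big"))
  readRowList(rowBytes)  # 5L
}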