0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # Utility functions to deserialize objects from Java.
0019
0020 # nolint start
0021 # Type mapping from Java to R
0022 #
0023 # void -> NULL
0024 # Int -> integer
0025 # String -> character
0026 # Boolean -> logical
0027 # Float -> double
0028 # Double -> double
0029 # Long -> double
0030 # Array[Byte] -> raw
0031 # Date -> Date
0032 # Time -> POSIXct
0033 #
0034 # Array[T] -> list()
0035 # Object -> jobj
0036 #
0037 # nolint end
0038
readObject <- function(con) {
  # Every serialized value is prefixed with a one-byte type tag;
  # read the tag first, then dispatch on it.
  readTypedObject(con, readType(con))
}
0044
readTypedObject <- function(con, type) {
  # Dispatch on the one-character type tag written by the JVM-side
  # serializer (see the Java -> R type mapping at the top of this file).
  switch(type,
         "n" = NULL,                         # void / null
         "i" = readInt(con),                 # Int
         "d" = readDouble(con),              # Float / Double / Long
         "b" = readBoolean(con),             # Boolean
         "c" = readString(con),              # String
         "r" = readRaw(con),                 # Array[Byte]
         "D" = readDate(con),                # Date
         "t" = readTime(con),                # Time
         "a" = readArray(con),               # homogeneous array
         "l" = readList(con),                # heterogeneous list
         "e" = readEnv(con),                 # map / environment
         "s" = readStruct(con),              # StructType row
         "j" = getJobj(readString(con)),     # reference to a JVM object
         stop("Unsupported type for deserialization ", type))
}
0062
readStringData <- function(con, len) {
  # Read exactly `len` bytes and decode them as a UTF-8 string,
  # marking the encoding so multibyte characters survive the round trip.
  bytes <- readBin(con, raw(), len, endian = "big")
  result <- rawToChar(bytes)
  Encoding(result) <- "UTF-8"
  result
}
0069
readString <- function(con) {
  # Strings are length-prefixed: a 4-byte big-endian length, then the bytes.
  readStringData(con, readInt(con))
}
0074
readInt <- function(con) {
  # A single 4-byte big-endian signed integer, as written by
  # Java's DataOutputStream.writeInt.
  readBin(con, integer(), n = 1L, endian = "big")
}
0078
readDouble <- function(con) {
  # A single 8-byte big-endian IEEE 754 double, as written by
  # Java's DataOutputStream.writeDouble.
  readBin(con, double(), n = 1L, endian = "big")
}
0082
readBoolean <- function(con) {
  # Booleans travel over the wire as 4-byte ints (0 or 1).
  flag <- readInt(con)
  as.logical(flag)
}
0086
readType <- function(con) {
  # One byte identifies the type of the next value; at end of stream
  # readBin yields raw(0), which decodes to "".
  rawToChar(readBin(con, raw(), n = 1L))
}
0090
readDate <- function(con) {
  # Dates are serialized as "yyyy-mm-dd" strings.
  as.Date(readString(con))
}
0094
readTime <- function(con) {
  # Timestamps arrive as (possibly fractional) seconds since the Unix epoch.
  seconds <- readDouble(con)
  as.POSIXct(seconds, origin = "1970-01-01")
}
0099
# Read a homogeneous array: one shared type tag, then a count,
# then `count` elements of that type. Returned as an R list.
readArray <- function(con) {
  elemType <- readType(con)
  count <- readInt(con)
  if (count <= 0) {
    return(list())
  }
  result <- vector("list", count)
  for (i in seq_len(count)) {
    result[[i]] <- readTypedObject(con, elemType)
  }
  result
}
0113
0114 # Read a list. Types of each element may be different.
0115 # Null objects are read as NA.
# Read a heterogeneous list: each element carries its own type tag.
# Null objects are read as NA.
readList <- function(con) {
  count <- readInt(con)
  if (count <= 0) {
    return(list())
  }
  result <- vector("list", count)
  for (i in seq_len(count)) {
    elem <- readObject(con)
    # NULL cannot be stored in a list slot via [[<- without dropping the
    # slot, so represent it as NA instead.
    result[[i]] <- if (is.null(elem)) NA else elem
  }
  result
}
0132
# Read a map serialized as a count followed by (string key, value) pairs,
# materialized as an R environment.
readEnv <- function(con) {
  env <- new.env()
  count <- readInt(con)
  if (count > 0) {
    for (i in seq_len(count)) {
      key <- readString(con)
      env[[key]] <- readObject(con)
    }
  }
  env
}
0145
0146 # Read a field of StructType from SparkDataFrame
0147 # into a named list in R whose class is "struct"
# Read a field of StructType from SparkDataFrame
# into a named list in R whose class is "struct".
# The stream carries two objects: the field names, then the field values.
readStruct <- function(con) {
  fieldNames <- readObject(con)
  fieldValues <- readObject(con)
  names(fieldValues) <- fieldNames
  listToStruct(fieldValues)
}
0154
readRaw <- function(con) {
  # Byte arrays are length-prefixed, like strings.
  nbytes <- readInt(con)
  readBin(con, raw(), as.integer(nbytes), endian = "big")
}
0159
readRawLen <- function(con, dataLen) {
  # Read exactly `dataLen` raw bytes when the caller already knows the length.
  readBin(con, raw(), n = as.integer(dataLen), endian = "big")
}
0163
readDeserialize <- function(con) {
  # A partition is either a single serialized blob or a sequence of
  # length-prefixed blobs. The first blob always exists; afterwards we keep
  # reading until the length prefix is absent (readInt returns a
  # zero-length vector at end of stream) or non-positive.
  dataLen <- readInt(con)
  firstData <- unserialize(
    readBin(con, raw(), as.integer(dataLen), endian = "big"))

  dataLen <- readInt(con)
  if (length(dataLen) == 0 || dataLen <= 0) {
    # Single-blob case: the whole partition was one byte array.
    return(firstData)
  }

  # Multi-blob case: accumulate and flatten one level.
  data <- list(firstData)
  while (length(dataLen) > 0 && dataLen > 0) {
    data[[length(data) + 1L]] <- unserialize(
      readBin(con, raw(), as.integer(dataLen), endian = "big"))
    dataLen <- readInt(con)
  }
  unlist(data, recursive = FALSE)
}
0186
readMultipleObjects <- function(inputCon) {
  # Objects are streamed back-to-back with no leading count, so keep
  # reading typed values until readType() signals end of stream by
  # returning the empty string.
  data <- list()
  repeat {
    type <- readType(inputCon)
    if (!nzchar(type)) {
      break
    }
    data[[length(data) + 1L]] <- readTypedObject(inputCon, type)
  }
  data # a list of named lists at this point
}
0203
readMultipleObjectsWithKeys <- function(inputCon) {
  # Variant of readMultipleObjects for gapply: rows stream in groups, and
  # each group is terminated by a value tagged "r" that carries the
  # grouping key. Rows accumulate in subData until a key appears, at which
  # point the finished group is filed under that key. There is no leading
  # count; reading stops at end of stream (empty type tag).
  keys <- list()
  data <- list()
  subData <- list()
  repeat {
    type <- readType(inputCon)
    if (type == "") {
      break
    }
    if (type == "r") {
      # Group boundary: the next typed value is this group's key.
      keyType <- readType(inputCon)
      key <- readTypedObject(inputCon, keyType)
      idx <- length(data) + 1L
      data[[idx]] <- subData
      keys[[idx]] <- key
      subData <- list()
    } else {
      subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
    }
  }
  list(keys = keys, data = data) # keys and the corresponding grouped data
}
0233
readDeserializeInArrow <- function(inputCon) {
  # The optional 'arrow' package is required for Arrow deserialization.
  if (!requireNamespace("arrow", quietly = TRUE)) {
    stop("'arrow' package should be installed.")
  }

  # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
  useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))

  # Currently, there looks no way to read batch by batch by socket
  # connection in R side, see ARROW-4512. Therefore, it reads the whole
  # Arrow streaming-formatted binary at once for now.
  dataLen <- readInt(inputCon)
  arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
  batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()

  if (useAsTibble) {
    as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
    # Read all grouped batches. Tibble -> data.frame is cheap.
    lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
  } else {
    lapply(batches, function(batch) as.data.frame(batch))
  }
}
0258
readDeserializeWithKeysInArrow <- function(inputCon) {
  # The grouped batches come first, followed by one key object per group;
  # return both so the caller can map each key onto its batch.
  batchData <- readDeserializeInArrow(inputCon)
  keys <- readMultipleObjects(inputCon)
  list(keys = keys, data = batchData)
}
0267
readRowList <- function(obj) {
  # Meant for use inside an lapply: each serialized row gets a standalone
  # raw connection so the read consumes exactly this row's bytes and
  # deserializes it correctly. The connection is closed on exit.
  rowCon <- rawConnection(obj, "r+")
  on.exit(close(rowCon))
  readObject(rowCon)
}