0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # SQLcontext.R: SQLContext-driven functions
0019
0020
0021 # Map top level R type to SQL type
0022 getInternalType <- function(x) {
0023 # class of POSIXlt is c("POSIXlt" "POSIXt")
0024 switch(class(x)[[1]],
0025 integer = "integer",
0026 character = "string",
0027 logical = "boolean",
0028 double = "double",
0029 numeric = "double",
0030 raw = "binary",
0031 list = "array",
0032 struct = "struct",
0033 environment = "map",
0034 Date = "date",
0035 POSIXlt = "timestamp",
0036 POSIXct = "timestamp",
0037 stop("Unsupported type for SparkDataFrame: ", class(x)))
0038 }
0039
0040 #' return the SparkSession
0041 #' @noRd
0042 getSparkSession <- function() {
0043 if (exists(".sparkRsession", envir = .sparkREnv)) {
0044 get(".sparkRsession", envir = .sparkREnv)
0045 } else {
0046 stop("SparkSession not initialized")
0047 }
0048 }
0049
0050 #' infer the SQL type
0051 #' @noRd
0052 infer_type <- function(x) {
0053 if (is.null(x)) {
0054 stop("can not infer type from NULL")
0055 }
0056
0057 type <- getInternalType(x)
0058
0059 if (type == "map") {
0060 stopifnot(length(x) > 0)
0061 key <- ls(x)[[1]]
0062 paste0("map<string,", infer_type(get(key, x)), ">")
0063 } else if (type == "array") {
0064 stopifnot(length(x) > 0)
0065
0066 paste0("array<", infer_type(x[[1]]), ">")
0067 } else if (type == "struct") {
0068 stopifnot(length(x) > 0)
0069 names <- names(x)
0070 stopifnot(!is.null(names))
0071
0072 type <- lapply(seq_along(x), function(i) {
0073 paste0(names[[i]], ":", infer_type(x[[i]]), ",")
0074 })
0075 type <- Reduce(paste0, type)
0076 type <- paste0("struct<", substr(type, 1, nchar(type) - 1), ">")
0077 } else if (length(x) > 1 && type != "binary") {
0078 paste0("array<", infer_type(x[[1]]), ">")
0079 } else {
0080 type
0081 }
0082 }
0083
0084 #' Get Runtime Config from the current active SparkSession
0085 #'
0086 #' Get Runtime Config from the current active SparkSession.
0087 #' To change SparkSession Runtime Config, please see \code{sparkR.session()}.
0088 #'
0089 #' @param key (optional) The key of the config to get, if omitted, all config is returned
0090 #' @param defaultValue (optional) The default value of the config to return if they config is not
0091 #' set, if omitted, the call fails if the config key is not set
0092 #' @return a list of config values with keys as their names
0093 #' @rdname sparkR.conf
0094 #' @name sparkR.conf
0095 #' @examples
0096 #'\dontrun{
0097 #' sparkR.session()
0098 #' allConfigs <- sparkR.conf()
0099 #' masterValue <- unlist(sparkR.conf("spark.master"))
0100 #' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
0101 #' }
0102 #' @note sparkR.conf since 2.0.0
0103 sparkR.conf <- function(key, defaultValue) {
0104 sparkSession <- getSparkSession()
0105 if (missing(key)) {
0106 m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
0107 as.list(m, all.names = TRUE, sorted = TRUE)
0108 } else {
0109 conf <- callJMethod(sparkSession, "conf")
0110 value <- if (missing(defaultValue)) {
0111 tryCatch(callJMethod(conf, "get", key),
0112 error = function(e) {
0113 estr <- as.character(e)
0114 if (any(grepl("java.util.NoSuchElementException", estr, fixed = TRUE))) {
0115 stop("Config '", key, "' is not set")
0116 } else {
0117 stop("Unknown error: ", estr)
0118 }
0119 })
0120 } else {
0121 callJMethod(conf, "get", key, defaultValue)
0122 }
0123 l <- setNames(list(value), key)
0124 l
0125 }
0126 }
0127
0128 #' Get version of Spark on which this application is running
0129 #'
0130 #' Get version of Spark on which this application is running.
0131 #'
0132 #' @return a character string of the Spark version
0133 #' @rdname sparkR.version
0134 #' @name sparkR.version
0135 #' @examples
0136 #'\dontrun{
0137 #' sparkR.session()
0138 #' version <- sparkR.version()
0139 #' }
0140 #' @note sparkR.version since 2.0.1
0141 sparkR.version <- function() {
0142 sparkSession <- getSparkSession()
0143 callJMethod(sparkSession, "version")
0144 }
0145
0146 getDefaultSqlSource <- function() {
0147 l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
0148 l[["spark.sql.sources.default"]]
0149 }
0150
0151 writeToFileInArrow <- function(fileName, rdf, numPartitions) {
0152 if (requireNamespace("arrow", quietly = TRUE)) {
0153 numPartitions <- if (!is.null(numPartitions)) {
0154 numToInt(numPartitions)
0155 } else {
0156 1
0157 }
0158
0159 rdf_slices <- if (numPartitions > 1) {
0160 split(rdf, makeSplits(numPartitions, nrow(rdf)))
0161 } else {
0162 list(rdf)
0163 }
0164
0165 stream_writer <- NULL
0166 tryCatch({
0167 for (rdf_slice in rdf_slices) {
0168 batch <- arrow::record_batch(rdf_slice)
0169 if (is.null(stream_writer)) {
0170 stream <- arrow::FileOutputStream$create(fileName)
0171 schema <- batch$schema
0172 stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
0173 }
0174
0175 stream_writer$write_batch(batch)
0176 }
0177 },
0178 finally = {
0179 if (!is.null(stream_writer)) {
0180 stream_writer$close()
0181 }
0182 })
0183
0184 } else {
0185 stop("'arrow' package should be installed.")
0186 }
0187 }
0188
0189 getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
0190 if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
0191 if (is.null(firstRow)) {
0192 stopifnot(!is.null(rdd))
0193 firstRow <- firstRDD(rdd)
0194 }
0195 names <- if (is.null(schema)) {
0196 names(firstRow)
0197 } else {
0198 as.list(schema)
0199 }
0200 if (is.null(names)) {
0201 names <- lapply(seq_len(length(firstRow)), function(x) {
0202 paste0("_", as.character(x))
0203 })
0204 }
0205
0206 # SPAKR-SQL does not support '.' in column name, so replace it with '_'
0207 # TODO(davies): remove this once SPARK-2775 is fixed
0208 names <- lapply(names, function(n) {
0209 nn <- gsub(".", "_", n, fixed = TRUE)
0210 if (nn != n) {
0211 warning("Use ", nn, " instead of ", n, " as column name")
0212 }
0213 nn
0214 })
0215
0216 types <- lapply(firstRow, infer_type)
0217 fields <- lapply(seq_len(length(firstRow)), function(i) {
0218 structField(names[[i]], types[[i]], TRUE)
0219 })
0220 schema <- do.call(structType, fields)
0221 } else {
0222 schema
0223 }
0224 }
0225
0226 #' Create a SparkDataFrame
0227 #'
0228 #' Converts R data.frame or list into SparkDataFrame.
0229 #'
0230 #' @param data a list or data.frame.
0231 #' @param schema a list of column names or named list (StructType), optional.
0232 #' @param samplingRatio Currently not used.
0233 #' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is
0234 #' limited by length of the list or number of rows of the data.frame
0235 #' @return A SparkDataFrame.
0236 #' @rdname createDataFrame
0237 #' @examples
0238 #'\dontrun{
0239 #' sparkR.session()
0240 #' df1 <- as.DataFrame(iris)
0241 #' df2 <- as.DataFrame(list(3,4,5,6))
0242 #' df3 <- createDataFrame(iris)
0243 #' df4 <- createDataFrame(cars, numPartitions = 2)
0244 #' }
0245 #' @name createDataFrame
0246 #' @note createDataFrame since 1.4.0
0247 # TODO(davies): support sampling and infer type from NA
0248 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
0249 numPartitions = NULL) {
0250 sparkSession <- getSparkSession()
0251 arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
0252 useArrow <- FALSE
0253 firstRow <- NULL
0254
0255 if (is.data.frame(data)) {
0256 # get the names of columns, they will be put into RDD
0257 if (is.null(schema)) {
0258 schema <- names(data)
0259 }
0260
0261 # get rid of factor type
0262 cleanCols <- function(x) {
0263 if (is.factor(x)) {
0264 as.character(x)
0265 } else {
0266 x
0267 }
0268 }
0269 data[] <- lapply(data, cleanCols)
0270
0271 args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
0272 if (arrowEnabled) {
0273 useArrow <- tryCatch({
0274 stopifnot(length(data) > 0)
0275 firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]]
0276 schema <- getSchema(schema, firstRow = firstRow)
0277 checkSchemaInArrow(schema)
0278 fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp")
0279 tryCatch({
0280 writeToFileInArrow(fileName, data, numPartitions)
0281 jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
0282 "readArrowStreamFromFile",
0283 sparkSession,
0284 fileName)
0285 },
0286 finally = {
0287 # File might not be created.
0288 suppressWarnings(file.remove(fileName))
0289 })
0290 TRUE
0291 },
0292 error = function(e) {
0293 warning("createDataFrame attempted Arrow optimization because ",
0294 "'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, ",
0295 "failed, attempting non-optimization. Reason: ", e)
0296 FALSE
0297 })
0298 }
0299
0300 if (!useArrow) {
0301 # Convert data into a list of rows. Each row is a list.
0302 # drop factors and wrap lists
0303 data <- setNames(as.list(data), NULL)
0304
0305 # check if all columns have supported type
0306 lapply(data, getInternalType)
0307
0308 # convert to rows
0309 data <- do.call(mapply, append(args, data))
0310 if (length(data) > 0) {
0311 firstRow <- data[[1]]
0312 }
0313 }
0314 }
0315
0316 if (useArrow) {
0317 rdd <- jrddInArrow
0318 } else if (is.list(data)) {
0319 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
0320 if (!is.null(numPartitions)) {
0321 rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
0322 } else {
0323 rdd <- parallelize(sc, data, numSlices = 1)
0324 }
0325 } else if (inherits(data, "RDD")) {
0326 rdd <- data
0327 } else {
0328 stop("unexpected type: ", class(data))
0329 }
0330
0331 schema <- getSchema(schema, firstRow, rdd)
0332
0333 stopifnot(class(schema) == "structType")
0334
0335 if (useArrow) {
0336 sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
0337 "toDataFrame", rdd, schema$jobj, sparkSession)
0338 } else {
0339 jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
0340 srdd <- callJMethod(jrdd, "rdd")
0341 sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
0342 srdd, schema$jobj, sparkSession)
0343 }
0344 dataFrame(sdf)
0345 }
0346
0347 #' @rdname createDataFrame
0348 #' @aliases createDataFrame
0349 #' @note as.DataFrame since 1.6.0
0350 as.DataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
0351 createDataFrame(data, schema, samplingRatio, numPartitions)
0352 }
0353
0354 #' toDF
0355 #'
0356 #' Converts an RDD to a SparkDataFrame by infer the types.
0357 #'
0358 #' @param x An RDD
0359 #' @rdname SparkDataFrame
0360 #' @noRd
0361 #' @examples
0362 #'\dontrun{
0363 #' sparkR.session()
0364 #' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
0365 #' df <- toDF(rdd)
0366 #'}
0367 setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
0368
0369 setMethod("toDF", signature(x = "RDD"),
0370 function(x, ...) {
0371 createDataFrame(x, ...)
0372 })
0373
0374 #' Create a SparkDataFrame from a JSON file.
0375 #'
0376 #' Loads a JSON file, returning the result as a SparkDataFrame
0377 #' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
0378 #' ) is supported. For JSON (one record per file), set a named property \code{multiLine} to
0379 #' \code{TRUE}.
0380 #' It goes through the entire dataset once to determine the schema.
0381 #'
0382 #' @param path Path of file to read. A vector of multiple paths is allowed.
0383 #' @param ... additional external data source specific named properties.
0384 #' @return SparkDataFrame
0385 #' @rdname read.json
0386 #' @examples
0387 #'\dontrun{
0388 #' sparkR.session()
0389 #' path <- "path/to/file.json"
0390 #' df <- read.json(path)
0391 #' df <- read.json(path, multiLine = TRUE)
0392 #' }
0393 #' @name read.json
0394 #' @note read.json since 1.6.0
0395 read.json <- function(path, ...) {
0396 sparkSession <- getSparkSession()
0397 options <- varargsToStrEnv(...)
0398 # Allow the user to have a more flexible definition of the text file path
0399 paths <- as.list(suppressWarnings(normalizePath(path)))
0400 read <- callJMethod(sparkSession, "read")
0401 read <- callJMethod(read, "options", options)
0402 sdf <- handledCallJMethod(read, "json", paths)
0403 dataFrame(sdf)
0404 }
0405
0406 #' Create a SparkDataFrame from an ORC file.
0407 #'
0408 #' Loads an ORC file, returning the result as a SparkDataFrame.
0409 #'
0410 #' @param path Path of file to read.
0411 #' @param ... additional external data source specific named properties.
0412 #' @return SparkDataFrame
0413 #' @rdname read.orc
0414 #' @name read.orc
0415 #' @note read.orc since 2.0.0
0416 read.orc <- function(path, ...) {
0417 sparkSession <- getSparkSession()
0418 options <- varargsToStrEnv(...)
0419 # Allow the user to have a more flexible definition of the ORC file path
0420 path <- suppressWarnings(normalizePath(path))
0421 read <- callJMethod(sparkSession, "read")
0422 read <- callJMethod(read, "options", options)
0423 sdf <- handledCallJMethod(read, "orc", path)
0424 dataFrame(sdf)
0425 }
0426
0427 #' Create a SparkDataFrame from a Parquet file.
0428 #'
0429 #' Loads a Parquet file, returning the result as a SparkDataFrame.
0430 #'
0431 #' @param path path of file to read. A vector of multiple paths is allowed.
0432 #' @param ... additional data source specific named properties.
0433 #' @return SparkDataFrame
0434 #' @rdname read.parquet
0435 #' @name read.parquet
0436 #' @note read.parquet since 1.6.0
0437 read.parquet <- function(path, ...) {
0438 sparkSession <- getSparkSession()
0439 options <- varargsToStrEnv(...)
0440 # Allow the user to have a more flexible definition of the Parquet file path
0441 paths <- as.list(suppressWarnings(normalizePath(path)))
0442 read <- callJMethod(sparkSession, "read")
0443 read <- callJMethod(read, "options", options)
0444 sdf <- handledCallJMethod(read, "parquet", paths)
0445 dataFrame(sdf)
0446 }
0447
0448 #' Create a SparkDataFrame from a text file.
0449 #'
0450 #' Loads text files and returns a SparkDataFrame whose schema starts with
0451 #' a string column named "value", and followed by partitioned columns if
0452 #' there are any. The text files must be encoded as UTF-8.
0453 #'
0454 #' Each line in the text file is a new row in the resulting SparkDataFrame.
0455 #'
0456 #' @param path Path of file to read. A vector of multiple paths is allowed.
0457 #' @param ... additional external data source specific named properties.
0458 #' @return SparkDataFrame
0459 #' @rdname read.text
0460 #' @examples
0461 #'\dontrun{
0462 #' sparkR.session()
0463 #' path <- "path/to/file.txt"
0464 #' df <- read.text(path)
0465 #' }
0466 #' @name read.text
0467 #' @note read.text since 1.6.1
0468 read.text <- function(path, ...) {
0469 sparkSession <- getSparkSession()
0470 options <- varargsToStrEnv(...)
0471 # Allow the user to have a more flexible definition of the text file path
0472 paths <- as.list(suppressWarnings(normalizePath(path)))
0473 read <- callJMethod(sparkSession, "read")
0474 read <- callJMethod(read, "options", options)
0475 sdf <- handledCallJMethod(read, "text", paths)
0476 dataFrame(sdf)
0477 }
0478
0479 #' SQL Query
0480 #'
0481 #' Executes a SQL query using Spark, returning the result as a SparkDataFrame.
0482 #'
0483 #' @param sqlQuery A character vector containing the SQL query
0484 #' @return SparkDataFrame
0485 #' @rdname sql
0486 #' @examples
0487 #'\dontrun{
0488 #' sparkR.session()
0489 #' path <- "path/to/file.json"
0490 #' df <- read.json(path)
0491 #' createOrReplaceTempView(df, "table")
0492 #' new_df <- sql("SELECT * FROM table")
0493 #' }
0494 #' @name sql
0495 #' @note sql since 1.4.0
0496 sql <- function(sqlQuery) {
0497 sparkSession <- getSparkSession()
0498 sdf <- callJMethod(sparkSession, "sql", sqlQuery)
0499 dataFrame(sdf)
0500 }
0501
0502 #' Create a SparkDataFrame from a SparkSQL table or view
0503 #'
0504 #' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
0505 #' have already been registered in the SparkSession.
0506 #'
0507 #' @param tableName the qualified or unqualified name that designates a table or view. If a database
0508 #' is specified, it identifies the table/view from the database.
0509 #' Otherwise, it first attempts to find a temporary view with the given name
0510 #' and then match the table/view from the current database.
0511 #' @return SparkDataFrame
0512 #' @rdname tableToDF
0513 #' @name tableToDF
0514 #' @examples
0515 #'\dontrun{
0516 #' sparkR.session()
0517 #' path <- "path/to/file.json"
0518 #' df <- read.json(path)
0519 #' createOrReplaceTempView(df, "table")
0520 #' new_df <- tableToDF("table")
0521 #' }
0522 #' @note tableToDF since 2.0.0
0523 tableToDF <- function(tableName) {
0524 sparkSession <- getSparkSession()
0525 sdf <- callJMethod(sparkSession, "table", tableName)
0526 dataFrame(sdf)
0527 }
0528
0529 #' Load a SparkDataFrame
0530 #'
0531 #' Returns the dataset in a data source as a SparkDataFrame
0532 #'
0533 #' The data source is specified by the \code{source} and a set of options(...).
0534 #' If \code{source} is not specified, the default data source configured by
0535 #' "spark.sql.sources.default" will be used. \cr
0536 #' Similar to R read.csv, when \code{source} is "csv", by default, a value of "NA" will be
0537 #' interpreted as NA.
0538 #'
0539 #' @param path The path of files to load
0540 #' @param source The name of external data source
0541 #' @param schema The data schema defined in structType or a DDL-formatted string.
0542 #' @param na.strings Default string value for NA when source is "csv"
0543 #' @param ... additional external data source specific named properties.
0544 #' @return SparkDataFrame
0545 #' @rdname read.df
0546 #' @name read.df
0547 #' @seealso \link{read.json}
0548 #' @examples
0549 #'\dontrun{
0550 #' sparkR.session()
0551 #' df1 <- read.df("path/to/file.json", source = "json")
0552 #' schema <- structType(structField("name", "string"),
0553 #' structField("info", "map<string,double>"))
0554 #' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
0555 #' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
0556 #' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
0557 #' df4 <- read.df(mapTypeJsonPath, "json", stringSchema, multiLine = TRUE)
0558 #' }
0559 #' @note read.df since 1.4.0
0560 read.df <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
0561 if (!is.null(path) && !is.character(path)) {
0562 stop("path should be character, NULL or omitted.")
0563 }
0564 if (!is.null(source) && !is.character(source)) {
0565 stop("source should be character, NULL or omitted. It is the datasource specified ",
0566 "in 'spark.sql.sources.default' configuration by default.")
0567 }
0568 sparkSession <- getSparkSession()
0569 options <- varargsToStrEnv(...)
0570 if (!is.null(path)) {
0571 options[["path"]] <- path
0572 }
0573 if (is.null(source)) {
0574 source <- getDefaultSqlSource()
0575 }
0576 if (source == "csv" && is.null(options[["nullValue"]])) {
0577 options[["nullValue"]] <- na.strings
0578 }
0579 read <- callJMethod(sparkSession, "read")
0580 read <- callJMethod(read, "format", source)
0581 if (!is.null(schema)) {
0582 if (class(schema) == "structType") {
0583 read <- callJMethod(read, "schema", schema$jobj)
0584 } else if (is.character(schema)) {
0585 read <- callJMethod(read, "schema", schema)
0586 } else {
0587 stop("schema should be structType or character.")
0588 }
0589 }
0590 read <- callJMethod(read, "options", options)
0591 sdf <- handledCallJMethod(read, "load")
0592 dataFrame(sdf)
0593 }
0594
0595 #' @rdname read.df
0596 #' @name loadDF
0597 #' @note loadDF since 1.6.0
0598 loadDF <- function(path = NULL, source = NULL, schema = NULL, ...) {
0599 read.df(path, source, schema, ...)
0600 }
0601
0602 #' Create a SparkDataFrame representing the database table accessible via JDBC URL
0603 #'
0604 #' Additional JDBC database connection properties can be set (...)
0605 #'
0606 #' Only one of partitionColumn or predicates should be set. Partitions of the table will be
0607 #' retrieved in parallel based on the \code{numPartitions} or by the predicates.
0608 #'
0609 #' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
0610 #' your external database systems.
0611 #'
0612 #' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}
0613 #' @param tableName the name of the table in the external database
0614 #' @param partitionColumn the name of a column of numeric, date, or timestamp type
0615 #' that will be used for partitioning.
0616 #' @param lowerBound the minimum value of \code{partitionColumn} used to decide partition stride
0617 #' @param upperBound the maximum value of \code{partitionColumn} used to decide partition stride
0618 #' @param numPartitions the number of partitions, This, along with \code{lowerBound} (inclusive),
0619 #' \code{upperBound} (exclusive), form partition strides for generated WHERE
0620 #' clause expressions used to split the column \code{partitionColumn} evenly.
0621 #' This defaults to SparkContext.defaultParallelism when unset.
0622 #' @param predicates a list of conditions in the where clause; each one defines one partition
0623 #' @param ... additional JDBC database connection named properties.
0624 #' @return SparkDataFrame
0625 #' @rdname read.jdbc
0626 #' @name read.jdbc
0627 #' @examples
0628 #'\dontrun{
0629 #' sparkR.session()
0630 #' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
0631 #' df <- read.jdbc(jdbcUrl, "table", predicates = list("field<=123"), user = "username")
0632 #' df2 <- read.jdbc(jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
0633 #' upperBound = 10000, user = "username", password = "password")
0634 #' }
0635 #' @note read.jdbc since 2.0.0
0636 read.jdbc <- function(url, tableName,
0637 partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
0638 numPartitions = 0L, predicates = list(), ...) {
0639 jprops <- varargsToJProperties(...)
0640 sparkSession <- getSparkSession()
0641 read <- callJMethod(sparkSession, "read")
0642 if (!is.null(partitionColumn)) {
0643 if (is.null(numPartitions) || numPartitions == 0) {
0644 sc <- callJMethod(sparkSession, "sparkContext")
0645 numPartitions <- callJMethod(sc, "defaultParallelism")
0646 } else {
0647 numPartitions <- numToInt(numPartitions)
0648 }
0649 sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
0650 numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
0651 } else if (length(predicates) > 0) {
0652 sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)),
0653 jprops)
0654 } else {
0655 sdf <- handledCallJMethod(read, "jdbc", url, tableName, jprops)
0656 }
0657 dataFrame(sdf)
0658 }
0659
0660 #' Load a streaming SparkDataFrame
0661 #'
0662 #' Returns the dataset in a data source as a SparkDataFrame
0663 #'
0664 #' The data source is specified by the \code{source} and a set of options(...).
0665 #' If \code{source} is not specified, the default data source configured by
0666 #' "spark.sql.sources.default" will be used.
0667 #'
0668 #' @param source The name of external data source
0669 #' @param schema The data schema defined in structType or a DDL-formatted string, this is
0670 #' required for file-based streaming data source
0671 #' @param ... additional external data source specific named options, for instance \code{path} for
0672 #' file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
0673 #' parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
0674 #' uses the default value, session local timezone.
0675 #' @return SparkDataFrame
0676 #' @rdname read.stream
0677 #' @name read.stream
0678 #' @seealso \link{write.stream}
0679 #' @examples
0680 #'\dontrun{
0681 #' sparkR.session()
0682 #' df <- read.stream("socket", host = "localhost", port = 9999)
0683 #' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
0684 #'
0685 #' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
0686 #' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
0687 #' df1 <- read.stream("json", path = jsonDir, schema = stringSchema, maxFilesPerTrigger = 1)
0688 #' }
0689 #' @note read.stream since 2.2.0
0690 #' @note experimental
0691 read.stream <- function(source = NULL, schema = NULL, ...) {
0692 sparkSession <- getSparkSession()
0693 if (!is.null(source) && !is.character(source)) {
0694 stop("source should be character, NULL or omitted. It is the data source specified ",
0695 "in 'spark.sql.sources.default' configuration by default.")
0696 }
0697 if (is.null(source)) {
0698 source <- getDefaultSqlSource()
0699 }
0700 options <- varargsToStrEnv(...)
0701 read <- callJMethod(sparkSession, "readStream")
0702 read <- callJMethod(read, "format", source)
0703 if (!is.null(schema)) {
0704 if (class(schema) == "structType") {
0705 read <- callJMethod(read, "schema", schema$jobj)
0706 } else if (is.character(schema)) {
0707 read <- callJMethod(read, "schema", schema)
0708 } else {
0709 stop("schema should be structType or character.")
0710 }
0711 }
0712 read <- callJMethod(read, "options", options)
0713 sdf <- handledCallJMethod(read, "load")
0714 dataFrame(sdf)
0715 }