#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# SQLContext.R: SQLContext-driven functions


# Map top level R type to SQL type
getInternalType <- function(x) {
  # class of POSIXlt is c("POSIXlt" "POSIXt")
  switch(class(x)[[1]],
         integer = "integer",
         character = "string",
         logical = "boolean",
         double = "double",
         numeric = "double",
         raw = "binary",
         list = "array",
         struct = "struct",
         environment = "map",
         Date = "date",
         POSIXlt = "timestamp",
         POSIXct = "timestamp",
         stop("Unsupported type for SparkDataFrame: ", class(x)))
}
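
# Illustrative sketch (not part of the original source): a few mappings produced
# by getInternalType. These calls are plain R and need no Spark session; any
# value whose top-level class is not listed in the switch() above raises an error.
#
#   getInternalType(1L)          # "integer"
#   getInternalType("a")         # "string"
#   getInternalType(Sys.Date())  # "date"
#   getInternalType(new.env())   # "map"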

#' return the SparkSession
#' @noRd
getSparkSession <- function() {
  if (exists(".sparkRsession", envir = .sparkREnv)) {
    get(".sparkRsession", envir = .sparkREnv)
  } else {
    stop("SparkSession not initialized")
  }
}

#' infer the SQL type
#' @noRd
infer_type <- function(x) {
  if (is.null(x)) {
    stop("can not infer type from NULL")
  }

  type <- getInternalType(x)

  if (type == "map") {
    stopifnot(length(x) > 0)
    key <- ls(x)[[1]]
    paste0("map<string,", infer_type(get(key, x)), ">")
  } else if (type == "array") {
    stopifnot(length(x) > 0)

    paste0("array<", infer_type(x[[1]]), ">")
  } else if (type == "struct") {
    stopifnot(length(x) > 0)
    names <- names(x)
    stopifnot(!is.null(names))

    type <- lapply(seq_along(x), function(i) {
      paste0(names[[i]], ":", infer_type(x[[i]]), ",")
    })
    type <- Reduce(paste0, type)
    type <- paste0("struct<", substr(type, 1, nchar(type) - 1), ">")
  } else if (length(x) > 1 && type != "binary") {
    paste0("array<", infer_type(x[[1]]), ">")
  } else {
    type
  }
}
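
# Illustrative sketch (not part of the original source): SQL type strings
# returned by infer_type for a few R values. Like getInternalType, these calls
# run in plain R without a Spark session.
#
#   infer_type(1L)                  # "integer"
#   infer_type(c(1.5, 2.5))         # "array<double>"
#   infer_type(list(1L, 2L))        # "array<integer>"
#   e <- new.env(); assign("a", 1L, envir = e)
#   infer_type(e)                   # "map<string,integer>"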

#' Get Runtime Config from the current active SparkSession
#'
#' Get Runtime Config from the current active SparkSession.
#' To change SparkSession Runtime Config, please see \code{sparkR.session()}.
#'
#' @param key (optional) The key of the config to get. If omitted, all config values are returned.
#' @param defaultValue (optional) The default value of the config to return if the config is not
#' set. If omitted, the call fails if the config key is not set.
#' @return a list of config values with keys as their names
#' @rdname sparkR.conf
#' @name sparkR.conf
#' @examples
#'\dontrun{
#' sparkR.session()
#' allConfigs <- sparkR.conf()
#' masterValue <- unlist(sparkR.conf("spark.master"))
#' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
#' }
#' @note sparkR.conf since 2.0.0
sparkR.conf <- function(key, defaultValue) {
  sparkSession <- getSparkSession()
  if (missing(key)) {
    m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
    as.list(m, all.names = TRUE, sorted = TRUE)
  } else {
    conf <- callJMethod(sparkSession, "conf")
    value <- if (missing(defaultValue)) {
      tryCatch(callJMethod(conf, "get", key),
               error = function(e) {
                 estr <- as.character(e)
                 if (any(grepl("java.util.NoSuchElementException", estr, fixed = TRUE))) {
                   stop("Config '", key, "' is not set")
                 } else {
                   stop("Unknown error: ", estr)
                 }
               })
    } else {
      callJMethod(conf, "get", key, defaultValue)
    }
    l <- setNames(list(value), key)
    l
  }
}

#' Get version of Spark on which this application is running
#'
#' Get version of Spark on which this application is running.
#'
#' @return a character string of the Spark version
#' @rdname sparkR.version
#' @name sparkR.version
#' @examples
#'\dontrun{
#' sparkR.session()
#' version <- sparkR.version()
#' }
#' @note sparkR.version since 2.0.1
sparkR.version <- function() {
  sparkSession <- getSparkSession()
  callJMethod(sparkSession, "version")
}

getDefaultSqlSource <- function() {
  l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
  l[["spark.sql.sources.default"]]
}

writeToFileInArrow <- function(fileName, rdf, numPartitions) {
  if (requireNamespace("arrow", quietly = TRUE)) {
    numPartitions <- if (!is.null(numPartitions)) {
      numToInt(numPartitions)
    } else {
      1
    }

    rdf_slices <- if (numPartitions > 1) {
      split(rdf, makeSplits(numPartitions, nrow(rdf)))
    } else {
      list(rdf)
    }

    stream_writer <- NULL
    tryCatch({
      for (rdf_slice in rdf_slices) {
        batch <- arrow::record_batch(rdf_slice)
        if (is.null(stream_writer)) {
          stream <- arrow::FileOutputStream$create(fileName)
          schema <- batch$schema
          stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
        }

        stream_writer$write_batch(batch)
      }
    },
    finally = {
      if (!is.null(stream_writer)) {
        stream_writer$close()
      }
    })

  } else {
    stop("'arrow' package should be installed.")
  }
}
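
# Illustrative usage sketch (not part of the original source; the temporary
# path below is hypothetical). writeToFileInArrow serializes an R data.frame to
# an Arrow IPC stream file, which createDataFrame later hands to the JVM side.
# Requires the 'arrow' package to be installed.
#
#   tmp <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
#   writeToFileInArrow(tmp, mtcars, numPartitions = 2)
#   file.remove(tmp)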

getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
  if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
    if (is.null(firstRow)) {
      stopifnot(!is.null(rdd))
      firstRow <- firstRDD(rdd)
    }
    names <- if (is.null(schema)) {
      names(firstRow)
    } else {
      as.list(schema)
    }
    if (is.null(names)) {
      names <- lapply(seq_len(length(firstRow)), function(x) {
        paste0("_", as.character(x))
      })
    }

    # Spark SQL does not support '.' in column names, so replace it with '_'
    # TODO(davies): remove this once SPARK-2775 is fixed
    names <- lapply(names, function(n) {
      nn <- gsub(".", "_", n, fixed = TRUE)
      if (nn != n) {
        warning("Use ", nn, " instead of ", n, " as column name")
      }
      nn
    })

    types <- lapply(firstRow, infer_type)
    fields <- lapply(seq_len(length(firstRow)), function(i) {
      structField(names[[i]], types[[i]], TRUE)
    })
    schema <- do.call(structType, fields)
  } else {
    schema
  }
}
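
# Illustrative sketch (not part of the original source): given only the first
# row of the data, getSchema builds a structType by pairing column names with
# the SQL types inferred by infer_type. The values below are made up, and the
# call needs an active SparkR session because structField creates JVM objects.
#
#   sparkR.session()
#   firstRow <- list(name = "Alice", age = 30L, height = 1.7)
#   getSchema(NULL, firstRow = firstRow)
#   # structType with fields name:string, age:integer, height:double (nullable)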

#' Create a SparkDataFrame
#'
#' Converts R data.frame or list into SparkDataFrame.
#'
#' @param data a list or data.frame.
#' @param schema a list of column names or named list (StructType), optional.
#' @param samplingRatio Currently not used.
#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1; this is
#'        limited by the length of the list or the number of rows of the data.frame.
#' @return A SparkDataFrame.
#' @rdname createDataFrame
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- as.DataFrame(iris)
#' df2 <- as.DataFrame(list(3,4,5,6))
#' df3 <- createDataFrame(iris)
#' df4 <- createDataFrame(cars, numPartitions = 2)
#' }
#' @name createDataFrame
#' @note createDataFrame since 1.4.0
# TODO(davies): support sampling and infer type from NA
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
                            numPartitions = NULL) {
  sparkSession <- getSparkSession()
  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
  useArrow <- FALSE
  firstRow <- NULL

  if (is.data.frame(data)) {
    # get the names of columns, they will be put into RDD
    if (is.null(schema)) {
      schema <- names(data)
    }

    # get rid of factor type
    cleanCols <- function(x) {
      if (is.factor(x)) {
        as.character(x)
      } else {
        x
      }
    }
    data[] <- lapply(data, cleanCols)

    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
    if (arrowEnabled) {
      useArrow <- tryCatch({
        stopifnot(length(data) > 0)
        firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]]
        schema <- getSchema(schema, firstRow = firstRow)
        checkSchemaInArrow(schema)
        fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
        tryCatch({
          writeToFileInArrow(fileName, data, numPartitions)
          jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                     "readArrowStreamFromFile",
                                     sparkSession,
                                     fileName)
        },
        finally = {
          # File might not be created.
          suppressWarnings(file.remove(fileName))
        })
        TRUE
      },
      error = function(e) {
        warning("createDataFrame attempted Arrow optimization because ",
                "'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, ",
                "it failed; falling back without Arrow optimization. Reason: ", e)
        FALSE
      })
    }

    if (!useArrow) {
      # Convert data into a list of rows. Each row is a list.
      # drop factors and wrap lists
      data <- setNames(as.list(data), NULL)

      # check if all columns have supported type
      lapply(data, getInternalType)

      # convert to rows
      data <- do.call(mapply, append(args, data))
      if (length(data) > 0) {
        firstRow <- data[[1]]
      }
    }
  }

  if (useArrow) {
    rdd <- jrddInArrow
  } else if (is.list(data)) {
    sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
    if (!is.null(numPartitions)) {
      rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
    } else {
      rdd <- parallelize(sc, data, numSlices = 1)
    }
  } else if (inherits(data, "RDD")) {
    rdd <- data
  } else {
    stop("unexpected type: ", class(data))
  }

  schema <- getSchema(schema, firstRow, rdd)

  stopifnot(class(schema) == "structType")

  if (useArrow) {
    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                       "toDataFrame", rdd, schema$jobj, sparkSession)
  } else {
    jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
    srdd <- callJMethod(jrdd, "rdd")
    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
                       srdd, schema$jobj, sparkSession)
  }
  dataFrame(sdf)
}
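
# Illustrative sketch (not part of the original source): createDataFrame only
# takes the Arrow path when 'spark.sql.execution.arrow.sparkr.enabled' is
# "true" and the local 'arrow' package is usable; otherwise it falls back to
# the RDD-based conversion above. A session configured that way might look like:
#
#   sparkR.session(sparkConfig =
#     list(spark.sql.execution.arrow.sparkr.enabled = "true"))
#   df <- createDataFrame(mtcars, numPartitions = 2)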

#' @rdname createDataFrame
#' @aliases createDataFrame
#' @note as.DataFrame since 1.6.0
as.DataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
  createDataFrame(data, schema, samplingRatio, numPartitions)
}

#' toDF
#'
#' Converts an RDD to a SparkDataFrame by inferring the types.
#'
#' @param x An RDD
#' @rdname SparkDataFrame
#' @noRd
#' @examples
#'\dontrun{
#' sparkR.session()
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
#' df <- toDF(rdd)
#'}
setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })

setMethod("toDF", signature(x = "RDD"),
          function(x, ...) {
            createDataFrame(x, ...)
          })

#' Create a SparkDataFrame from a JSON file.
#'
#' Loads a JSON file, returning the result as a SparkDataFrame.
#' By default, \href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
#' is supported. For JSON (one record per file), set a named property \code{multiLine} to
#' \code{TRUE}.
#' It goes through the entire dataset once to determine the schema.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' df <- read.json(path, multiLine = TRUE)
#' }
#' @name read.json
#' @note read.json since 1.6.0
read.json <- function(path, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the JSON file path
  paths <- as.list(suppressWarnings(normalizePath(path)))
  read <- callJMethod(sparkSession, "read")
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "json", paths)
  dataFrame(sdf)
}

#' Create a SparkDataFrame from an ORC file.
#'
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @name read.orc
#' @note read.orc since 2.0.0
read.orc <- function(path, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the ORC file path
  path <- suppressWarnings(normalizePath(path))
  read <- callJMethod(sparkSession, "read")
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "orc", path)
  dataFrame(sdf)
}
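
# Illustrative usage sketch (not part of the original source; the path below is
# hypothetical). Unlike read.json and read.parquet, read.orc takes a single
# path rather than a vector of paths.
#
#   sparkR.session()
#   df <- read.orc("path/to/file.orc")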

#' Create a SparkDataFrame from a Parquet file.
#'
#' Loads a Parquet file, returning the result as a SparkDataFrame.
#'
#' @param path path of file to read. A vector of multiple paths is allowed.
#' @param ... additional data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.parquet
#' @name read.parquet
#' @note read.parquet since 1.6.0
read.parquet <- function(path, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the Parquet file path
  paths <- as.list(suppressWarnings(normalizePath(path)))
  read <- callJMethod(sparkSession, "read")
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "parquet", paths)
  dataFrame(sdf)
}
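
# Illustrative usage sketch (not part of the original source; the paths below
# are hypothetical). read.parquet accepts a vector of paths and forwards named
# properties such as mergeSchema to the underlying data source.
#
#   sparkR.session()
#   df <- read.parquet(c("data/part1.parquet", "data/part2.parquet"),
#                      mergeSchema = "true")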

#' Create a SparkDataFrame from a text file.
#'
#' Loads text files and returns a SparkDataFrame whose schema starts with
#' a string column named "value", followed by partitioned columns if
#' there are any. The text files must be encoded as UTF-8.
#'
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.text
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.txt"
#' df <- read.text(path)
#' }
#' @name read.text
#' @note read.text since 1.6.1
read.text <- function(path, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the text file path
  paths <- as.list(suppressWarnings(normalizePath(path)))
  read <- callJMethod(sparkSession, "read")
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "text", paths)
  dataFrame(sdf)
}

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a SparkDataFrame.
#'
#' @param sqlQuery A character vector containing the SQL query
#' @return SparkDataFrame
#' @rdname sql
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' new_df <- sql("SELECT * FROM table")
#' }
#' @name sql
#' @note sql since 1.4.0
sql <- function(sqlQuery) {
  sparkSession <- getSparkSession()
  sdf <- callJMethod(sparkSession, "sql", sqlQuery)
  dataFrame(sdf)
}

#' Create a SparkDataFrame from a SparkSQL table or view
#'
#' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
#' have already been registered in the SparkSession.
#'
#' @param tableName the qualified or unqualified name that designates a table or view. If a database
#'                  is specified, it identifies the table/view from the database.
#'                  Otherwise, it first attempts to find a temporary view with the given name
#'                  and then matches the table/view from the current database.
#' @return SparkDataFrame
#' @rdname tableToDF
#' @name tableToDF
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' new_df <- tableToDF("table")
#' }
#' @note tableToDF since 2.0.0
tableToDF <- function(tableName) {
  sparkSession <- getSparkSession()
  sdf <- callJMethod(sparkSession, "table", tableName)
  dataFrame(sdf)
}

#' Load a SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used. \cr
#' Similar to R read.csv, when \code{source} is "csv", by default, a value of "NA" will be
#' interpreted as NA.
#'
#' @param path The path of files to load
#' @param source The name of external data source
#' @param schema The data schema defined in structType or a DDL-formatted string.
#' @param na.strings Default string value for NA when source is "csv"
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.df
#' @name read.df
#' @seealso \link{read.json}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.df("path/to/file.json", source = "json")
#' schema <- structType(structField("name", "string"),
#'                      structField("info", "map<string,double>"))
#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
#' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
#' df4 <- read.df(mapTypeJsonPath, "json", stringSchema, multiLine = TRUE)
#' }
#' @note read.df since 1.4.0
read.df <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
  if (!is.null(path) && !is.character(path)) {
    stop("path should be character, NULL or omitted.")
  }
  if (!is.null(source) && !is.character(source)) {
    stop("source should be character, NULL or omitted. It is the datasource specified ",
         "in 'spark.sql.sources.default' configuration by default.")
  }
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    options[["path"]] <- path
  }
  if (is.null(source)) {
    source <- getDefaultSqlSource()
  }
  if (source == "csv" && is.null(options[["nullValue"]])) {
    options[["nullValue"]] <- na.strings
  }
  read <- callJMethod(sparkSession, "read")
  read <- callJMethod(read, "format", source)
  if (!is.null(schema)) {
    if (class(schema) == "structType") {
      read <- callJMethod(read, "schema", schema$jobj)
    } else if (is.character(schema)) {
      read <- callJMethod(read, "schema", schema)
    } else {
      stop("schema should be structType or character.")
    }
  }
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "load")
  dataFrame(sdf)
}

#' @rdname read.df
#' @name loadDF
#' @note loadDF since 1.6.0
loadDF <- function(path = NULL, source = NULL, schema = NULL, ...) {
  read.df(path, source, schema, ...)
}

#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...).
#'
#' Only one of partitionColumn or predicates should be set. Partitions of the table will be
#' retrieved in parallel based on \code{numPartitions} or the predicates.
#'
#' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
#' your external database systems.
#'
#' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}
#' @param tableName the name of the table in the external database
#' @param partitionColumn the name of a column of numeric, date, or timestamp type
#'                        that will be used for partitioning.
#' @param lowerBound the minimum value of \code{partitionColumn} used to decide partition stride
#' @param upperBound the maximum value of \code{partitionColumn} used to decide partition stride
#' @param numPartitions the number of partitions. This, along with \code{lowerBound} (inclusive)
#'                      and \code{upperBound} (exclusive), forms partition strides for generated
#'                      WHERE clause expressions used to split the column \code{partitionColumn}
#'                      evenly. This defaults to SparkContext.defaultParallelism when unset.
#' @param predicates a list of conditions in the where clause; each one defines one partition
#' @param ... additional JDBC database connection named properties.
#' @return SparkDataFrame
#' @rdname read.jdbc
#' @name read.jdbc
#' @examples
#'\dontrun{
#' sparkR.session()
#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
#' df <- read.jdbc(jdbcUrl, "table", predicates = list("field<=123"), user = "username")
#' df2 <- read.jdbc(jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
#'                  upperBound = 10000, user = "username", password = "password")
#' }
#' @note read.jdbc since 2.0.0
read.jdbc <- function(url, tableName,
                      partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
                      numPartitions = 0L, predicates = list(), ...) {
  jprops <- varargsToJProperties(...)
  sparkSession <- getSparkSession()
  read <- callJMethod(sparkSession, "read")
  if (!is.null(partitionColumn)) {
    if (is.null(numPartitions) || numPartitions == 0) {
      sc <- callJMethod(sparkSession, "sparkContext")
      numPartitions <- callJMethod(sc, "defaultParallelism")
    } else {
      numPartitions <- numToInt(numPartitions)
    }
    sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
                              numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
  } else if (length(predicates) > 0) {
    sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)),
                              jprops)
  } else {
    sdf <- handledCallJMethod(read, "jdbc", url, tableName, jprops)
  }
  dataFrame(sdf)
}

#' Load a streaming SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param source The name of external data source
#' @param schema The data schema defined in structType or a DDL-formatted string. This is
#'               required for file-based streaming data sources.
#' @param ... additional external data source specific named options, for instance \code{path} for
#'        file-based streaming data sources, or \code{timeZone} to indicate a timezone to be used
#'        to parse timestamps in the JSON/CSV data sources or partition values; if it isn't set, it
#'        uses the default value, the session local timezone.
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
#' @seealso \link{write.stream}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.stream("socket", host = "localhost", port = 9999)
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#'
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
#' df1 <- read.stream("json", path = jsonDir, schema = stringSchema, maxFilesPerTrigger = 1)
#' }
#' @note read.stream since 2.2.0
#' @note experimental
read.stream <- function(source = NULL, schema = NULL, ...) {
  sparkSession <- getSparkSession()
  if (!is.null(source) && !is.character(source)) {
    stop("source should be character, NULL or omitted. It is the data source specified ",
         "in 'spark.sql.sources.default' configuration by default.")
  }
  if (is.null(source)) {
    source <- getDefaultSqlSource()
  }
  options <- varargsToStrEnv(...)
  read <- callJMethod(sparkSession, "readStream")
  read <- callJMethod(read, "format", source)
  if (!is.null(schema)) {
    if (class(schema) == "structType") {
      read <- callJMethod(read, "schema", schema$jobj)
    } else if (is.character(schema)) {
      read <- callJMethod(read, "schema", schema)
    } else {
      stop("schema should be structType or character.")
    }
  }
  read <- callJMethod(read, "options", options)
  sdf <- handledCallJMethod(read, "load")
  dataFrame(sdf)
}