Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # catalog.R: SparkSession catalog functions
0019 
#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source,
#' Returns a SparkDataFrame associated with the external table.
#'
#' The data source is specified by the \code{source} and a set of options(...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable-deprecated
#' @seealso \link{createTable}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createExternalTable("myjson", path="path/to/json", source="json", schema)
#' }
#' @name createExternalTable
#' @note createExternalTable since 1.4.0
createExternalTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  # Emit the deprecation notice, then delegate to the replacement API
  # with identical arguments.
  .Deprecated("createTable", old = "createExternalTable")
  createTable(tableName, path = path, source = source, schema = schema, ...)
}
0048 
#' Creates a table based on the dataset in a data source
#'
#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
#' the table.
#'
#' The data source is specified by the \code{source} and a set of options(...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
#' created from the data at the given path. Otherwise a managed table is created.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data sources.
#' @param ... additional named parameters as options for the data source.
#' @return A SparkDataFrame.
#' @rdname createTable
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createTable("myjson", path="path/to/json", source="json", schema)
#'
#' createTable("people", source = "json", schema = schema)
#' insertInto(df, "people")
#' }
#' @name createTable
#' @note createTable since 2.2.0
createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  sparkSession <- getSparkSession()
  # Collect the ... options into a string environment for the JVM side.
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    # A path option makes the JVM catalog create an external table.
    options[["path"]] <- path
  }
  if (is.null(source)) {
    source <- getDefaultSqlSource()
  }
  catalog <- callJMethod(sparkSession, "catalog")
  if (is.null(schema)) {
    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
  } else if (inherits(schema, "structType")) {
    # Use inherits() rather than class() == "structType": class() can return a
    # multi-element vector, which makes the comparison unreliable in if().
    # Pass the wrapped JVM schema object along with the options.
    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
  } else {
    stop("schema must be a structType.")
  }
  dataFrame(sdf)
}
0096 
#' Cache Table
#'
#' Caches the specified table in-memory.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @return SparkDataFrame
#' @rdname cacheTable
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' cacheTable("table")
#' }
#' @name cacheTable
#' @note cacheTable since 1.4.0
cacheTable <- function(tableName) {
  # Resolve the JVM-side catalog from the active session, then cache the table.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "cacheTable", tableName))
}
0120 
#' Uncache Table
#'
#' Removes the specified table from the in-memory cache.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' uncacheTable("table")
#' }
#' @name uncacheTable
#' @note uncacheTable since 1.4.0
uncacheTable <- function(tableName) {
  # Resolve the JVM-side catalog from the active session, then drop the table
  # from the in-memory cache.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "uncacheTable", tableName))
}
0144 
#' Clear Cache
#'
#' Removes all cached tables from the in-memory cache.
#'
#' @rdname clearCache
#' @examples
#' \dontrun{
#' clearCache()
#' }
#' @name clearCache
#' @note clearCache since 1.4.0
clearCache <- function() {
  # Ask the session's catalog to drop every cached table; nothing useful is
  # returned, so the result is made invisible.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(callJMethod(jcatalog, "clearCache"))
}
0161 
#' (Deprecated) Drop Temporary Table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
#' @param tableName The name of the SparkSQL table to be dropped.
#' @seealso \link{dropTempView}
#' @rdname dropTempTable-deprecated
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempTable("table")
#' }
#' @name dropTempTable
#' @note dropTempTable since 1.4.0
dropTempTable <- function(tableName) {
  .Deprecated("dropTempView", old = "dropTempTable")
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}
0186 
#' Drops the temporary view with the given view name in the catalog.
#'
#' Drops the temporary view with the given view name in the catalog.
#' If the view has been cached before, then it will also be uncached.
#'
#' @param viewName the name of the temporary view to be dropped.
#' @return TRUE if the view is dropped successfully, FALSE otherwise.
#' @rdname dropTempView
#' @name dropTempView
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempView("table")
#' }
#' @note since 2.0.0
dropTempView <- function(viewName) {
  sparkSession <- getSparkSession()
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.character(viewName)) {
    stop("viewName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # Returns the JVM call's logical result (TRUE/FALSE) to the caller.
  callJMethod(catalog, "dropTempView", viewName)
}
0212 
#' Tables
#'
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame
#' @rdname tables
#' @seealso \link{listTables}
#' @examples
#'\dontrun{
#' sparkR.session()
#' tables("hive")
#' }
#' @name tables
#' @note tables since 1.4.0
tables <- function(databaseName = NULL) {
  listed <- listTables(databaseName)
  # Rename "name" to "tableName" so the output schema matches what this
  # function returned in earlier releases.
  withColumnRenamed(listed, "name", "tableName")
}
0232 
#' Table Names
#'
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName (optional) name of the database
#' @return a list of table names
#' @rdname tableNames
#' @examples
#'\dontrun{
#' sparkR.session()
#' tableNames("hive")
#' }
#' @name tableNames
#' @note tableNames since 1.4.0
tableNames <- function(databaseName = NULL) {
  # Delegates to the static JVM helper, which handles a NULL database name.
  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTableNames",
              getSparkSession(), databaseName)
}
0254 
#' Returns the current default database
#'
#' Returns the current default database.
#'
#' @return name of the current default database.
#' @rdname currentDatabase
#' @name currentDatabase
#' @examples
#' \dontrun{
#' sparkR.session()
#' currentDatabase()
#' }
#' @note since 2.2.0
currentDatabase <- function() {
  # Query the session's catalog for the name of the current default database.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  callJMethod(jcatalog, "currentDatabase")
}
0273 
#' Sets the current default database
#'
#' Sets the current default database.
#'
#' @param databaseName name of the database
#' @rdname setCurrentDatabase
#' @name setCurrentDatabase
#' @examples
#' \dontrun{
#' sparkR.session()
#' setCurrentDatabase("default")
#' }
#' @note since 2.2.0
setCurrentDatabase <- function(databaseName) {
  sparkSession <- getSparkSession()
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}
0295 
#' Returns a list of databases available
#'
#' Returns a list of databases available.
#'
#' @return a SparkDataFrame of the list of databases.
#' @rdname listDatabases
#' @name listDatabases
#' @examples
#' \dontrun{
#' sparkR.session()
#' listDatabases()
#' }
#' @note since 2.2.0
listDatabases <- function() {
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  # listDatabases returns a JVM Dataset; convert it to a DataFrame and wrap
  # it as a SparkDataFrame on the R side.
  jdatabases <- callJMethod(jcatalog, "listDatabases")
  dataFrame(callJMethod(jdatabases, "toDF"))
}
0314 
#' Returns a list of tables or views in the specified database
#'
#' Returns a list of tables or views in the specified database.
#' This includes all temporary views.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of tables.
#' @rdname listTables
#' @name listTables
#' @seealso \link{tables}
#' @examples
#' \dontrun{
#' sparkR.session()
#' listTables()
#' listTables("default")
#' }
#' @note since 2.2.0
listTables <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    # No database given: list tables in the current database.
    callJMethod(catalog, "listTables")
  } else {
    # handledCallJMethod surfaces a friendlier error if the database is missing.
    handledCallJMethod(catalog, "listTables", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0345 
#' Returns a list of columns for the given table/view in the specified database
#'
#' Returns a list of columns for the given table/view in the specified database.
#'
#' @param tableName the qualified or unqualified name that designates a table/view. If no database
#'                  identifier is provided, it refers to a table/view in the current database.
#'                  If \code{databaseName} parameter is specified, this must be an unqualified name.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
#' @name listColumns
#' @examples
#' \dontrun{
#' sparkR.session()
#' listColumns("mytable")
#' }
#' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    handledCallJMethod(catalog, "listColumns", tableName)
  } else {
    # The JVM overload takes (databaseName, tableName) in this order.
    handledCallJMethod(catalog, "listColumns", databaseName, tableName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0376 
#' Returns a list of functions registered in the specified database
#'
#' Returns a list of functions registered in the specified database.
#' This includes all temporary functions.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of function descriptions.
#' @rdname listFunctions
#' @name listFunctions
#' @examples
#' \dontrun{
#' sparkR.session()
#' listFunctions()
#' }
#' @note since 2.2.0
listFunctions <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust type check; comparing class(x) != "character"
  # misbehaves when class() returns a multi-element vector.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    # No database given: list functions visible in the current database.
    callJMethod(catalog, "listFunctions")
  } else {
    # handledCallJMethod surfaces a friendlier error if the database is missing.
    handledCallJMethod(catalog, "listFunctions", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0405 
#' Recovers all the partitions in the directory of a table and update the catalog
#'
#' Recovers all the partitions in the directory of a table and update the catalog. The name should
#' reference a partitioned table, and not a view.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @examples
#' \dontrun{
#' sparkR.session()
#' recoverPartitions("myTable")
#' }
#' @note since 2.2.0
recoverPartitions <- function(tableName) {
  # Resolve the JVM-side catalog from the active session and trigger
  # partition recovery for the named table.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "recoverPartitions", tableName))
}
0426 
#' Invalidates and refreshes all the cached data and metadata of the given table
#'
#' Invalidates and refreshes all the cached data and metadata of the given table. For performance
#' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
#' a table, such as the location of blocks. When those change outside of Spark SQL, users should
#' call this function to invalidate the cache.
#'
#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
#' new version cached lazily.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @rdname refreshTable
#' @name refreshTable
#' @examples
#' \dontrun{
#' sparkR.session()
#' refreshTable("myTable")
#' }
#' @note since 2.2.0
refreshTable <- function(tableName) {
  # Resolve the JVM-side catalog from the active session and refresh the
  # named table's cached data and metadata.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "refreshTable", tableName))
}
0452 
#' Invalidates and refreshes all the cached data and metadata for SparkDataFrame containing path
#'
#' Invalidates and refreshes all the cached data (and the associated metadata) for any
#' SparkDataFrame that contains the given data source path. Path matching is by prefix, i.e. "/"
#' would invalidate everything that is cached.
#'
#' @param path the path of the data source.
#' @rdname refreshByPath
#' @name refreshByPath
#' @examples
#' \dontrun{
#' sparkR.session()
#' refreshByPath("/path")
#' }
#' @note since 2.2.0
refreshByPath <- function(path) {
  # Resolve the JVM-side catalog from the active session and invalidate
  # everything cached under the given path prefix.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "refreshByPath", path))
}