0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # catalog.R: SparkSession catalog functions
0019
0020 #' (Deprecated) Create an external table
0021 #'
#' Creates an external table based on the dataset in a data source.
#' Returns a SparkDataFrame associated with the external table.
0024 #'
0025 #' The data source is specified by the \code{source} and a set of options(...).
0026 #' If \code{source} is not specified, the default data source configured by
0027 #' "spark.sql.sources.default" will be used.
0028 #'
0029 #' @param tableName a name of the table.
0030 #' @param path the path of files to load.
0031 #' @param source the name of external data source.
0032 #' @param schema the schema of the data required for some data sources.
0033 #' @param ... additional argument(s) passed to the method.
0034 #' @return A SparkDataFrame.
0035 #' @rdname createExternalTable-deprecated
0036 #' @seealso \link{createTable}
0037 #' @examples
0038 #'\dontrun{
0039 #' sparkR.session()
0040 #' df <- createExternalTable("myjson", path="path/to/json", source="json", schema)
0041 #' }
0042 #' @name createExternalTable
0043 #' @note createExternalTable since 1.4.0
createExternalTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  # Kept only for backwards compatibility; createTable() is the replacement.
  .Deprecated("createTable", old = "createExternalTable")
  # Forward everything unchanged, using named arguments for clarity.
  createTable(tableName, path = path, source = source, schema = schema, ...)
}
0048
0049 #' Creates a table based on the dataset in a data source
0050 #'
0051 #' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
0052 #' the table.
0053 #'
0054 #' The data source is specified by the \code{source} and a set of options(...).
0055 #' If \code{source} is not specified, the default data source configured by
0056 #' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
0057 #' created from the data at the given path. Otherwise a managed table is created.
0058 #'
0059 #' @param tableName the qualified or unqualified name that designates a table. If no database
0060 #' identifier is provided, it refers to a table in the current database.
0061 #' @param path (optional) the path of files to load.
0062 #' @param source (optional) the name of the data source.
0063 #' @param schema (optional) the schema of the data required for some data sources.
0064 #' @param ... additional named parameters as options for the data source.
0065 #' @return A SparkDataFrame.
0066 #' @rdname createTable
0067 #' @examples
0068 #'\dontrun{
0069 #' sparkR.session()
0070 #' df <- createTable("myjson", path="path/to/json", source="json", schema)
0071 #'
0072 #' createTable("people", source = "json", schema = schema)
0073 #' insertInto(df, "people")
0074 #' }
0075 #' @name createTable
0076 #' @note createTable since 2.2.0
createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  sparkSession <- getSparkSession()
  # Extra named arguments become string-valued options for the data source.
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    # A path makes this an external table rooted at that location.
    options[["path"]] <- path
  }
  if (is.null(source)) {
    # Fall back to the session's "spark.sql.sources.default" setting.
    source <- getDefaultSqlSource()
  }
  catalog <- callJMethod(sparkSession, "catalog")
  if (is.null(schema)) {
    # Let the data source infer the schema.
    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
  } else if (inherits(schema, "structType")) {
    # inherits() is safe when class(schema) has length > 1; the previous
    # class(schema) == "structType" comparison errors in if() on R >= 4.2.
    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
  } else {
    stop("schema must be a structType.")
  }
  dataFrame(sdf)
}
0096
0097 #' Cache Table
0098 #'
0099 #' Caches the specified table in-memory.
0100 #'
0101 #' @param tableName the qualified or unqualified name that designates a table. If no database
0102 #' identifier is provided, it refers to a table in the current database.
#' @return NULL, invisibly.
0104 #' @rdname cacheTable
0105 #' @examples
0106 #'\dontrun{
0107 #' sparkR.session()
0108 #' path <- "path/to/file.json"
0109 #' df <- read.json(path)
0110 #' createOrReplaceTempView(df, "table")
0111 #' cacheTable("table")
0112 #' }
0113 #' @name cacheTable
0114 #' @note cacheTable since 1.4.0
cacheTable <- function(tableName) {
  # Resolve the catalog for the active session, then cache the named table.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(catalog, "cacheTable", tableName))
}
0120
0121 #' Uncache Table
0122 #'
0123 #' Removes the specified table from the in-memory cache.
0124 #'
0125 #' @param tableName the qualified or unqualified name that designates a table. If no database
0126 #' identifier is provided, it refers to a table in the current database.
#' @return NULL, invisibly.
0128 #' @rdname uncacheTable
0129 #' @examples
0130 #'\dontrun{
0131 #' sparkR.session()
0132 #' path <- "path/to/file.json"
0133 #' df <- read.json(path)
0134 #' createOrReplaceTempView(df, "table")
0135 #' uncacheTable("table")
0136 #' }
0137 #' @name uncacheTable
0138 #' @note uncacheTable since 1.4.0
uncacheTable <- function(tableName) {
  # Resolve the catalog for the active session, then evict the named table.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(catalog, "uncacheTable", tableName))
}
0144
0145 #' Clear Cache
0146 #'
0147 #' Removes all cached tables from the in-memory cache.
0148 #'
0149 #' @rdname clearCache
0150 #' @examples
0151 #' \dontrun{
0152 #' clearCache()
0153 #' }
0154 #' @name clearCache
0155 #' @note clearCache since 1.4.0
clearCache <- function() {
  # Drop every cached table from the active session's in-memory cache.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(callJMethod(catalog, "clearCache"))
}
0161
0162 #' (Deprecated) Drop Temporary Table
0163 #'
0164 #' Drops the temporary table with the given table name in the catalog.
0165 #' If the table has been cached/persisted before, it's also unpersisted.
0166 #'
0167 #' @param tableName The name of the SparkSQL table to be dropped.
0168 #' @seealso \link{dropTempView}
0169 #' @rdname dropTempTable-deprecated
0170 #' @examples
0171 #' \dontrun{
0172 #' sparkR.session()
0173 #' df <- read.df(path, "parquet")
0174 #' createOrReplaceTempView(df, "table")
0175 #' dropTempTable("table")
0176 #' }
0177 #' @name dropTempTable
0178 #' @note dropTempTable since 1.4.0
dropTempTable <- function(tableName) {
  # Kept only for backwards compatibility; dropTempView() is the replacement.
  .Deprecated("dropTempView", old = "dropTempTable")
  # is.character() is the robust check: class(tableName) may have length > 1,
  # which makes the old class(...) != "character" comparison error in if()
  # on R >= 4.2, and it also rejected character subclasses.
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}
0186
0187 #' Drops the temporary view with the given view name in the catalog.
0188 #'
0189 #' Drops the temporary view with the given view name in the catalog.
0190 #' If the view has been cached before, then it will also be uncached.
0191 #'
0192 #' @param viewName the name of the temporary view to be dropped.
0193 #' @return TRUE if the view is dropped successfully, FALSE otherwise.
0194 #' @rdname dropTempView
0195 #' @name dropTempView
0196 #' @examples
0197 #' \dontrun{
0198 #' sparkR.session()
0199 #' df <- read.df(path, "parquet")
0200 #' createOrReplaceTempView(df, "table")
0201 #' dropTempView("table")
0202 #' }
0203 #' @note since 2.0.0
dropTempView <- function(viewName) {
  sparkSession <- getSparkSession()
  # is.character() is the robust check: class(viewName) may have length > 1,
  # which makes the old class(...) != "character" comparison error in if()
  # on R >= 4.2, and it also rejected character subclasses.
  if (!is.character(viewName)) {
    stop("viewName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # Returns TRUE if the view existed and was dropped, FALSE otherwise.
  callJMethod(catalog, "dropTempView", viewName)
}
0212
0213 #' Tables
0214 #'
0215 #' Returns a SparkDataFrame containing names of tables in the given database.
0216 #'
0217 #' @param databaseName (optional) name of the database
0218 #' @return a SparkDataFrame
0219 #' @rdname tables
0220 #' @seealso \link{listTables}
0221 #' @examples
0222 #'\dontrun{
0223 #' sparkR.session()
0224 #' tables("hive")
0225 #' }
0226 #' @name tables
0227 #' @note tables since 1.4.0
tables <- function(databaseName = NULL) {
  # Delegate to listTables(), then rename its "name" column to "tableName"
  # to preserve the historical output schema of tables().
  result <- listTables(databaseName)
  withColumnRenamed(result, "name", "tableName")
}
0232
0233 #' Table Names
0234 #'
0235 #' Returns the names of tables in the given database as an array.
0236 #'
0237 #' @param databaseName (optional) name of the database
0238 #' @return a list of table names
0239 #' @rdname tableNames
0240 #' @examples
0241 #'\dontrun{
0242 #' sparkR.session()
0243 #' tableNames("hive")
0244 #' }
0245 #' @name tableNames
0246 #' @note tableNames since 1.4.0
tableNames <- function(databaseName = NULL) {
  # Delegates to the JVM-side helper; databaseName may be NULL, in which
  # case the helper resolves table names for the current database.
  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTableNames",
              getSparkSession(), databaseName)
}
0254
0255 #' Returns the current default database
0256 #'
0257 #' Returns the current default database.
0258 #'
0259 #' @return name of the current default database.
0260 #' @rdname currentDatabase
0261 #' @name currentDatabase
0262 #' @examples
0263 #' \dontrun{
0264 #' sparkR.session()
0265 #' currentDatabase()
0266 #' }
0267 #' @note since 2.2.0
currentDatabase <- function() {
  # Ask the active session's catalog for the current default database name.
  catalog <- callJMethod(getSparkSession(), "catalog")
  callJMethod(catalog, "currentDatabase")
}
0273
0274 #' Sets the current default database
0275 #'
0276 #' Sets the current default database.
0277 #'
0278 #' @param databaseName name of the database
0279 #' @rdname setCurrentDatabase
0280 #' @name setCurrentDatabase
0281 #' @examples
0282 #' \dontrun{
0283 #' sparkR.session()
0284 #' setCurrentDatabase("default")
0285 #' }
0286 #' @note since 2.2.0
setCurrentDatabase <- function(databaseName) {
  sparkSession <- getSparkSession()
  # is.character() is the robust check: class(databaseName) may have
  # length > 1, which makes the old class(...) != "character" comparison
  # error in if() on R >= 4.2, and it also rejected character subclasses.
  if (!is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}
0295
0296 #' Returns a list of databases available
0297 #'
0298 #' Returns a list of databases available.
0299 #'
0300 #' @return a SparkDataFrame of the list of databases.
0301 #' @rdname listDatabases
0302 #' @name listDatabases
0303 #' @examples
0304 #' \dontrun{
0305 #' sparkR.session()
0306 #' listDatabases()
0307 #' }
0308 #' @note since 2.2.0
listDatabases <- function() {
  # Fetch the JVM-side Dataset of databases and convert it to a DataFrame.
  catalog <- callJMethod(getSparkSession(), "catalog")
  jdf <- callJMethod(callJMethod(catalog, "listDatabases"), "toDF")
  dataFrame(jdf)
}
0314
0315 #' Returns a list of tables or views in the specified database
0316 #'
0317 #' Returns a list of tables or views in the specified database.
0318 #' This includes all temporary views.
0319 #'
0320 #' @param databaseName (optional) name of the database
0321 #' @return a SparkDataFrame of the list of tables.
0322 #' @rdname listTables
0323 #' @name listTables
0324 #' @seealso \link{tables}
0325 #' @examples
0326 #' \dontrun{
0327 #' sparkR.session()
0328 #' listTables()
0329 #' listTables("default")
0330 #' }
0331 #' @note since 2.2.0
listTables <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust check: class(databaseName) may have
  # length > 1, which makes the old class(...) != "character" comparison
  # error in if() on R >= 4.2, and it also rejected character subclasses.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # NULL databaseName lists tables in the current database; a name must
  # resolve to an existing database, hence the error-handling wrapper.
  jdst <- if (is.null(databaseName)) {
    callJMethod(catalog, "listTables")
  } else {
    handledCallJMethod(catalog, "listTables", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0345
0346 #' Returns a list of columns for the given table/view in the specified database
0347 #'
0348 #' Returns a list of columns for the given table/view in the specified database.
0349 #'
0350 #' @param tableName the qualified or unqualified name that designates a table/view. If no database
0351 #' identifier is provided, it refers to a table/view in the current database.
0352 #' If \code{databaseName} parameter is specified, this must be an unqualified name.
0353 #' @param databaseName (optional) name of the database
0354 #' @return a SparkDataFrame of the list of column descriptions.
0355 #' @rdname listColumns
0356 #' @name listColumns
0357 #' @examples
0358 #' \dontrun{
0359 #' sparkR.session()
0360 #' listColumns("mytable")
0361 #' }
0362 #' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust check: class(databaseName) may have
  # length > 1, which makes the old class(...) != "character" comparison
  # error in if() on R >= 4.2, and it also rejected character subclasses.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # With a databaseName, tableName must be unqualified; both JVM overloads
  # can fail on a missing table, hence the error-handling wrapper.
  jdst <- if (is.null(databaseName)) {
    handledCallJMethod(catalog, "listColumns", tableName)
  } else {
    handledCallJMethod(catalog, "listColumns", databaseName, tableName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0376
0377 #' Returns a list of functions registered in the specified database
0378 #'
0379 #' Returns a list of functions registered in the specified database.
0380 #' This includes all temporary functions.
0381 #'
0382 #' @param databaseName (optional) name of the database
0383 #' @return a SparkDataFrame of the list of function descriptions.
0384 #' @rdname listFunctions
0385 #' @name listFunctions
0386 #' @examples
0387 #' \dontrun{
0388 #' sparkR.session()
0389 #' listFunctions()
0390 #' }
0391 #' @note since 2.2.0
listFunctions <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  # is.character() is the robust check: class(databaseName) may have
  # length > 1, which makes the old class(...) != "character" comparison
  # error in if() on R >= 4.2, and it also rejected character subclasses.
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # NULL databaseName lists functions in the current database; a name must
  # resolve to an existing database, hence the error-handling wrapper.
  jdst <- if (is.null(databaseName)) {
    callJMethod(catalog, "listFunctions")
  } else {
    handledCallJMethod(catalog, "listFunctions", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
0405
0406 #' Recovers all the partitions in the directory of a table and update the catalog
0407 #'
0408 #' Recovers all the partitions in the directory of a table and update the catalog. The name should
0409 #' reference a partitioned table, and not a view.
0410 #'
0411 #' @param tableName the qualified or unqualified name that designates a table. If no database
0412 #' identifier is provided, it refers to a table in the current database.
0413 #' @rdname recoverPartitions
0414 #' @name recoverPartitions
0415 #' @examples
0416 #' \dontrun{
0417 #' sparkR.session()
0418 #' recoverPartitions("myTable")
0419 #' }
0420 #' @note since 2.2.0
recoverPartitions <- function(tableName) {
  # Scan the table's directory and register any discovered partitions.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
}
0426
0427 #' Invalidates and refreshes all the cached data and metadata of the given table
0428 #'
0429 #' Invalidates and refreshes all the cached data and metadata of the given table. For performance
0430 #' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
0431 #' a table, such as the location of blocks. When those change outside of Spark SQL, users should
0432 #' call this function to invalidate the cache.
0433 #'
0434 #' If this table is cached as an InMemoryRelation, drop the original cached version and make the
0435 #' new version cached lazily.
0436 #'
0437 #' @param tableName the qualified or unqualified name that designates a table. If no database
0438 #' identifier is provided, it refers to a table in the current database.
0439 #' @rdname refreshTable
0440 #' @name refreshTable
0441 #' @examples
0442 #' \dontrun{
0443 #' sparkR.session()
0444 #' refreshTable("myTable")
0445 #' }
0446 #' @note since 2.2.0
refreshTable <- function(tableName) {
  # Invalidate cached data/metadata for the named table via the catalog.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(catalog, "refreshTable", tableName))
}
0452
0453 #' Invalidates and refreshes all the cached data and metadata for SparkDataFrame containing path
0454 #'
0455 #' Invalidates and refreshes all the cached data (and the associated metadata) for any
0456 #' SparkDataFrame that contains the given data source path. Path matching is by prefix, i.e. "/"
0457 #' would invalidate everything that is cached.
0458 #'
0459 #' @param path the path of the data source.
0460 #' @rdname refreshByPath
0461 #' @name refreshByPath
0462 #' @examples
0463 #' \dontrun{
0464 #' sparkR.session()
0465 #' refreshByPath("/path")
0466 #' }
0467 #' @note since 2.2.0
refreshByPath <- function(path) {
  # Invalidate cached data/metadata for any DataFrame rooted at this path.
  catalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(catalog, "refreshByPath", path))
}