Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # To run this example use
0019 # ./bin/spark-submit examples/src/main/r/RSparkSQLExample.R
0020 
0021 library(SparkR)
0022 
0023 # $example on:init_session$
0024 sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
0025 # $example off:init_session$
0026 
0027 
0028 # $example on:create_df$
0029 df <- read.json("examples/src/main/resources/people.json")
0030 
0031 # Displays the content of the DataFrame
0032 head(df)
0033 ##   age    name
0034 ## 1  NA Michael
0035 ## 2  30    Andy
0036 ## 3  19  Justin
0037 
0038 # Another method to print the first few rows and optionally truncate the printing of long values
0039 showDF(df)
0040 ## +----+-------+
0041 ## | age|   name|
0042 ## +----+-------+
0043 ## |null|Michael|
0044 ## |  30|   Andy|
0045 ## |  19| Justin|
0046 ## +----+-------+
0047 ## $example off:create_df$
0048 
0049 
0050 # $example on:untyped_ops$
0051 # Create the DataFrame
0052 df <- read.json("examples/src/main/resources/people.json")
0053 
0054 # Show the content of the DataFrame
0055 head(df)
0056 ##   age    name
0057 ## 1  NA Michael
0058 ## 2  30    Andy
0059 ## 3  19  Justin
0060 
0061 
0062 # Print the schema in a tree format
0063 printSchema(df)
0064 ## root
0065 ## |-- age: long (nullable = true)
0066 ## |-- name: string (nullable = true)
0067 
0068 # Select only the "name" column
0069 head(select(df, "name"))
0070 ##      name
0071 ## 1 Michael
0072 ## 2    Andy
0073 ## 3  Justin
0074 
0075 # Select everybody, but increment the age by 1
0076 head(select(df, df$name, df$age + 1))
0077 ##      name (age + 1.0)
0078 ## 1 Michael          NA
0079 ## 2    Andy          31
0080 ## 3  Justin          20
0081 
0082 # Select people older than 21
0083 head(where(df, df$age > 21))
0084 ##   age name
0085 ## 1  30 Andy
0086 
0087 # Count people by age
0088 head(count(groupBy(df, "age")))
0089 ##   age count
0090 ## 1  19     1
0091 ## 2  NA     1
0092 ## 3  30     1
0093 # $example off:untyped_ops$
0094 
0095 
0096 # Register this DataFrame as a table.
0097 createOrReplaceTempView(df, "table")
0098 # $example on:run_sql$
0099 df <- sql("SELECT * FROM table")
0100 # $example off:run_sql$
0101 
0102 # Ignore corrupt files
0103 # $example on:ignore_corrupt_files$
0104 # enable ignore corrupt files
0105 sql("set spark.sql.files.ignoreCorruptFiles=true")
0106 # dir1/file3.json is corrupt from parquet's view
0107 testCorruptDF <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
0108 head(testCorruptDF)
0109 #            file
0110 # 1 file1.parquet
0111 # 2 file2.parquet
0112 # $example off:ignore_corrupt_files$
0113 
0114 # $example on:recursive_file_lookup$
0115 recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
0116 head(recursiveLoadedDF)
0117 #            file
0118 # 1 file1.parquet
0119 # 2 file2.parquet
0120 # $example off:recursive_file_lookup$
0121 sql("set spark.sql.files.ignoreCorruptFiles=false")
0122 
0123 # $example on:generic_load_save_functions$
0124 df <- read.df("examples/src/main/resources/users.parquet")
0125 write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
0126 # $example off:generic_load_save_functions$
0127 
0128 
0129 # $example on:manual_load_options$
0130 df <- read.df("examples/src/main/resources/people.json", "json")
0131 namesAndAges <- select(df, "name", "age")
0132 write.df(namesAndAges, "namesAndAges.parquet", "parquet")
0133 # $example off:manual_load_options$
0134 
0135 
0136 # $example on:manual_load_options_csv$
0137 df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
0138 namesAndAges <- select(df, "name", "age")
0139 # $example off:manual_load_options_csv$
0140 
0141 # $example on:load_with_path_glob_filter$
0142 df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
0143 #            file
0144 # 1 file1.parquet
0145 # $example off:load_with_path_glob_filter$
0146 
0147 # $example on:manual_save_options_orc$
0148 df <- read.df("examples/src/main/resources/users.orc", "orc")
0149 write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
0150 # $example off:manual_save_options_orc$
0151 
0152 # $example on:direct_sql$
0153 df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
0154 # $example off:direct_sql$
0155 
0156 
0157 # $example on:basic_parquet_example$
0158 df <- read.df("examples/src/main/resources/people.json", "json")
0159 
0160 # SparkDataFrame can be saved as Parquet files, maintaining the schema information.
0161 write.parquet(df, "people.parquet")
0162 
0163 # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
0164 # The result of loading a parquet file is also a DataFrame.
0165 parquetFile <- read.parquet("people.parquet")
0166 
0167 # Parquet files can also be used to create a temporary view and then used in SQL statements.
0168 createOrReplaceTempView(parquetFile, "parquetFile")
0169 teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
0170 head(teenagers)
0171 ##     name
0172 ## 1 Justin
0173 
0174 # We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
0175 schema <- structType(structField("name", "string"))
0176 teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
0177 for (teenName in collect(teenNames)$name) {
0178   cat(teenName, "\n")
0179 }
0180 ## Name: Michael
0181 ## Name: Andy
0182 ## Name: Justin
0183 # $example off:basic_parquet_example$
0184 
0185 
0186 # $example on:schema_merging$
0187 df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
0188 df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
0189 
0190 # Create a simple DataFrame, stored into a partition directory
0191 write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
0192 
0193 # Create another DataFrame in a new partition directory,
0194 # adding a new column and dropping an existing column
0195 write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
0196 
0197 # Read the partitioned table
0198 df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
0199 printSchema(df3)
0200 # The final schema consists of all 3 columns in the Parquet files together
0201 # with the partitioning column appeared in the partition directory paths
0202 ## root
0203 ##  |-- single: double (nullable = true)
0204 ##  |-- double: double (nullable = true)
0205 ##  |-- triple: double (nullable = true)
0206 ##  |-- key: integer (nullable = true)
0207 # $example off:schema_merging$
0208 
0209 
0210 # $example on:json_dataset$
0211 # A JSON dataset is pointed to by path.
0212 # The path can be either a single text file or a directory storing text files.
0213 path <- "examples/src/main/resources/people.json"
0214 # Create a DataFrame from the file(s) pointed to by path
0215 people <- read.json(path)
0216 
0217 # The inferred schema can be visualized using the printSchema() method.
0218 printSchema(people)
0219 ## root
0220 ##  |-- age: long (nullable = true)
0221 ##  |-- name: string (nullable = true)
0222 
0223 # Register this DataFrame as a table.
0224 createOrReplaceTempView(people, "people")
0225 
0226 # SQL statements can be run by using the sql methods.
0227 teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
0228 head(teenagers)
0229 ##     name
0230 ## 1 Justin
0231 # $example off:json_dataset$
0232 
0233 
0234 # $example on:spark_hive$
0235 # enableHiveSupport defaults to TRUE
0236 sparkR.session(enableHiveSupport = TRUE)
0237 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
0238 sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
0239 
0240 # Queries can be expressed in HiveQL.
0241 results <- collect(sql("FROM src SELECT key, value"))
0242 # $example off:spark_hive$
0243 
0244 
0245 # $example on:jdbc_dataset$
0246 # Loading data from a JDBC source
0247 df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
0248 
0249 # Saving data to a JDBC source
0250 write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
0251 # $example off:jdbc_dataset$
0252 
0253 # Stop the SparkSession now
0254 sparkR.session.stop()