Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # For this example, we shall use the "flights" dataset
0019 # The dataset consists of every flight departing Houston in 2011.
0020 # The data set is made up of 227,496 rows x 14 columns.
0021 
0022 # To run this example use
0023 # ./bin/spark-submit examples/src/main/r/data-manipulation.R <path_to_csv>
0024 
0025 # Load SparkR library into your R session
0026 library(SparkR)
0027 
0028 args <- commandArgs(trailing = TRUE)
0029 
0030 if (length(args) != 1) {
0031   print("Usage: data-manipulation.R <path-to-flights.csv>")
0032   print("The data can be downloaded from: http://s3-us-west-2.amazonaws.com/sparkr-data/flights.csv")
0033   q("no")
0034 }
0035 
0036 ## Initialize SparkSession
0037 sparkR.session(appName = "SparkR-data-manipulation-example")
0038 
0039 flightsCsvPath <- args[[1]]
0040 
0041 # Create a local R dataframe
0042 flights_df <- read.csv(flightsCsvPath, header = TRUE)
0043 flights_df$date <- as.Date(flights_df$date)
0044 
0045 ## Filter flights whose destination is San Francisco and write to a local data frame
0046 SFO_df <- flights_df[flights_df$dest == "SFO", ]
0047 
0048 # Convert the local data frame into a SparkDataFrame
0049 SFO_DF <- createDataFrame(SFO_df)
0050 
0051 #  Directly create a SparkDataFrame from the source data
0052 flightsDF <- read.df(flightsCsvPath, source = "csv", header = "true")
0053 
0054 # Print the schema of this SparkDataFrame
0055 printSchema(flightsDF)
0056 
0057 # Cache the SparkDataFrame
0058 cache(flightsDF)
0059 
0060 # Print the first 6 rows of the SparkDataFrame
0061 showDF(flightsDF, numRows = 6) ## Or
0062 head(flightsDF)
0063 
0064 # Show the column names in the SparkDataFrame
0065 columns(flightsDF)
0066 
0067 # Show the number of rows in the SparkDataFrame
0068 count(flightsDF)
0069 
0070 # Select specific columns
0071 destDF <- select(flightsDF, "dest", "cancelled")
0072 
0073 # Using SQL to select columns of data
0074 # First, register the flights SparkDataFrame as a table
0075 createOrReplaceTempView(flightsDF, "flightsTable")
0076 destDF <- sql("SELECT dest, cancelled FROM flightsTable")
0077 
0078 # Use collect to create a local R data frame
0079 local_df <- collect(destDF)
0080 
0081 # Print the newly created local data frame
0082 head(local_df)
0083 
0084 # Filter flights whose destination is JFK
0085 jfkDF <- filter(flightsDF, "dest = \"JFK\"") ##OR
0086 jfkDF <- filter(flightsDF, flightsDF$dest == "JFK")
0087 
0088 # If the magrittr library is available, we can use it to
0089 # chain data frame operations
0090 if("magrittr" %in% rownames(installed.packages())) {
0091   library(magrittr)
0092 
0093   # Group the flights by date and then find the average daily delay
0094   # Write the result into a SparkDataFrame
0095   groupBy(flightsDF, flightsDF$date) %>%
0096     summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF
0097 
0098   # Print the computed SparkDataFrame
0099   head(dailyDelayDF)
0100 }
0101 
0102 # Stop the SparkSession now
0103 sparkR.session.stop()