Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 A simple example demonstrating basic Spark SQL features.
0020 Run with:
0021   ./bin/spark-submit examples/src/main/python/sql/basic.py
0022 """
0023 from __future__ import print_function
0024 
0025 # $example on:init_session$
0026 from pyspark.sql import SparkSession
0027 # $example off:init_session$
0028 
0029 # $example on:schema_inferring$
0030 from pyspark.sql import Row
0031 # $example off:schema_inferring$
0032 
0033 # $example on:programmatic_schema$
0034 # Import data types
0035 from pyspark.sql.types import *
0036 # $example off:programmatic_schema$
0037 
0038 
def basic_df_example(spark):
    """Demonstrate DataFrame creation, untyped operations, SQL queries and temp views.

    :param spark: an existing SparkSession, used to read the sample JSON file
        and to run the SQL statements below.
    """
    # $example on:create_df$
    # Build a DataFrame from the sample JSON file with the existing SparkSession
    people_df = spark.read.json("examples/src/main/resources/people.json")
    # Print the DataFrame's rows to stdout
    people_df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:create_df$

    # $example on:untyped_ops$
    # spark and people_df carry over from the previous example
    # Show the schema as a tree
    people_df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Project just the "name" column
    names_only = people_df.select("name")
    names_only.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Project every person's name together with their age plus one
    name_col = people_df['name']
    age_plus_one = people_df['age'] + 1
    people_df.select(name_col, age_plus_one).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Keep only the people whose age is above 21
    is_over_21 = people_df['age'] > 21
    people_df.filter(is_over_21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Number of people per age value
    age_counts = people_df.groupBy("age").count()
    age_counts.show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+
    # $example off:untyped_ops$

    # $example on:run_sql$
    # Expose the DataFrame to SQL as a temporary view
    people_df.createOrReplaceTempView("people")

    all_people = spark.sql("SELECT * FROM people")
    all_people.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:run_sql$

    # $example on:global_temp_view$
    # Expose the DataFrame as a global temporary view
    people_df.createGlobalTempView("people")

    # Global temporary views belong to the system preserved database `global_temp`
    global_query = "SELECT * FROM global_temp.people"
    spark.sql(global_query).show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # The same global temporary view can be queried from another session
    other_session = spark.newSession()
    other_session.sql(global_query).show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:global_temp_view$
0140 
0141 
def schema_inference_example(spark):
    """Create a DataFrame from an RDD of Rows, letting Spark infer the schema.

    The resulting DataFrame is registered as the "people" temporary view,
    queried with SQL, and the matching names are printed.

    :param spark: an existing SparkSession.
    """
    # $example on:schema_inferring$
    context = spark.sparkContext

    def to_row(rec):
        # First field is the name, second field is the age parsed as an integer.
        return Row(name=rec[0], age=int(rec[1]))

    # Read the text file, split each line on commas, and turn every record into a Row.
    raw_lines = context.textFile("examples/src/main/resources/people.txt")
    records = raw_lines.map(lambda line: line.split(","))
    person_rows = records.map(to_row)

    # Build the DataFrame with an inferred schema and register it as a table.
    inferred_df = spark.createDataFrame(person_rows)
    inferred_df.createOrReplaceTempView("people")

    # Registered DataFrames can be queried with SQL.
    teen_df = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # A SQL query returns a DataFrame.
    # Its rdd attribute gives the content as an :class:`pyspark.RDD` of :class:`Row`.
    labelled_names = teen_df.rdd.map(lambda row: "Name: " + row.name)
    for label in labelled_names.collect():
        print(label)
    # Name: Justin
    # $example off:schema_inferring$
0165 
0166 
def programmatic_schema_example(spark):
    """Create a DataFrame by applying an explicitly built schema to an RDD of tuples.

    The resulting DataFrame is registered as the "people" temporary view and
    its "name" column is shown via a SQL query.

    :param spark: an existing SparkSession.
    """
    # $example on:programmatic_schema$
    context = spark.sparkContext

    # Read the text file and split every line on commas.
    raw_lines = context.textFile("examples/src/main/resources/people.txt")
    records = raw_lines.map(lambda line: line.split(","))
    # Turn each record into a (name, age) tuple, stripping whitespace around the age.
    person_tuples = records.map(lambda rec: (rec[0], rec[1].strip()))

    # The column names are encoded in a whitespace-separated string.
    column_names = "name age".split()

    # Declare every column as a StringType field with nullable=True.
    schema = StructType([StructField(column, StringType(), True) for column in column_names])

    # Attach the schema to the RDD.
    typed_df = spark.createDataFrame(person_tuples, schema)

    # Register the DataFrame as a temporary view.
    typed_df.createOrReplaceTempView("people")

    # Registered DataFrames can be queried with SQL.
    name_df = spark.sql("SELECT name FROM people")

    name_df.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+
    # $example off:programmatic_schema$
0201 
if __name__ == "__main__":
    # Script entry point: create the SparkSession, run each example in turn,
    # and always stop the session afterwards.
    # $example on:init_session$
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    # $example off:init_session$

    # try/finally guarantees the session is stopped even when one of the
    # examples raises (e.g. a missing input file); the exception still propagates.
    try:
        basic_df_example(spark)
        schema_inference_example(spark)
        programmatic_schema_example(spark)
    finally:
        spark.stop()