#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating basic Spark SQL features.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/basic.py
"""
from __future__ import print_function

# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$

# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$

# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$


def basic_df_example(spark):
    """Demonstrate DataFrame creation, untyped transformations, SQL
    queries, and temporary / global temporary views."""
    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("examples/src/main/resources/people.json")
    # Displays the content of the DataFrame to stdout:
    #   age  name    -> null Michael | 30 Andy | 19 Justin
    df.show()
    # $example off:create_df$

    # $example on:untyped_ops$
    # spark, df are from the previous example.
    # Print the schema in a tree format:
    #   age: long (nullable), name: string (nullable)
    df.printSchema()

    # Select only the "name" column.
    df.select("name").show()

    # Select everybody, but increment the age by 1
    # (the derived column is labelled "(age + 1)"; null stays null).
    df.select(df['name'], df['age'] + 1).show()

    # Select people older than 21 — only Andy (30) qualifies.
    df.filter(df['age'] > 21).show()

    # Count people by age — each distinct age appears once here.
    df.groupBy("age").count().show()
    # $example off:untyped_ops$

    # $example on:run_sql$
    # Register the DataFrame as a SQL temporary view, then query it.
    df.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people").show()
    # $example off:run_sql$

    # $example on:global_temp_view$
    # Register the DataFrame as a global temporary view.
    df.createGlobalTempView("people")

    # Global temporary views live in the system-preserved database
    # `global_temp` ...
    spark.sql("SELECT * FROM global_temp.people").show()

    # ... and are visible across sessions.
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # $example off:global_temp_view$


def schema_inference_example(spark):
    """Infer a DataFrame schema by reflection over an RDD of Rows."""
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each comma-separated line to a Row.
    raw_lines = sc.textFile("examples/src/main/resources/people.txt")
    rows = raw_lines \
        .map(lambda line: line.split(",")) \
        .map(lambda rec: Row(name=rec[0], age=int(rec[1])))

    # Infer the schema, and register the DataFrame as a table.
    people_df = spark.createDataFrame(rows)
    people_df.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are DataFrame objects; `rdd` exposes the
    # content as a pyspark.RDD of Row.  Prints "Name: Justin".
    for greeting in teenagers.rdd.map(lambda row: "Name: " + row.name).collect():
        print(greeting)
    # $example off:schema_inferring$


def programmatic_schema_example(spark):
    """Build a schema programmatically and apply it to an RDD of tuples."""
    # $example on:programmatic_schema$
    sc = spark.sparkContext

    # Load a text file and convert each comma-separated line to a tuple.
    raw_lines = sc.textFile("examples/src/main/resources/people.txt")
    tuples = raw_lines \
        .map(lambda line: line.split(",")) \
        .map(lambda rec: (rec[0], rec[1].strip()))

    # The schema is encoded in a whitespace-separated string of column names;
    # every column is a nullable string.
    schemaString = "name age"
    schema = StructType(
        [StructField(col_name, StringType(), True) for col_name in schemaString.split()])

    # Apply the schema to the RDD and register the result as a view.
    people_df = spark.createDataFrame(tuples, schema)
    people_df.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    spark.sql("SELECT name FROM people").show()
    # $example off:programmatic_schema$


if __name__ == "__main__":
    # $example on:init_session$
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    # $example off:init_session$

    basic_df_example(spark)
    schema_inference_example(spark)
    programmatic_schema_example(spark)

    spark.stop()