Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 A simple example demonstrating Spark SQL data sources.
0020 Run with:
0021   ./bin/spark-submit examples/src/main/python/sql/datasource.py
0022 """
0023 from __future__ import print_function
0024 
0025 from pyspark.sql import SparkSession
0026 # $example on:schema_merging$
0027 from pyspark.sql import Row
0028 # $example off:schema_merging$
0029 
0030 
def generic_file_source_options_example(spark):
    """Show three generic file-source read options on the ``dir1`` resources.

    The steps, in order:

    1. Switch ``spark.sql.files.ignoreCorruptFiles`` on and read two
       directories as Parquet.
    2. Load ``dir1`` with the ``recursiveFileLookup`` option, then switch
       ``spark.sql.files.ignoreCorruptFiles`` back to ``false``.
    3. Load ``dir1`` with ``pathGlobFilter="*.parquet"``.

    Every resulting DataFrame is printed with ``show()``.

    :param spark: the SparkSession used to run the SQL and the reads.
    """
    # $example on:ignore_corrupt_files$
    # turn on skipping of corrupt files
    spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
    # dir1/file3.json is corrupt from parquet's view
    corrupt_tolerant_df = spark.read.parquet(
        "examples/src/main/resources/dir1/",
        "examples/src/main/resources/dir1/dir2/")
    corrupt_tolerant_df.show()
    # +-------------+
    # |         file|
    # +-------------+
    # |file1.parquet|
    # |file2.parquet|
    # +-------------+
    # $example off:ignore_corrupt_files$

    # $example on:recursive_file_lookup$
    recursive_df = (spark.read.format("parquet")
                    .option("recursiveFileLookup", "true")
                    .load("examples/src/main/resources/dir1"))
    recursive_df.show()
    # +-------------+
    # |         file|
    # +-------------+
    # |file1.parquet|
    # |file2.parquet|
    # +-------------+
    # $example off:recursive_file_lookup$
    # Switch the corrupt-file setting back off before the next read.
    spark.sql("set spark.sql.files.ignoreCorruptFiles=false")

    # $example on:load_with_path_glob_filter$
    glob_filtered_df = spark.read.load(
        "examples/src/main/resources/dir1",
        format="parquet",
        pathGlobFilter="*.parquet")
    glob_filtered_df.show()
    # +-------------+
    # |         file|
    # +-------------+
    # |file1.parquet|
    # +-------------+
    # $example off:load_with_path_glob_filter$
0071 
0072 
def basic_datasource_example(spark):
    """Walk through the generic load/save API for several file formats.

    Reads the bundled users/people resources (Parquet, JSON, CSV, ORC),
    writes several outputs to relative paths, saves two tables
    (``people_partitioned_bucketed`` and ``people_bucketed``) and drops both
    tables again before returning.

    NOTE(review): the ``sortBy("age")`` write operates on the DataFrame read
    from ``users.orc`` -- confirm that file really has an ``age`` column.
    NOTE(review): the CSV read uses ``sep=":"`` -- confirm this matches the
    delimiter actually used in ``people.csv``.

    :param spark: the SparkSession used for all reads, writes and SQL.
    """
    # $example on:generic_load_save_functions$
    users_df = spark.read.load("examples/src/main/resources/users.parquet")
    names_and_colors = users_df.select("name", "favorite_color")
    names_and_colors.write.save("namesAndFavColors.parquet")
    # $example off:generic_load_save_functions$

    # $example on:write_partitioning$
    by_color_writer = users_df.write.partitionBy("favorite_color")
    by_color_writer.format("parquet").save("namesPartByColor.parquet")
    # $example off:write_partitioning$

    # $example on:write_partition_and_bucket$
    users_df = spark.read.parquet("examples/src/main/resources/users.parquet")
    partitioned_bucketed_writer = (
        users_df.write.partitionBy("favorite_color").bucketBy(42, "name"))
    partitioned_bucketed_writer.saveAsTable("people_partitioned_bucketed")
    # $example off:write_partition_and_bucket$

    # $example on:manual_load_options$
    people_json_df = spark.read.load(
        "examples/src/main/resources/people.json", format="json")
    names_and_ages = people_json_df.select("name", "age")
    names_and_ages.write.save("namesAndAges.parquet", format="parquet")
    # $example off:manual_load_options$

    # $example on:manual_load_options_csv$
    people_csv_df = spark.read.load(
        "examples/src/main/resources/people.csv",
        format="csv",
        sep=":",
        inferSchema="true",
        header="true")
    # $example off:manual_load_options_csv$

    # $example on:manual_save_options_orc$
    users_orc_df = spark.read.orc("examples/src/main/resources/users.orc")
    orc_writer = (users_orc_df.write.format("orc")
                  .option("orc.bloom.filter.columns", "favorite_color")
                  .option("orc.dictionary.key.threshold", "1.0")
                  .option("orc.column.encoding.direct", "name"))
    orc_writer.save("users_with_options.orc")
    # $example off:manual_save_options_orc$

    # $example on:write_sorting_and_bucketing$
    sorted_bucket_writer = users_orc_df.write.bucketBy(42, "name").sortBy("age")
    sorted_bucket_writer.saveAsTable("people_bucketed")
    # $example off:write_sorting_and_bucketing$

    # $example on:direct_sql$
    direct_sql_df = spark.sql(
        "SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    # $example off:direct_sql$

    # Remove the tables created above.
    for table_cleanup in ("DROP TABLE IF EXISTS people_bucketed",
                          "DROP TABLE IF EXISTS people_partitioned_bucketed"):
        spark.sql(table_cleanup)
0121 
0122 
def parquet_example(spark):
    """Round-trip the people JSON resource through Parquet and query it.

    Reads ``people.json``, writes it to ``people.parquet``, reads that back,
    registers the result as the temp view ``parquetFile`` and prints the
    names of rows whose ``age`` lies between 13 and 19 inclusive.

    :param spark: the SparkSession used for the reads, the write and the SQL.
    """
    # $example on:basic_parquet_example$
    people_df = spark.read.json("examples/src/main/resources/people.json")

    # Writing a DataFrame as Parquet keeps its schema information.
    people_df.write.parquet("people.parquet")

    # Load the Parquet output written above.
    # Parquet is self-describing, so the schema comes back unchanged,
    # and what you get from the load is again a DataFrame.
    parquet_df = spark.read.parquet("people.parquet")

    # Register the Parquet-backed DataFrame as a temporary view to query it with SQL.
    parquet_df.createOrReplaceTempView("parquetFile")
    teenagers_df = spark.sql(
        "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
    teenagers_df.show()
    # +------+
    # |  name|
    # +------+
    # |Justin|
    # +------+
    # $example off:basic_parquet_example$
0145 
0146 
def parquet_schema_merging_example(spark):
    """Write two Parquet partitions with different columns and merge them.

    Writes rows for 1..5 (columns ``single``, ``double``) under
    ``data/test_table/key=1`` and rows for 6..10 (columns ``single``,
    ``triple``) under ``data/test_table/key=2``, then reads
    ``data/test_table`` with ``mergeSchema`` enabled and prints the schema.

    :param spark: the SparkSession; its ``sparkContext`` builds the RDDs.
    """
    # $example on:schema_merging$
    # spark is from the previous example.
    # Build a small DataFrame and store it in its own partition directory
    sc = spark.sparkContext

    squares_rdd = sc.parallelize(range(1, 6)).map(
        lambda n: Row(single=n, double=n ** 2))
    squares_df = spark.createDataFrame(squares_rdd)
    squares_df.write.parquet("data/test_table/key=1")

    # Build a second DataFrame in a new partition directory,
    # which adds one column and leaves out an existing one
    cubes_rdd = sc.parallelize(range(6, 11)).map(
        lambda n: Row(single=n, triple=n ** 3))
    cubes_df = spark.createDataFrame(cubes_rdd)
    cubes_df.write.parquet("data/test_table/key=2")

    # Read back the whole partitioned table
    merged_reader = spark.read.option("mergeSchema", "true")
    merged_df = merged_reader.parquet("data/test_table")
    merged_df.printSchema()

    # The final schema consists of all 3 columns in the Parquet files together
    # with the partitioning column that appears in the partition directory paths.
    # root
    #  |-- double: long (nullable = true)
    #  |-- single: long (nullable = true)
    #  |-- triple: long (nullable = true)
    #  |-- key: integer (nullable = true)
    # $example off:schema_merging$
0175 
0176 
def json_dataset_example(spark):
    """Load JSON from a file path and from an RDD of strings.

    Reads ``people.json``, prints its schema, registers it as the temp view
    ``people`` and prints the names of rows with ``age`` between 13 and 19.
    Then builds a second DataFrame from a one-element RDD holding a JSON
    string and prints it.

    :param spark: the SparkSession; its ``sparkContext`` builds the RDD.
    """
    # $example on:json_dataset$
    # spark is from the previous example.
    sc = spark.sparkContext

    # A JSON dataset is identified by its path.
    # That path may name one text file or a directory of text files
    people_json_path = "examples/src/main/resources/people.json"
    people_df = spark.read.json(people_json_path)

    # printSchema() displays the schema that was inferred
    people_df.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)

    # Register the DataFrame as a temporary view
    people_df.createOrReplaceTempView("people")

    # Run SQL statements through the sql method provided by spark
    teenager_names_df = spark.sql(
        "SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenager_names_df.show()
    # +------+
    # |  name|
    # +------+
    # |Justin|
    # +------+

    # A DataFrame can also be built from a JSON dataset held in
    # an RDD[String] with one JSON object per string
    json_lines = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
    other_people_rdd = sc.parallelize(json_lines)
    other_people_df = spark.read.json(other_people_rdd)
    other_people_df.show()
    # +---------------+----+
    # |        address|name|
    # +---------------+----+
    # |[Columbus,Ohio]| Yin|
    # +---------------+----+
    # $example off:json_dataset$
0217 
0218 
def jdbc_dataset_example(spark):
    """Show the two ways of reading from and writing to a JDBC source.

    Every call targets the literal URL ``jdbc:postgresql:dbserver``, table
    ``schema.tablename`` and the user/password pair ``username``/``password``.
    These look like placeholders; presumably a reachable database and a JDBC
    driver are needed for this function to succeed -- confirm before running.

    :param spark: the SparkSession used for the reads.
    """
    # $example on:jdbc_dataset$
    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Reading from a JDBC source
    jdbc_df = (spark.read
               .format("jdbc")
               .option("url", "jdbc:postgresql:dbserver")
               .option("dbtable", "schema.tablename")
               .option("user", "username")
               .option("password", "password")
               .load())

    jdbc_df2 = spark.read.jdbc(
        "jdbc:postgresql:dbserver", "schema.tablename",
        properties={"user": "username", "password": "password"})

    # Declaring the DataFrame column data types at read time
    jdbc_df3 = (spark.read
                .format("jdbc")
                .option("url", "jdbc:postgresql:dbserver")
                .option("dbtable", "schema.tablename")
                .option("user", "username")
                .option("password", "password")
                .option("customSchema", "id DECIMAL(38, 0), name STRING")
                .load())

    # Writing to a JDBC source
    (jdbc_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")
        .option("dbtable", "schema.tablename")
        .option("user", "username")
        .option("password", "password")
        .save())

    jdbc_df2.write.jdbc(
        "jdbc:postgresql:dbserver", "schema.tablename",
        properties={"user": "username", "password": "password"})

    # Declaring the create-table column data types at write time
    typed_writer = jdbc_df.write.option(
        "createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
    typed_writer.jdbc(
        "jdbc:postgresql:dbserver", "schema.tablename",
        properties={"user": "username", "password": "password"})
    # $example off:jdbc_dataset$
0264 
0265 
if __name__ == "__main__":
    # Obtain the session that every example function below receives.
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL data source example") \
        .getOrCreate()

    try:
        # Run the examples in the same order as before; an exception raised
        # by any of them still propagates after the cleanup below.
        basic_datasource_example(spark)
        generic_file_source_options_example(spark)
        parquet_example(spark)
        parquet_schema_merging_example(spark)
        json_dataset_example(spark)
        jdbc_dataset_example(spark)
    finally:
        # Stop the session even when an example fails, so that it is not
        # left running after an error.
        spark.stop()