#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating Spark SQL Hive integration.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/hive.py
"""
from __future__ import print_function

# $example on:spark_hive$
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$


if __name__ == "__main__":
    # $example on:spark_hive$
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = abspath('spark-warehouse')

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()
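
    # Editor's aside (not in the original example): the effective warehouse
    # location can be read back from the session configuration as a sanity
    # check; spark.conf.get is standard SparkSession API.
    print("Warehouse dir: %s" % spark.conf.get("spark.sql.warehouse.dir"))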

    # spark is an existing SparkSession
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
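
    # An aside beyond the original example: the same Hive table can also be
    # reached through the DataFrame API; spark.table returns it as a DataFrame
    # (srcDF is a name introduced here purely for illustration).
    srcDF = spark.table("src")
    srcDF.printSchema()
    # root
    #  |-- key: integer (nullable = true)
    #  |-- value: string (nullable = true)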

    # Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
    # +---+-------+
    # |key|  value|
    # +---+-------+
    # |238|val_238|
    # | 86| val_86|
    # |311|val_311|
    # ...

    # Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
    # +--------+
    # |count(1)|
    # +--------+
    # |     500|
    # +--------+
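
    # The same aggregation via the DataFrame API (a sketch beyond the original
    # example); DataFrame.count() returns the row count as a plain int.
    print("Row count via DataFrame API: %d" % spark.table("src").count())
    # Row count via DataFrame API: 500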

    # The results of SQL queries are themselves DataFrames and support all normal functions.
    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    # The items in DataFrames are of type Row, which allows you to access each
    # column by field name or by ordinal.
    stringsRDD = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
    for record in stringsRDD.collect():
        print(record)
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # ...
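
    # The same loop without the RDD detour (an aside, not in the original
    # example): DataFrame.collect() returns a list of Rows directly.
    for row in sqlDF.collect():
        print("Key: %d, Value: %s" % (row.key, row.value))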

    # You can also use DataFrames to create temporary views within a SparkSession.
    Record = Row("key", "value")
    recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
    recordsDF.createOrReplaceTempView("records")

    # Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    # +---+------+---+------+
    # |key| value|key| value|
    # +---+------+---+------+
    # |  2| val_2|  2| val_2|
    # |  4| val_4|  4| val_4|
    # |  5| val_5|  5| val_5|
    # ...
    # $example off:spark_hive$
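
    # A hedged sketch beyond the original example: DataFrames can also be
    # persisted as Hive tables; DataFrameWriter.saveAsTable is standard API,
    # and "records_copy" is a hypothetical table name chosen here.
    recordsDF.write.mode("overwrite").saveAsTable("records_copy")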

    spark.stop()