#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating Spark SQL Hive integration.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/hive.py
"""
from __future__ import print_function

# $example on:spark_hive$
from os.path import join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$


if __name__ == "__main__":
    # $example on:spark_hive$
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = abspath('spark-warehouse')

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    # spark is an existing SparkSession
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    # Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
    # +---+-------+
    # |key|  value|
    # +---+-------+
    # |238|val_238|
    # | 86| val_86|
    # |311|val_311|
    # ...

    # Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
    # +--------+
    # |count(1)|
    # +--------+
    # |     500|
    # +--------+

    # The results of SQL queries are themselves DataFrames and support all normal functions.
    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    # The items in DataFrames are of type Row, which allows you to access each column by ordinal.
    stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
    for record in stringsDS.collect():
        print(record)
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # ...

    # You can also use DataFrames to create temporary views within a SparkSession.
    Record = Row("key", "value")
    recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
    recordsDF.createOrReplaceTempView("records")

    # Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    # +---+------+---+------+
    # |key| value|key| value|
    # +---+------+---+------+
    # |  2| val_2|  2| val_2|
    # |  4| val_4|  4| val_4|
    # |  5| val_5|  5| val_5|
    # ...
    # $example off:spark_hive$

    spark.stop()