#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of builtin Avro functions.
"""


from pyspark import since, SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar
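
# Note: the functions below require the external ``spark-avro`` module to be
# on the classpath at runtime; it is not bundled with the core Spark
# distribution. One way to provide it (a sketch; the artifact version and
# Scala suffix must match the running Spark build) is via ``--packages``:
#
#   ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 app.py
#
# See the deployment section of the "Apache Avro Data Source Guide" for the
# authoritative instructions.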


@ignore_unicode_prefix
@since(3.0)
def from_avro(data, jsonFormatSchema, options={}):
    """
    Converts a binary column of Avro format into its corresponding catalyst value.
    The specified schema must match the read data, otherwise the behavior is undefined:
    it may fail or return an arbitrary result.
    To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
    set via the option avroSchema.

    Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of the "Apache Avro Data Source Guide".

    :param data: the binary column.
    :param jsonFormatSchema: the Avro schema in JSON string format.
    :param options: options to control how the Avro record is parsed.

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import from_avro, to_avro
    >>> data = [(1, Row(name='Alice', age=2))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
    >>> avroDf.collect()
    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
    ...     "fields":[{"name":"age","type":["long","null"]},
    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
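
    Parse behavior is controlled through ``options``; for instance, an expected
    reader schema for schema evolution can be set with the ``avroSchema`` option,
    and a parse mode with ``mode`` (a sketch, skipped under doctest; the
    supported options depend on the Spark version):

    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema,
    ...     {"mode": "FAILFAST"}).alias("value")).collect()  # doctest: +SKIP
    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]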
    """

    sc = SparkContext._active_spark_context
    try:
        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
            _to_java_column(data), jsonFormatSchema, options)
    except TypeError as e:
        # py4j raises "'JavaPackage' object is not callable" when the
        # spark-avro JAR is missing from the classpath; print a deployment
        # hint before re-raising.
        if str(e) == "'JavaPackage' object is not callable":
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)


@ignore_unicode_prefix
@since(3.0)
def to_avro(data, jsonFormatSchema=""):
    """
    Converts a column into binary of Avro format.

    Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of the "Apache Avro Data Source Guide".

    :param data: the data column.
    :param jsonFormatSchema: user-specified output Avro schema in JSON string format.

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import to_avro
    >>> data = ['SPADES']
    >>> df = spark.createDataFrame(data, "string")
    >>> df.select(to_avro(df.value).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x00\\x0cSPADES'))]
    >>> jsonFormatSchema = '''["null", {"type": "enum", "name": "value",
    ...     "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]'''
    >>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x02\\x00'))]
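
    The binary produced with an explicit schema can be decoded back with
    ``from_avro`` using the same schema, e.g. (a sketch, not executed by
    doctests)::

        from pyspark.sql.avro.functions import from_avro
        avroDf = df.select(to_avro(df.value, jsonFormatSchema).alias("suite"))
        avroDf.select(from_avro(avroDf.suite, jsonFormatSchema).alias("value"))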
    """

    sc = SparkContext._active_spark_context
    try:
        # Dispatch to the schema-less or the schema-specified JVM overload.
        if jsonFormatSchema == "":
            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
        else:
            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(
                _to_java_column(data), jsonFormatSchema)
    except TypeError as e:
        # Same missing-jar detection as in from_avro above.
        if str(e) == "'JavaPackage' object is not callable":
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)


def _test():
    import os
    import sys
    from pyspark.testing.utils import search_jar
    avro_jar = search_jar("external/avro", "spark-avro", "spark-avro")
    if avro_jar is None:
        print(
            "Skipping all Avro Python tests as the optional Avro project was "
            "not compiled into a JAR. To run these tests, "
            "you need to build Spark with 'build/sbt -Pavro package' or "
            "'build/mvn -Pavro package' before running this test.")
        sys.exit(0)
    else:
        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        jars_args = "--jars %s" % avro_jar
        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

    import doctest
    from pyspark.sql import Row, SparkSession
    import pyspark.sql.avro.functions
    globs = pyspark.sql.avro.functions.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.avro.functions tests")\
        .getOrCreate()
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.avro.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()