#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
0018 """
0019 A collections of builtin avro functions
0020 """
0021
0022
0023 from pyspark import since, SparkContext
0024 from pyspark.rdd import ignore_unicode_prefix
0025 from pyspark.sql.column import Column, _to_java_column
0026 from pyspark.util import _print_missing_jar
0027
0028
@ignore_unicode_prefix
@since(3.0)
def from_avro(data, jsonFormatSchema, options=None):
    """
    Converts a binary column of Avro format into its corresponding catalyst value.
    The specified schema must match the read data, otherwise the behavior is undefined:
    it may fail or return arbitrary result.
    To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
    set via the option avroSchema.

    Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of "Apache Avro Data Source Guide".

    :param data: the binary column.
    :param jsonFormatSchema: the avro schema in JSON string format.
    :param options: options to control how the Avro record is parsed. Defaults to no options.

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import from_avro, to_avro
    >>> data = [(1, Row(name='Alice', age=2))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
    >>> avroDf.collect()
    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
    ... "fields":[{"name":"age","type":["long","null"]},
    ... {"name":"name","type":["string","null"]}]},"null"]}]}'''
    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
    """

    # Avoid a mutable default argument: a shared ``options={}`` dict would be the
    # same object across every call to this function, so any mutation (by a caller
    # holding a reference, or downstream) would silently leak into later calls.
    if options is None:
        options = {}
    sc = SparkContext._active_spark_context
    try:
        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
            _to_java_column(data), jsonFormatSchema, options)
    except TypeError as e:
        # py4j raises this exact message when the Avro JAR is not on the classpath;
        # print a helpful deployment hint before re-raising.
        if str(e) == "'JavaPackage' object is not callable":
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)
0070
0071
@ignore_unicode_prefix
@since(3.0)
def to_avro(data, jsonFormatSchema=""):
    """
    Converts a column into binary of avro format.

    Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of "Apache Avro Data Source Guide".

    :param data: the data column.
    :param jsonFormatSchema: user-specified output avro schema in JSON string format.

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import to_avro
    >>> data = ['SPADES']
    >>> df = spark.createDataFrame(data, "string")
    >>> df.select(to_avro(df.value).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x00\\x0cSPADES'))]
    >>> jsonFormatSchema = '''["null", {"type": "enum", "name": "value",
    ... "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]'''
    >>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect()
    [Row(suite=bytearray(b'\\x02\\x00'))]
    """

    sc = SparkContext._active_spark_context
    try:
        # Resolve the JVM-side function container once; the one- and two-argument
        # overloads differ only in whether an explicit output schema is supplied.
        avro_functions = sc._jvm.org.apache.spark.sql.avro.functions
        if jsonFormatSchema != "":
            jc = avro_functions.to_avro(_to_java_column(data), jsonFormatSchema)
        else:
            jc = avro_functions.to_avro(_to_java_column(data))
    except TypeError as e:
        # When the Avro JAR is missing, py4j surfaces this exact message; emit a
        # deployment hint before letting the error propagate.
        if str(e) == "'JavaPackage' object is not callable":
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)
0108
0109
def _test():
    """Run this module's doctests against a local SparkSession, skipping when the
    optional spark-avro JAR has not been built."""
    import os
    import sys
    from pyspark.testing.utils import search_jar

    avro_jar = search_jar("external/avro", "spark-avro", "spark-avro")
    if avro_jar is None:
        # Without the JAR the JVM-side functions cannot load; skip, don't fail.
        print(
            "Skipping all Avro Python tests as the optional Avro project was "
            "not compiled into a JAR. To run these tests, "
            "you need to build Spark with 'build/sbt -Pavro package' or "
            "'build/mvn -Pavro package' before running this test.")
        sys.exit(0)

    # Prepend the Avro JAR to whatever submit args are already configured.
    previous_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(["--jars %s" % avro_jar, previous_args])

    import doctest
    from pyspark.sql import Row, SparkSession
    import pyspark.sql.avro.functions

    globs = pyspark.sql.avro.functions.__dict__.copy()
    session = SparkSession.builder \
        .master("local[4]") \
        .appName("sql.avro.functions tests") \
        .getOrCreate()
    globs['spark'] = session
    failure_count, test_count = doctest.testmod(
        pyspark.sql.avro.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    session.stop()
    if failure_count:
        sys.exit(-1)
0142
0143
# Allow running this file directly (e.g. ``python functions.py``) to execute the doctests.
if __name__ == "__main__":
    _test()