0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 A simple example demonstrating Arrow in Spark.
0020 Run with:
0021 ./bin/spark-submit examples/src/main/python/sql/arrow.py
0022 """
0023
0024 from __future__ import print_function
0025
0026 import sys
0027
0028 from pyspark.sql import SparkSession
0029 from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
0030
# Fail fast if the installed pandas / PyArrow versions are older than what
# PySpark's Arrow integration requires.
require_minimum_pandas_version()
require_minimum_pyarrow_version()

# The pandas UDF examples below use Python 3.6+ syntax (e.g. annotated UDFs),
# so refuse to run on anything older.
if sys.version_info < (3, 6):
    raise Exception(
        "Running this example file requires Python 3.6+; however, "
        "your Python version was:\n %s" % sys.version)
0038
0039
def dataframe_with_arrow_example(spark):
    """Round-trip a pandas DataFrame through Spark using Arrow transfers."""
    import numpy as np
    import pandas as pd

    # Enable Arrow-based columnar data transfers between Spark and pandas.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # Build a local pandas DataFrame of random values.
    sample = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from the pandas DataFrame (uses Arrow).
    spark_df = spark.createDataFrame(sample)

    # Convert the Spark DataFrame back to pandas (also uses Arrow).
    round_tripped = spark_df.select("*").toPandas()

    print("Pandas DataFrame result statistics:\n%s\n" % str(round_tripped.describe()))
0058
0059
def ser_to_frame_pandas_udf_example(spark):
    """Show a pandas UDF that maps multiple Series to a DataFrame (struct)."""
    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    # The struct result type is declared with a DDL-formatted string.
    @pandas_udf("col1 string, col2 long")
    def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
        s3['col2'] = s1 + s2.str.len()
        return s3

    # Input DataFrame with a long column, a string column, and a struct column.
    input_df = spark.createDataFrame(
        [[1, "a string", ("a nested string",)]],
        "long_col long, string_col string, struct_col struct<col1:string>")
    input_df.printSchema()

    # Applying the UDF produces a struct column matching the declared type.
    input_df.select(func("long_col", "string_col", "struct_col")).printSchema()
0084
0085
0086
0087
0088
0089
def ser_to_ser_pandas_udf_example(spark):
    """Show a Series-to-Series pandas UDF, usable both locally and on Spark."""
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # A plain pandas function...
    def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
        return a * b

    # ...wrapped as a vectorized Spark UDF returning longs.
    multiply = pandas_udf(multiply_func, returnType=LongType())

    # The underlying function still works on local pandas data.
    series = pd.Series([1, 2, 3])
    print(multiply_func(series, series))

    # And the UDF executes as a vectorized operation on a Spark DataFrame.
    frame = spark.createDataFrame(pd.DataFrame(series, columns=["x"]))
    frame.select(multiply(col("x"), col("x"))).show()
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
def iter_ser_to_iter_ser_pandas_udf_example(spark):
    """Show an Iterator[Series] -> Iterator[Series] pandas UDF."""
    from typing import Iterator

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    frame = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

    # Each element of the iterator is one pandas Series batch of column "x".
    @pandas_udf("long")
    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for batch in iterator:
            yield batch + 1

    frame.select(plus_one("x")).show()
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
def iter_sers_to_iter_ser_pandas_udf_example(spark):
    """Show an Iterator[Tuple[Series, ...]] -> Iterator[Series] pandas UDF."""
    from typing import Iterator, Tuple

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    frame = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

    # Each iterator element is a tuple of Series batches, one per input column.
    @pandas_udf("long")
    def multiply_two_cols(
            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        for first, second in iterator:
            yield first * second

    frame.select(multiply_two_cols("x", "x")).show()
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
def ser_to_scalar_pandas_udf_example(spark):
    """Show a Series-to-scalar pandas UDF used as select, groupby and window agg."""
    import pandas as pd

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql import Window

    frame = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    # Aggregates a Series of doubles down to a single float.
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    # As a plain aggregation over the whole column.
    frame.select(mean_udf(frame['v'])).show()

    # As a grouped aggregation.
    frame.groupby("id").agg(mean_udf(frame['v'])).show()

    # As a window aggregation over an unbounded per-id frame.
    window = (Window
              .partitionBy('id')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    frame.withColumn('mean_v', mean_udf(frame['v']).over(window)).show()
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
def grouped_apply_in_pandas_example(spark):
    """Show groupby().applyInPandas: a grouped-map pandas function."""
    frame = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    def subtract_mean(pdf):
        # Center each group's "v" values around that group's mean.
        return pdf.assign(v=pdf.v - pdf.v.mean())

    frame.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
0242
0243
0244
0245
0246
0247
0248
0249
0250
0251
0252
0253
def map_in_pandas_example(spark):
    """Show mapInPandas: a map over an iterator of pandas DataFrame batches."""
    frame = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    def filter_func(batches):
        # Keep only the rows whose id equals 1 in every batch.
        for batch in batches:
            yield batch[batch.id == 1]

    frame.mapInPandas(filter_func, schema=frame.schema).show()
0263
0264
0265
0266
0267
0268
0269
0270
def cogrouped_apply_in_pandas_example(spark):
    """Show cogrouped applyInPandas: an as-of join across two grouped DataFrames."""
    import pandas as pd

    values = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    labels = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    def asof_join(left, right):
        # Merge each pair of cogrouped frames on the nearest preceding "time".
        return pd.merge_asof(left, right, on="time", by="id")

    values.groupby("id").cogroup(labels.groupby("id")).applyInPandas(
        asof_join, schema="time int, id int, v1 double, v2 string").show()
0288
0289
0290
0291
0292
0293
0294
0295
0296
0297
0298
if __name__ == "__main__":
    # Build (or reuse) a SparkSession for all the examples below.
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    print("Running Pandas to/from conversion example")
    dataframe_with_arrow_example(spark)
    print("Running pandas_udf example: Series to Frame")
    ser_to_frame_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Series")
    ser_to_ser_pandas_udf_example(spark)
    # Fixed typo in the banner below: "Seires" -> "Series".
    print("Running pandas_udf example: Iterator of Series to Iterator of Series")
    iter_ser_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Iterator of Multiple Series to Iterator of Series")
    iter_sers_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Scalar")
    ser_to_scalar_pandas_udf_example(spark)
    print("Running pandas function example: Grouped Map")
    grouped_apply_in_pandas_example(spark)
    print("Running pandas function example: Map")
    map_in_pandas_example(spark)
    print("Running pandas function example: Co-grouped Map")
    cogrouped_apply_in_pandas_example(spark)

    spark.stop()