#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating Arrow in Spark.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/arrow.py
"""

from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

require_minimum_pandas_version()
require_minimum_pyarrow_version()

if sys.version_info < (3, 6):
    raise Exception(
        "Running this example file requires Python 3.6+; however, "
        "your Python version was:\n %s" % sys.version)


def dataframe_with_arrow_example(spark):
    # $example on:dataframe_with_arrow$
    import numpy as np
    import pandas as pd

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # Generate a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
    df = spark.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
    result_pdf = df.select("*").toPandas()
    # $example off:dataframe_with_arrow$
    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
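
    # Extra sketch (not part of the original example): the same round trip also
    # works with Arrow disabled; Spark then falls back to the slower
    # row-at-a-time conversion, so the result is the same but the transfer is
    # no longer columnar.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
    result_pdf_no_arrow = df.toPandas()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    shapes_match = result_pdf.shape == result_pdf_no_arrow.shape
    print("Conversion shapes match with and without Arrow: %s" % shapes_match)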


def ser_to_frame_pandas_udf_example(spark):
    # $example on:ser_to_frame_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    @pandas_udf("col1 string, col2 long")
    def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
        s3['col2'] = s1 + s2.str.len()
        return s3

    # Create a Spark DataFrame that has three columns including a struct column.
    df = spark.createDataFrame(
        [[1, "a string", ("a nested string",)]],
        "long_col long, string_col string, struct_col struct<col1:string>")

    df.printSchema()
    # root
    # |-- long_col: long (nullable = true)
    # |-- string_col: string (nullable = true)
    # |-- struct_col: struct (nullable = true)
    # |    |-- col1: string (nullable = true)

    df.select(func("long_col", "string_col", "struct_col")).printSchema()
    # root
    # |-- func(long_col, string_col, struct_col): struct (nullable = true)
    # |    |-- col1: string (nullable = true)
    # |    |-- col2: long (nullable = true)
    # $example off:ser_to_frame_pandas_udf$
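
    # Extra sketch (not part of the original example): actually run the UDF and
    # inspect the returned struct; col1 is copied from the input struct and
    # col2 = long_col + len(string_col), i.e. 1 + 8 = 9 for this single row.
    df.select(func("long_col", "string_col", "struct_col").alias("result")).show(truncate=False)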


def ser_to_ser_pandas_udf_example(spark):
    # $example on:ser_to_ser_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # Declare the function and create the UDF
    def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
        return a * b

    multiply = pandas_udf(multiply_func, returnType=LongType())

    # The function for a pandas_udf should be able to execute with local Pandas data
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    # 0    1
    # 1    4
    # 2    9
    # dtype: int64

    # Create a Spark DataFrame, 'spark' is an existing SparkSession
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

    # Execute function as a Spark vectorized UDF
    df.select(multiply(col("x"), col("x"))).show()
    # +-------------------+
    # |multiply_func(x, x)|
    # +-------------------+
    # |                  1|
    # |                  4|
    # |                  9|
    # +-------------------+
    # $example off:ser_to_ser_pandas_udf$
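
    # Extra sketch (not part of the original example): a vectorized UDF can also
    # be registered and used from SQL; the names "multiply_udf" and "xs" below
    # are illustrative.
    spark.udf.register("multiply_udf", multiply)
    df.createOrReplaceTempView("xs")
    spark.sql("SELECT multiply_udf(x, x) AS product FROM xs").show()
    # +-------+
    # |product|
    # +-------+
    # |      1|
    # |      4|
    # |      9|
    # +-------+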


def iter_ser_to_iter_ser_pandas_udf_example(spark):
    # $example on:iter_ser_to_iter_ser_pandas_udf$
    from typing import Iterator

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # Declare the function and create the UDF
    @pandas_udf("long")
    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for x in iterator:
            yield x + 1

    df.select(plus_one("x")).show()
    # +-----------+
    # |plus_one(x)|
    # +-----------+
    # |          2|
    # |          3|
    # |          4|
    # +-----------+
    # $example off:iter_ser_to_iter_ser_pandas_udf$
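
    # Extra sketch (not part of the original example): the iterator variant is
    # mainly useful when the UDF needs some expensive one-time setup per batch
    # iterator (e.g. loading a model); the constant "offset" below stands in for
    # such state.
    @pandas_udf("long")
    def plus_offset(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        offset = 10  # placeholder for expensive, once-per-iterator initialization
        for x in iterator:
            yield x + offset

    df.select(plus_offset("x")).show()
    # +--------------+
    # |plus_offset(x)|
    # +--------------+
    # |            11|
    # |            12|
    # |            13|
    # +--------------+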


def iter_sers_to_iter_ser_pandas_udf_example(spark):
    # $example on:iter_sers_to_iter_ser_pandas_udf$
    from typing import Iterator, Tuple

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # Declare the function and create the UDF
    @pandas_udf("long")
    def multiply_two_cols(
            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        for a, b in iterator:
            yield a * b

    df.select(multiply_two_cols("x", "x")).show()
    # +-----------------------+
    # |multiply_two_cols(x, x)|
    # +-----------------------+
    # |                      1|
    # |                      4|
    # |                      9|
    # +-----------------------+
    # $example off:iter_sers_to_iter_ser_pandas_udf$


def ser_to_scalar_pandas_udf_example(spark):
    # $example on:ser_to_scalar_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql import Window

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    # Declare the function and create the UDF
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    df.select(mean_udf(df['v'])).show()
    # +-----------+
    # |mean_udf(v)|
    # +-----------+
    # |        4.2|
    # +-----------+

    df.groupby("id").agg(mean_udf(df['v'])).show()
    # +---+-----------+
    # | id|mean_udf(v)|
    # +---+-----------+
    # |  1|        1.5|
    # |  2|        6.0|
    # +---+-----------+

    w = Window \
        .partitionBy('id') \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
    # +---+----+------+
    # | id|   v|mean_v|
    # +---+----+------+
    # |  1| 1.0|   1.5|
    # |  1| 2.0|   1.5|
    # |  2| 3.0|   6.0|
    # |  2| 5.0|   6.0|
    # |  2|10.0|   6.0|
    # +---+----+------+
    # $example off:ser_to_scalar_pandas_udf$


def grouped_apply_in_pandas_example(spark):
    # $example on:grouped_apply_in_pandas$
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
    # $example off:grouped_apply_in_pandas$
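
    # Extra sketch (not part of the original example): applyInPandas also accepts
    # a function of (key, pdf), where key is a tuple of the grouping values; here
    # it is used to compute one mean row per group.
    import pandas as pd

    def mean_by_key(key, pdf):
        return pd.DataFrame([(key[0], pdf.v.mean())], columns=["id", "v"])

    df.groupby("id").applyInPandas(mean_by_key, schema="id long, v double").show()
    # +---+---+
    # | id|  v|
    # +---+---+
    # |  1|1.5|
    # |  2|6.0|
    # +---+---+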


def map_in_pandas_example(spark):
    # $example on:map_in_pandas$
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df.mapInPandas(filter_func, schema=df.schema).show()
    # +---+---+
    # | id|age|
    # +---+---+
    # |  1| 21|
    # +---+---+
    # $example off:map_in_pandas$
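
    # Extra sketch (not part of the original example): mapInPandas may also return
    # a different number of rows than it receives; here every input batch is
    # reduced to a single row holding its size. The exact output depends on how
    # the data is split into batches and partitions.
    import pandas as pd

    def count_func(iterator):
        for pdf in iterator:
            yield pd.DataFrame({"num_rows": [len(pdf)]})

    df.mapInPandas(count_func, schema="num_rows long").show()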


def cogrouped_apply_in_pandas_example(spark):
    # $example on:cogrouped_apply_in_pandas$
    import pandas as pd

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    def asof_join(l, r):
        return pd.merge_asof(l, r, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        asof_join, schema="time int, id int, v1 double, v2 string").show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
    # |20000101|  1|1.0|  x|
    # |20000102|  1|3.0|  x|
    # |20000101|  2|2.0|  y|
    # |20000102|  2|4.0|  y|
    # +--------+---+---+---+
    # $example off:cogrouped_apply_in_pandas$
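
    # Extra sketch (not part of the original example): the co-grouped function may
    # also take the grouping key as a first argument; here the key is used to keep
    # only the co-group for id 1.
    def asof_join_for_id_1(key, l, r):
        if key == (1,):
            return pd.merge_asof(l, r, on="time", by="id")
        return pd.DataFrame(columns=["time", "id", "v1", "v2"])

    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        asof_join_for_id_1, schema="time int, id int, v1 double, v2 string").show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
    # |20000101|  1|1.0|  x|
    # |20000102|  1|3.0|  x|
    # +--------+---+---+---+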


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    print("Running Pandas to/from conversion example")
    dataframe_with_arrow_example(spark)
    print("Running pandas_udf example: Series to Frame")
    ser_to_frame_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Series")
    ser_to_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Iterator of Series to Iterator of Series")
    iter_ser_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Iterator of Multiple Series to Iterator of Series")
    iter_sers_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Scalar")
    ser_to_scalar_pandas_udf_example(spark)
    print("Running pandas function example: Grouped Map")
    grouped_apply_in_pandas_example(spark)
    print("Running pandas function example: Map")
    map_in_pandas_example(spark)
    print("Running pandas function example: Co-grouped Map")
    cogrouped_apply_in_pandas_example(spark)

    spark.stop()