---
layout: global
title: PySpark Usage Guide for Pandas with Apache Arrow
displayTitle: PySpark Usage Guide for Pandas with Apache Arrow
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* Table of contents
{:toc}

## Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. This currently is most beneficial to Python users that
work with Pandas/NumPy data. Its usage is not automatic and might require some minor
changes to configuration or code to take full advantage and ensure compatibility. This guide will
give a high-level description of how to use Arrow in Spark and highlight any differences when
working with Arrow-enabled data.

### Ensure PyArrow Installed

To use Apache Arrow in PySpark, [the recommended version of PyArrow](#recommended-pandas-and-pyarrow-versions)
should be installed.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes.
You can install using pip or conda from the conda-forge channel. See PyArrow
[installation](https://arrow.apache.org/docs/python/install.html) for details.
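
As a quick local check, the following minimal sketch reports which versions of PyArrow and Pandas are
visible to the current Python interpreter. The helper name `installed_version` is an assumption made
for illustration and is not part of PySpark. The check only covers the machine it runs on, so it does
not confirm that every cluster node has PyArrow available.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is not installed.

    Hypothetical helper for illustration; not part of PySpark.
    """
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Report what this interpreter can see; executors on other nodes need their own check.
for name in ("pyarrow", "pandas"):
    found = installed_version(name)
    print(f"{name}: {found if found is not None else 'not installed'}")
```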

## Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
the Spark configuration `spark.sql.execution.arrow.pyspark.enabled` to `true`. This is disabled by default.

In addition, optimizations enabled by `spark.sql.execution.arrow.pyspark.enabled` can fall back automatically
to the non-Arrow implementation if an error occurs before the actual computation within Spark.
This behavior is controlled by `spark.sql.execution.arrow.pyspark.fallback.enabled`.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example dataframe_with_arrow python/sql/arrow.py %}
</div>
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
data types are currently supported, and an error can be raised if a column has an unsupported type;
see [Supported SQL Types](#supported-sql-types). If an error occurs during `createDataFrame()`,
Spark will fall back to creating the DataFrame without Arrow.
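
The two configuration keys and the conversion calls described in this section can be sketched as
follows. This is an illustration under assumptions, not the included example: the helper names
`enable_arrow` and `round_trip` and the `max_rows` cap are hypothetical, and `spark` is assumed to be
an existing `SparkSession`.

```python
ARROW_ENABLED = "spark.sql.execution.arrow.pyspark.enabled"
ARROW_FALLBACK = "spark.sql.execution.arrow.pyspark.fallback.enabled"


def enable_arrow(spark, fallback=True):
    """Turn on Arrow-based conversion for the given SparkSession (hypothetical helper)."""
    spark.conf.set(ARROW_ENABLED, "true")
    # Controls whether Spark may fall back to the non-Arrow path on an early error.
    spark.conf.set(ARROW_FALLBACK, "true" if fallback else "false")


def round_trip(spark, pandas_df, max_rows=1000):
    """Pandas -> Spark -> Pandas, collecting only a bounded number of rows."""
    enable_arrow(spark)
    df = spark.createDataFrame(pandas_df)
    # toPandas() collects everything to the driver, so cap the row count first.
    return df.limit(max_rows).toPandas()
```

Capping the rows with `limit()` before `toPandas()` is one way to follow the advice above about
collecting only a small subset of the data to the driver.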

## Pandas UDFs (a.k.a. Vectorized UDFs)

Pandas UDFs are user defined functions that are executed by Spark using
Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
UDF is defined using `pandas_udf` as a decorator or to wrap the function, and no additional
configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.

Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From Spark 3.0
with Python 3.6+, you can also use [Python type hints](https://www.python.org/dev/peps/pep-0484).
Using Python type hints is preferred, and `PandasUDFType` will be deprecated in
a future release.

Note that the type hint should use `pandas.Series` in all cases, with one exception:
`pandas.DataFrame` should be used as the input or output type hint when the input
or output column is of `StructType`. The following example shows a Pandas UDF which takes a long
column, a string column, and a struct column, and outputs a struct column. It requires the function to
specify the type hints of `pandas.Series` and `pandas.DataFrame` as below:

<p>
<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %}
</div>
</div>
</p>
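
A minimal sketch of such a function is shown below. It is an illustration only and may differ from
the included example: the names `add_length` and `run_on_spark` and the sample data are assumptions.
The Spark wiring sits in a separate function so that the Pandas logic can be checked locally without
a cluster.

```python
import pandas as pd


def add_length(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    """s1: long column, s2: string column, s3: struct column (arrives as a DataFrame)."""
    out = s3.copy()
    # Add a second struct field: the long value plus the string length.
    out["col2"] = s1 + s2.str.len()
    return out


def run_on_spark(spark):
    """Wire the function into Spark; needs a live SparkSession with PyArrow installed."""
    from pyspark.sql.functions import pandas_udf

    struct_udf = pandas_udf(add_length, returnType="col1 string, col2 long")
    df = spark.createDataFrame(
        [[1, "a string", ("a nested string",)]],
        "long_col long, string_col string, struct_col struct<col1:string>",
    )
    df.select(struct_udf("long_col", "string_col", "struct_col")).show()


# Local check with plain Pandas objects, no Spark required.
local = add_length(
    pd.Series([1]), pd.Series(["a string"]), pd.DataFrame({"col1": ["a nested string"]})
)
print(local)
```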

The following sections describe the supported combinations of type hints. For simplicity,
the `pandas.DataFrame` variant is omitted.

### Series to Series

The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.

Using `pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should
always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
columns into batches and calling the function for each batch as a subset of the data, then concatenating
the results together.

The following example shows how to create this Pandas UDF that computes the product of 2 columns.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
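
The batching behavior described above can be imitated locally. In the sketch below,
`apply_in_batches` is a hypothetical stand-in for what Spark does internally (slice the columns,
call the function per slice, concatenate), not Spark's actual implementation, and `run_on_spark`
is an assumed name for the Spark wiring.

```python
import pandas as pd


def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    """Element-wise product; the output has the same length as the input."""
    return a * b


def apply_in_batches(func, a, b, batch_size):
    """Local stand-in for Spark's batching: slice, apply, concatenate."""
    parts = [
        func(a.iloc[i:i + batch_size], b.iloc[i:i + batch_size])
        for i in range(0, len(a), batch_size)
    ]
    return pd.concat(parts)


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    from pyspark.sql.functions import col, pandas_udf

    multiply = pandas_udf(multiply_func, returnType="long")
    df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
    df.select(multiply(col("x"), col("x"))).show()


x = pd.Series([1, 2, 3, 4, 5])
batched = apply_in_batches(multiply_func, x, x, batch_size=2)
print(batched.tolist())  # [1, 4, 9, 16, 25]
```

Because the function is applied per batch, it should not rely on seeing the whole column at once.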

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).

### Iterator of Series to Iterator of Series

The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`.

Using `pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The
length of the entire output from the function should be the same as the length of the entire input; therefore, the
function can prefetch data from the input iterator as long as the lengths are the same.
In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
multiple input columns, a different type hint is required. See
[Iterator of Multiple Series to Iterator of Series](#iterator-of-multiple-series-to-iterator-of-series).

This variant is also useful when the UDF execution requires initializing some state, although internally it works
identically to the Series to Series case. The pseudocode below illustrates this.

{% highlight python %}
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state.
    # very_expensive_initialization() is a placeholder for your own setup code.
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        # calculate_with_state() is likewise a placeholder.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()
{% endhighlight %}

The following example shows how to create this Pandas UDF:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
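
As a runnable counterpart, here is a minimal sketch of an iterator-style function. The names
`plus_one` and `run_on_spark` and the sample batches are assumptions for illustration. The generator
is exercised locally with hand-made batches, and the Spark wiring is kept in a separate function.

```python
from typing import Iterator

import pandas as pd


def plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Add one to every value, one batch at a time."""
    for batch in batches:
        yield batch + 1


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    from pyspark.sql.functions import pandas_udf

    plus_one_udf = pandas_udf(plus_one, returnType="long")
    df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
    df.select(plus_one_udf("x")).show()


# Local check: feed two hand-made batches through the generator.
local_batches = [pd.Series([1, 2]), pd.Series([3])]
local_result = pd.concat(list(plus_one(iter(local_batches))), ignore_index=True)
print(local_result.tolist())  # [2, 3, 4]
```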

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).

### Iterator of Multiple Series to Iterator of Series

The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`.

Using `pandas_udf` with a function that has such type hints creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`.
In this case, the created Pandas UDF requires as many input columns as there are series in the tuple
when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series
to Iterator of Series case.

The following example shows how to create this Pandas UDF:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
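
A minimal sketch of the multi-column variant follows. The names `multiply_two_cols` and
`run_on_spark` and the sample batches are assumptions made for illustration. Each item from the
iterator is a tuple holding one `pandas.Series` per input column.

```python
from typing import Iterator, Tuple

import pandas as pd


def multiply_two_cols(
    batches: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Multiply two columns batch by batch."""
    for a, b in batches:
        yield a * b


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    from pyspark.sql.functions import pandas_udf

    multiply_udf = pandas_udf(multiply_two_cols, returnType="long")
    df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
    # Two input columns are passed because the tuple holds two series.
    df.select(multiply_udf("x", "x")).show()


# Local check with two hand-made batches of paired series.
pairs = [
    (pd.Series([1, 2]), pd.Series([3, 4])),
    (pd.Series([5]), pd.Series([6])),
]
products = pd.concat(list(multiply_two_cols(iter(pairs))), ignore_index=True)
print(products.tolist())  # [3, 8, 30]
```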

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).

### Series to Scalar

The type hint can be expressed as `pandas.Series`, ... -> `Any`.

Using `pandas_udf` with a function that has such type hints creates a Pandas UDF similar
to PySpark's aggregate functions. The given function takes `pandas.Series` and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a Python
primitive type, e.g., `int` or `float`, or a NumPy data type, e.g., `numpy.int64` or `numpy.float64`.
Ideally, `Any` should be replaced with the specific scalar type.

This UDF can also be used with `groupBy().agg()` and [`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series`
represents a column within the group or window.

Note that this type of UDF does not support partial aggregation and all data for a group or window
will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas
UDFs. The following example shows how to use this type of UDF to compute the mean with group-by
and window operations:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
</div>
</div>
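
The aggregation can be sketched as below. The names `mean_func` and `run_on_spark` and the sample
data are assumptions for illustration. The local loop over `groupby` only imitates how each group's
column reaches the function as one `pandas.Series`.

```python
import pandas as pd


def mean_func(v: pd.Series) -> float:
    """Reduce one column of a group (or window) to a single scalar."""
    return float(v.mean())


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    from pyspark.sql import Window
    from pyspark.sql.functions import pandas_udf

    mean_udf = pandas_udf(mean_func, returnType="double")
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
    )
    # Grouped aggregation.
    df.groupby("id").agg(mean_udf(df["v"])).show()
    # Window aggregation over an unbounded window.
    w = Window.partitionBy("id").rowsBetween(
        Window.unboundedPreceding, Window.unboundedFollowing
    )
    df.withColumn("mean_v", mean_udf(df["v"]).over(w)).show()


# Local check: apply the function to each group's column.
pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})
local_means = {key: mean_func(group["v"]) for key, group in pdf.groupby("id")}
print(local_means)
```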

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).


## Pandas Function APIs

Pandas Function APIs can directly apply a Python native function against the whole `DataFrame` by
using Pandas instances. Internally, they work similarly to Pandas UDFs by using Arrow to transfer
data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function
API behaves as a regular API under PySpark `DataFrame` instead of `Column`, and Python type hints in Pandas
Function APIs are optional and do not currently affect how they work internally, although they
might be required in the future.

From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API,
`DataFrame.groupby().applyInPandas()`. It is still possible to use it with `PandasUDFType`
and `DataFrame.groupby().apply()` as before; however, it is preferred to use
`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be deprecated
in the future.

### Grouped Map

Grouped map operations with Pandas instances are supported by `DataFrame.groupby().applyInPandas()`,
which requires a Python function that takes a `pandas.DataFrame` and returns another `pandas.DataFrame`.
Each group is mapped to a `pandas.DataFrame` that is passed to the Python function.

This API implements the "split-apply-combine" pattern which consists of three steps:
* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
  input data contains all the rows and columns for each group.
* Combine the results into a new PySpark `DataFrame`.

To use `groupBy().applyInPandas()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`.

The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
on how to label columns when constructing a `pandas.DataFrame`.

Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
to ensure that the grouped data will fit into the available memory.

The following example shows how to use `groupby().applyInPandas()` to subtract the mean from each value
in the group.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example grouped_apply_in_pandas python/sql/arrow.py %}
</div>
</div>
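
The split-apply-combine steps can be sketched as follows. The names `subtract_mean` and
`run_on_spark` and the sample data are assumptions for illustration. The local part performs the
three steps with plain Pandas to show what the function receives and returns for each group.

```python
import pandas as pd


def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    """Receives all rows of one group; returns them with `v` centered on the group mean."""
    return pdf.assign(v=pdf.v - pdf.v.mean())


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
    )
    # The schema string names the output columns and their types.
    df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()


# Local imitation: split by key, apply per group, combine the pieces.
pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})
combined = pd.concat([subtract_mean(group) for _, group in pdf.groupby("id")])
print(combined)
```

The returned column labels (`id`, `v`) match the field names in the schema string, which is the
name-based matching rule described above.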

For detailed usage, please see [`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas).

### Map

Map operations with Pandas instances are supported by `DataFrame.mapInPandas()`, which maps an iterator
of `pandas.DataFrame`s that represents the current PySpark `DataFrame` to another iterator of
`pandas.DataFrame`s and returns the result as a PySpark `DataFrame`. The function takes and outputs
an iterator of `pandas.DataFrame`. Unlike some Pandas UDFs, it can return output of arbitrary length,
although internally it works similarly to the Series to Series Pandas UDF.

The following example shows how to use `mapInPandas()`:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example map_in_pandas python/sql/arrow.py %}
</div>
</div>
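
A minimal sketch of a map function that returns fewer rows than it receives is shown below. The
names `filter_func` and `run_on_spark` and the sample data are assumptions for illustration.

```python
from typing import Iterator

import pandas as pd


def filter_func(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Keep only rows with id == 1; the output length may differ from the input length."""
    for pdf in batches:
        yield pdf[pdf.id == 1]


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    df.mapInPandas(filter_func, schema=df.schema).show()


# Local check with one hand-made batch.
local_batches = [pd.DataFrame({"id": [1, 2], "age": [21, 30]})]
kept = pd.concat(list(filter_func(iter(local_batches))))
print(kept)
```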

For detailed usage, please see [`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).

### Co-grouped Map

Co-grouped map operations with Pandas instances are supported by `DataFrame.groupby().cogroup().applyInPandas()`, which
allows two PySpark `DataFrame`s to be cogrouped by a common key and a Python function to be applied to each
cogroup. It consists of the following steps:
* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame`s (with an optional tuple
  representing the key). The output of the function is a `pandas.DataFrame`.
* Combine the `pandas.DataFrame`s from all groups into a new PySpark `DataFrame`.

To use `groupBy().cogroup().applyInPandas()`, the user needs to define the following:
* A Python function that defines the computation for each cogroup.
* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`.

The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
on how to label columns when constructing a `pandas.DataFrame`.

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.

The following example shows how to use `groupby().cogroup().applyInPandas()` to perform an asof join between two datasets.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example cogrouped_apply_in_pandas python/sql/arrow.py %}
</div>
</div>
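
An asof join per cogroup can be sketched with `pandas.merge_asof`. The names `asof_join` and
`run_on_spark`, the column names, and the sample data are assumptions for illustration. Note that
`merge_asof` expects both inputs to be sorted by the `on` column.

```python
import pandas as pd


def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Join each left row to the latest right row at or before its time, per id."""
    return pd.merge_asof(left, right, on="time", by="id")


def run_on_spark(spark):
    """Needs a live SparkSession with PyArrow installed."""
    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"),
    )
    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
    )
    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        asof_join, schema="time int, id int, v1 double, v2 string"
    ).show()


# Local check for the cogroup with id == 1.
left_group = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
right_group = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})
joined = asof_join(left_group, right_group)
print(joined)
```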

For detailed usage, please see [`pyspark.sql.PandasCogroupedOps.applyInPandas()`](api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas).


## Usage Notes

### Supported SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
`ArrayType` of `TimestampType`, and nested `StructType`.

### Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf `spark.sql.execution.arrow.maxRecordsPerBatch`
to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch. If the number of columns is large, the value should be adjusted
accordingly. Using this limit, each data partition will be made into 1 or more record batches for
processing.
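
The effect of the limit can be estimated with simple arithmetic. The sketch below assumes that a
non-empty partition is cut into batches of at most `maxRecordsPerBatch` rows. The helper names
`batches_for_partition` and `set_max_records` are hypothetical, and `spark` is assumed to be an
existing `SparkSession`.

```python
import math

MAX_RECORDS_CONF = "spark.sql.execution.arrow.maxRecordsPerBatch"
DEFAULT_MAX_RECORDS = 10_000  # default value stated in the text above


def batches_for_partition(num_rows, max_records=DEFAULT_MAX_RECORDS):
    """Estimated number of Arrow record batches for a non-empty partition."""
    if num_rows < 1 or max_records < 1:
        raise ValueError("num_rows and max_records must be positive")
    return math.ceil(num_rows / max_records)


def set_max_records(spark, max_records):
    """Apply the limit to an existing SparkSession (hypothetical helper)."""
    spark.conf.set(MAX_RECORDS_CONF, str(max_records))


print(batches_for_partition(25_000))         # 3
print(batches_for_partition(25_000, 5_000))  # 5
```

Lowering the limit produces more, smaller batches, which is the trade-off to consider when rows are
wide.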

### Timestamp with Time Zone Semantics

Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the configuration `spark.sql.session.timeZone` and will
default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
resolution, `datetime64[ns]`, with optional time zone on a per-column basis.

When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and each column will be converted to the Spark session time zone then localized to that time
zone, which removes the time zone and displays values as local time. This will occur
when calling `toPandas()` or `pandas_udf` with timestamp columns.

When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
values will be truncated.
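
The two conversions can be imitated in plain Pandas to see their effect. This is only an
illustration of the semantics described above, not Spark's internal code, and the helper names
`spark_to_pandas_view` and `truncate_to_microseconds` are hypothetical.

```python
import pandas as pd


def spark_to_pandas_view(utc_values: pd.Series, session_tz: str) -> pd.Series:
    """UTC instants -> time-zone-naive local times in the session time zone."""
    return (
        utc_values.dt.tz_localize("UTC")   # interpret the stored values as UTC
        .dt.tz_convert(session_tz)         # convert to the session time zone
        .dt.tz_localize(None)              # drop the zone, keeping local wall time
    )


def truncate_to_microseconds(values: pd.Series) -> pd.Series:
    """Drop sub-microsecond digits, mirroring Spark's microsecond resolution."""
    return values.dt.floor("us")


stored_utc = pd.Series(pd.to_datetime(["2020-01-01 12:00:00"]))
local_view = spark_to_pandas_view(stored_utc, "America/Los_Angeles")
print(local_view[0])  # 2020-01-01 04:00:00

with_nanos = pd.Series(pd.to_datetime(["2020-01-01 00:00:00.123456789"]))
truncated = truncate_to_microseconds(with_nanos)
print(truncated[0])  # 2020-01-01 00:00:00.123456
```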

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which are
different from Pandas timestamps. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance; see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.

### Recommended Pandas and PyArrow Versions

For usage with `pyspark.sql`, the supported version of Pandas is 0.24.2 and the supported version of
PyArrow is 0.15.1. Higher versions may be used; however, compatibility and data correctness cannot be
guaranteed and should be verified by the user.

### Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x

Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable to be
set for compatibility with previous versions of Arrow <= 0.14.1. This is only necessary for PySpark
users with versions 2.3.x and 2.4.x that have manually upgraded PyArrow to 0.15.0. The following
can be added to `conf/spark-env.sh` to use the legacy Arrow IPC format:

```
ARROW_PRE_0_15_IPC_FORMAT=1
```

This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that
is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to an error similar to
the one described in [SPARK-29367](https://issues.apache.org/jira/browse/SPARK-29367) when running
`pandas_udf`s or `toPandas()` with Arrow enabled. More information about the Arrow IPC change can
be read on the Arrow 0.15.0 release [blog](http://arrow.apache.org/blog/2019/10/06/0.15.0-release/#columnar-streaming-protocol-change-since-0140).