0001 ---
0002 layout: global
0003 displayTitle: Integration with Cloud Infrastructures
0004 title: Integration with Cloud Infrastructures
0005 description: Introduction to cloud storage support in Apache Spark SPARK_VERSION_SHORT
0006 license: |
0007 Licensed to the Apache Software Foundation (ASF) under one or more
0008 contributor license agreements. See the NOTICE file distributed with
0009 this work for additional information regarding copyright ownership.
0010 The ASF licenses this file to You under the Apache License, Version 2.0
0011 (the "License"); you may not use this file except in compliance with
0012 the License. You may obtain a copy of the License at
0013
0014 http://www.apache.org/licenses/LICENSE-2.0
0015
0016 Unless required by applicable law or agreed to in writing, software
0017 distributed under the License is distributed on an "AS IS" BASIS,
0018 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0019 See the License for the specific language governing permissions and
0020 limitations under the License.
0021 ---
0022
0023 * This will become a table of contents (this text will be scraped).
0024 {:toc}
0025
0026 ## Introduction
0027
0028
0029 All major cloud providers offer persistent data storage in *object stores*.
0030 These are not classic "POSIX" file systems.
0031 In order to store hundreds of petabytes of data without any single points of failure,
0032 object stores replace the classic file system directory tree
0033 with a simpler model of `object-name => data`. To enable remote access, operations
0034 on objects are usually offered as (slow) HTTP REST operations.
0035
0036 Spark can read and write data in object stores through filesystem connectors implemented
0037 in Hadoop or provided by the infrastructure suppliers themselves.
0038 These connectors make the object stores look *almost* like file systems, with directories and files
0039 and the classic operations on them such as list, delete and rename.
0040
0041
0042 ### Important: Cloud Object Stores are Not Real Filesystems
0043
While the stores appear to be filesystems, underneath
they are still object stores, [and the difference is significant](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
0046
0047 They cannot be used as a direct replacement for a cluster filesystem such as HDFS
0048 *except where this is explicitly stated*.
0049
0050 Key differences are:
0051
0052 * Changes to stored objects may not be immediately visible, both in directory listings and actual data access.
0053 * The means by which directories are emulated may make working with them slow.
0054 * Rename operations may be very slow and, on failure, leave the store in an unknown state.
0055 * Seeking within a file may require new HTTP calls, hurting performance.
0056
0057 How does this affect Spark?
0058
0059 1. Reading and writing data can be significantly slower than working with a normal filesystem.
0060 1. Some directory structures may be very inefficient to scan during query split calculation.
0061 1. The output of work may not be immediately visible to a follow-on query.
0062 1. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset
0063 is potentially both slow and unreliable.
0064
0065 For these reasons, it is not always safe to use an object store as a direct destination of queries, or as
0066 an intermediate store in a chain of queries. Consult the documentation of the object store and its
0067 connector to determine which uses are considered safe.
0068
0069 In particular: *without some form of consistency layer, Amazon S3 cannot
0070 be safely used as the direct destination of work with the normal rename-based committer.*
0071
0072 ### Installation
0073
0074 With the relevant libraries on the classpath and Spark configured with valid credentials,
0075 objects can be read or written by using their URLs as the path to data.
0076 For example `sparkContext.textFile("s3a://landsat-pds/scene_list.gz")` will create
0077 an RDD of the file `scene_list.gz` stored in S3, using the s3a connector.
0078
0079 To add the relevant libraries to an application's classpath, include the `hadoop-cloud`
0080 module and its dependencies.
0081
0082 In Maven, add the following to the `pom.xml` file, assuming `spark.version`
0083 is set to the chosen version of Spark:
0084
0085 {% highlight xml %}
0086 <dependencyManagement>
0087 ...
0088 <dependency>
0089 <groupId>org.apache.spark</groupId>
0090 <artifactId>hadoop-cloud_{{site.SCALA_BINARY_VERSION}}</artifactId>
0091 <version>${spark.version}</version>
0092 <scope>provided</scope>
0093 </dependency>
0094 ...
0095 </dependencyManagement>
0096 {% endhighlight %}
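When submitting from the command line, a comparable effect can be had with `spark-submit --packages`. The exact published artifact coordinates vary by Spark release and Scala version (the coordinates and version below are illustrative placeholders, not a guaranteed name), so verify them against the release's artifacts first:

```
spark-submit --packages org.apache.spark:spark-hadoop-cloud_2.12:<spark-version> ...
```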
0097
0098 Commercial products based on Apache Spark generally directly set up the classpath
0099 for talking to cloud infrastructures, in which case this module may not be needed.
0100
0101 ### Authenticating
0102
0103 Spark jobs must authenticate with the object stores to access data within them.
0104
0105 1. When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
1. `spark-submit` reads the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`
and `AWS_SESSION_TOKEN` environment variables and sets the associated authentication options
for the `s3n` and `s3a` connectors to Amazon S3.
0109 1. In a Hadoop cluster, settings may be set in the `core-site.xml` file.
1. Authentication details may be manually added to the Spark configuration in `spark-defaults.conf`.
0111 1. Alternatively, they can be programmatically set in the `SparkConf` instance used to configure
0112 the application's `SparkContext`.
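As a sketch of the `spark-defaults.conf` route, the S3A connector's credential options can be set through Spark's `spark.hadoop.` prefix, which copies them into the Hadoop configuration (the values below are placeholders, not real credentials):

```
spark.hadoop.fs.s3a.access.key YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key YOUR_SECRET_KEY
```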
0113
*Important: never check authentication secrets into source code repositories,
especially public ones.*
0116
0117 Consult [the Hadoop documentation](https://hadoop.apache.org/docs/current/) for the relevant
0118 configuration and security options.
0119
0120 ## Configuring
0121
Each cloud connector has its own set of configuration parameters; again,
consult the relevant documentation.
0124
0125 ### Recommended settings for writing to object stores
0126
For object stores whose consistency model means that rename-based commits are safe,
use the `FileOutputCommitter` v2 algorithm for performance; use v1 for safety.
0129
0130 ```
0131 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
0132 ```
0133
0134 This does less renaming at the end of a job than the "version 1" algorithm.
0135 As it still uses `rename()` to commit files, it is unsafe to use
0136 when the object store does not have consistent metadata/listings.
0137
0138 The committer can also be set to ignore failures when cleaning up temporary
0139 files; this reduces the risk that a transient network problem is escalated into a
0140 job failure:
0141
0142 ```
0143 spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
0144 ```
0145
0146 The original v1 commit algorithm renames the output of successful tasks
0147 to a job attempt directory, and then renames all the files in that directory
0148 into the final destination during the job commit phase:
0149
0150 ```
0151 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
0152 ```
0153
The slow performance of mimicked renames on Amazon S3 makes this algorithm
very, very slow. The recommended solution is to switch to an S3 "Zero Rename"
committer (see below).
0157
0158 For reference, here are the performance and safety characteristics of
0159 different stores and connectors when renaming directories:
0160
0161 | Store | Connector | Directory Rename Safety | Rename Performance |
0162 |---------------|-----------|-------------------------|--------------------|
0163 | Amazon S3 | s3a | Unsafe | O(data) |
0164 | Azure Storage | wasb | Safe | O(files) |
0165 | Azure Datalake Gen 2 | abfs | Safe | O(1) |
0166 | Google GCS | gs | Safe | O(1) |
0167
As storing temporary files can run up charges, delete
directories called `_temporary` on a regular basis.
0170
0171 ### Parquet I/O Settings
0172
0173 For optimal performance when working with Parquet data use the following settings:
0174
0175 ```
0176 spark.hadoop.parquet.enable.summary-metadata false
0177 spark.sql.parquet.mergeSchema false
0178 spark.sql.parquet.filterPushdown true
0179 spark.sql.hive.metastorePartitionPruning true
0180 ```
0181
0182 These minimise the amount of data read during queries.
0183
0184 ### ORC I/O Settings
0185
0186 For best performance when working with ORC data, use these settings:
0187
0188 ```
0189 spark.sql.orc.filterPushdown true
0190 spark.sql.orc.splits.include.file.footer true
0191 spark.sql.orc.cache.stripe.details.size 10000
0192 spark.sql.hive.metastorePartitionPruning true
0193 ```
0194
0195 Again, these minimise the amount of data read during queries.
0196
0197 ## Spark Streaming and Object Storage
0198
0199 Spark Streaming can monitor files added to object stores, by
0200 creating a `FileInputDStream` to monitor a path in the store through a call to
0201 `StreamingContext.textFileStream()`.
0202
1. The time to scan for new files is proportional to the number of files
under the path, not the number of *new* files, so it can become a slow operation.
The window needs to be large enough to absorb the cost of this scan.
0206
0207 1. Files only appear in an object store once they are completely written; there
0208 is no need for a workflow of write-then-rename to ensure that files aren't picked up
0209 while they are still being written. Applications can write straight to the monitored directory.
0210
0211 1. Streams should only be checkpointed to a store implementing a fast and
0212 atomic `rename()` operation.
0213 Otherwise the checkpointing may be slow and potentially unreliable.
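As a minimal sketch of the monitoring pattern above (assuming an already-configured `SparkContext` named `sc` with working S3A credentials, and a hypothetical bucket name):

```python
from pyspark.streaming import StreamingContext

# Scan the monitored path every 30 seconds; remember that the listing
# cost grows with the total number of files under the path.
ssc = StreamingContext(sc, batchDuration=30)

# Each batch yields the lines of any file newly added under this prefix.
lines = ssc.textFileStream("s3a://mybucket/incoming/")
lines.pprint()

ssc.start()
ssc.awaitTermination()
```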
0214
## Committing work into cloud storage safely and fast
0216
0217 As covered earlier, commit-by-rename is dangerous on any object store which
0218 exhibits eventual consistency (example: S3), and often slower than classic
0219 filesystem renames.
0220
Some object store connectors provide custom committers to commit tasks and
jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
the S3A connector for Amazon S3 ships with such committers.
0224
0225 Instead of writing data to a temporary directory on the store for renaming,
0226 these committers write the files to the final destination, but do not issue
0227 the final POST command to make a large "multi-part" upload visible. Those
0228 operations are postponed until the job commit itself. As a result, task and
0229 job commit are much faster, and task failures do not affect the result.
0230
To switch to the S3A committers, use a version of Spark built with Hadoop
3.1 or later, and switch the committers through the following options.
0233
0234 ```
0235 spark.hadoop.fs.s3a.committer.name directory
0236 spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
0237 spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
0238 ```
0239
These committers have been tested with the most common output formats supported by Spark:
0241
0242 ```python
0243 mydataframe.write.format("parquet").save("s3a://bucket/destination")
0244 ```
0245
0246 More details on these committers can be found in the latest Hadoop documentation.
0247
0248 ## Further Reading
0249
Here is the documentation on the standard connectors, from both Apache and the cloud providers.
0251
0252 * [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
0254 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
0255 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
0257 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
0258 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google
0259 * [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
0260 * IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage), [how-to-use-connector](https://developer.ibm.com/code/2018/08/16/installing-running-stocator-apache-spark-ibm-cloud-object-storage). From IBM
0261