Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 
# Python 3 has no ``basestring``/``unicode`` builtins; alias both to ``str`` so
# the isinstance checks in this module work on either major version.
# Compare the numeric ``version_info`` instead of the ``sys.version`` string:
# a lexicographic string comparison would misorder e.g. "10.0" against "3".
if sys.version_info[0] >= 3:
    basestring = unicode = str
0022 
0023 from py4j.java_gateway import JavaClass
0024 
0025 from pyspark import RDD, since
0026 from pyspark.rdd import ignore_unicode_prefix
0027 from pyspark.sql.column import _to_seq
0028 from pyspark.sql.types import *
0029 from pyspark.sql import utils
0030 from pyspark.sql.utils import to_str
0031 
# Names exported when this module is star-imported.
__all__ = ["DataFrameReader", "DataFrameWriter"]
0033 
0034 
class OptionUtils(object):
    """Mixin that forwards keyword options to the host's ``schema``/``option`` methods."""

    def _set_opts(self, schema=None, **options):
        """
        Apply ``schema`` and every named option whose value is not None.

        ``schema`` (when given) is passed to ``self.schema``; each remaining
        keyword is passed to ``self.option``. Keywords set to None are skipped.
        """
        if schema is not None:
            self.schema(schema)
        for name in options:
            value = options[name]
            if value is None:
                continue
            self.option(name, value)
0046 
0047 
0048 class DataFrameReader(OptionUtils):
0049     """
0050     Interface used to load a :class:`DataFrame` from external storage systems
0051     (e.g. file systems, key-value stores, etc). Use :attr:`SparkSession.read`
0052     to access this.
0053 
0054     .. versionadded:: 1.4
0055     """
0056 
0057     def __init__(self, spark):
0058         self._jreader = spark._ssql_ctx.read()
0059         self._spark = spark
0060 
0061     def _df(self, jdf):
0062         from pyspark.sql.dataframe import DataFrame
0063         return DataFrame(jdf, self._spark)
0064 
0065     @since(1.4)
0066     def format(self, source):
0067         """Specifies the input data source format.
0068 
0069         :param source: string, name of the data source, e.g. 'json', 'parquet'.
0070 
0071         >>> df = spark.read.format('json').load('python/test_support/sql/people.json')
0072         >>> df.dtypes
0073         [('age', 'bigint'), ('name', 'string')]
0074 
0075         """
0076         self._jreader = self._jreader.format(source)
0077         return self
0078 
0079     @since(1.4)
0080     def schema(self, schema):
0081         """Specifies the input schema.
0082 
0083         Some data sources (e.g. JSON) can infer the input schema automatically from data.
0084         By specifying the schema here, the underlying data source can skip the schema
0085         inference step, and thus speed up data loading.
0086 
0087         :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
0088                        (For example ``col0 INT, col1 DOUBLE``).
0089 
0090         >>> s = spark.read.schema("col0 INT, col1 DOUBLE")
0091         """
0092         from pyspark.sql import SparkSession
0093         spark = SparkSession.builder.getOrCreate()
0094         if isinstance(schema, StructType):
0095             jschema = spark._jsparkSession.parseDataType(schema.json())
0096             self._jreader = self._jreader.schema(jschema)
0097         elif isinstance(schema, basestring):
0098             self._jreader = self._jreader.schema(schema)
0099         else:
0100             raise TypeError("schema should be StructType or string")
0101         return self
0102 
0103     @since(1.5)
0104     def option(self, key, value):
0105         """Adds an input option for the underlying data source.
0106 
0107         You can set the following option(s) for reading files:
0108             * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0109                 timestamps in the JSON/CSV datasources or partition values. The following
0110                 formats of `timeZone` are supported:
0111 
0112                 * Region-based zone ID: It should have the form 'area/city', such as \
0113                   'America/Los_Angeles'.
0114                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0115                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0116 
0117                 Other short names like 'CST' are not recommended to use because they can be
0118                 ambiguous. If it isn't set, the current value of the SQL config
0119                 ``spark.sql.session.timeZone`` is used by default.
0120             * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
0121                 the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
0122                 It does not change the behavior of partition discovery.
0123         """
0124         self._jreader = self._jreader.option(key, to_str(value))
0125         return self
0126 
0127     @since(1.4)
0128     def options(self, **options):
0129         """Adds input options for the underlying data source.
0130 
0131         You can set the following option(s) for reading files:
0132             * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0133                 timestamps in the JSON/CSV datasources or partition values. The following
0134                 formats of `timeZone` are supported:
0135 
0136                 * Region-based zone ID: It should have the form 'area/city', such as \
0137                   'America/Los_Angeles'.
0138                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0139                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0140 
0141                 Other short names like 'CST' are not recommended to use because they can be
0142                 ambiguous. If it isn't set, the current value of the SQL config
0143                 ``spark.sql.session.timeZone`` is used by default.
0144             * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
0145                 the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
0146                 It does not change the behavior of partition discovery.
0147         """
0148         for k in options:
0149             self._jreader = self._jreader.option(k, to_str(options[k]))
0150         return self
0151 
0152     @since(1.4)
0153     def load(self, path=None, format=None, schema=None, **options):
0154         """Loads data from a data source and returns it as a :class:`DataFrame`.
0155 
0156         :param path: optional string or a list of string for file-system backed data sources.
0157         :param format: optional string for format of the data source. Default to 'parquet'.
0158         :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema
0159                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0160         :param options: all other string options
0161 
0162         >>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_partitioned',
0163         ...     opt1=True, opt2=1, opt3='str')
0164         >>> df.dtypes
0165         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
0166 
0167         >>> df = spark.read.format('json').load(['python/test_support/sql/people.json',
0168         ...     'python/test_support/sql/people1.json'])
0169         >>> df.dtypes
0170         [('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
0171         """
0172         if format is not None:
0173             self.format(format)
0174         if schema is not None:
0175             self.schema(schema)
0176         self.options(**options)
0177         if isinstance(path, basestring):
0178             return self._df(self._jreader.load(path))
0179         elif path is not None:
0180             if type(path) != list:
0181                 path = [path]
0182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
0183         else:
0184             return self._df(self._jreader.load())
0185 
0186     @since(1.4)
0187     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
0188              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
0189              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
0190              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
0191              multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
0192              dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
0193              recursiveFileLookup=None):
0194         """
0195         Loads JSON files and returns the results as a :class:`DataFrame`.
0196 
0197         `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
0198         For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
0199 
0200         If the ``schema`` parameter is not specified, this function goes
0201         through the input once to determine the input schema.
0202 
0203         :param path: string represents path to the JSON dataset, or a list of paths,
0204                      or RDD of Strings storing JSON objects.
0205         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema or
0206                        a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0207         :param primitivesAsString: infers all primitive values as a string type. If None is set,
0208                                    it uses the default value, ``false``.
0209         :param prefersDecimal: infers all floating-point values as a decimal type. If the values
0210                                do not fit in decimal, then it infers them as doubles. If None is
0211                                set, it uses the default value, ``false``.
0212         :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
0213                               it uses the default value, ``false``.
0214         :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
0215                                         it uses the default value, ``false``.
0216         :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
0217                                         set, it uses the default value, ``true``.
0218         :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
0219                                         set, it uses the default value, ``false``.
0220         :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
0221                                                    using backslash quoting mechanism. If None is
0222                                                    set, it uses the default value, ``false``.
0223         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0224                      set, it uses the default value, ``PERMISSIVE``.
0225 
0226                 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0227                   into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0228                   fields to ``null``. To keep corrupt records, an user can set a string type \
0229                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
0230                   schema does not have the field, it drops corrupt records during parsing. \
0231                   When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
0232                   field in an output schema.
0233                 *  ``DROPMALFORMED``: ignores the whole corrupted records.
0234                 *  ``FAILFAST``: throws an exception when it meets corrupted records.
0235 
0236         :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0237                                           created by ``PERMISSIVE`` mode. This overrides
0238                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0239                                           it uses the value specified in
0240                                           ``spark.sql.columnNameOfCorruptRecord``.
0241         :param dateFormat: sets the string that indicates a date format. Custom date formats
0242                            follow the formats at `datetime pattern`_.
0243                            This applies to date type. If None is set, it uses the
0244                            default value, ``yyyy-MM-dd``.
0245         :param timestampFormat: sets the string that indicates a timestamp format.
0246                                 Custom date formats follow the formats at `datetime pattern`_.
0247                                 This applies to timestamp type. If None is set, it uses the
0248                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0249         :param multiLine: parse one record, which may span multiple lines, per file. If None is
0250                           set, it uses the default value, ``false``.
0251         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
0252                                           characters (ASCII characters with value less than 32,
0253                                           including tab and line feed characters) or not.
0254         :param encoding: allows to forcibly set one of standard basic or extended encoding for
0255                          the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
0256                          the encoding of input JSON will be detected automatically
0257                          when the multiLine option is set to ``true``.
0258         :param lineSep: defines the line separator that should be used for parsing. If None is
0259                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0260         :param samplingRatio: defines fraction of input JSON objects used for schema inferring.
0261                               If None is set, it uses the default value, ``1.0``.
0262         :param dropFieldIfAllNull: whether to ignore column of all null values or empty
0263                                    array/struct during schema inference. If None is set, it
0264                                    uses the default value, ``false``.
0265         :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0266                        it uses the default value, ``en-US``. For instance, ``locale`` is used while
0267                        parsing dates and timestamps.
0268         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0269                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0270                                It does not change the behavior of `partition discovery`_.
0271         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0272                                     disables `partition discovery`_.
0273 
0274         .. _partition discovery:
0275           https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
0276         .. _datetime pattern: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
0277 
0278         >>> df1 = spark.read.json('python/test_support/sql/people.json')
0279         >>> df1.dtypes
0280         [('age', 'bigint'), ('name', 'string')]
0281         >>> rdd = sc.textFile('python/test_support/sql/people.json')
0282         >>> df2 = spark.read.json(rdd)
0283         >>> df2.dtypes
0284         [('age', 'bigint'), ('name', 'string')]
0285 
0286         """
0287         self._set_opts(
0288             schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
0289             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
0290             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
0291             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
0292             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
0293             timestampFormat=timestampFormat, multiLine=multiLine,
0294             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
0295             samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
0296             locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0297         if isinstance(path, basestring):
0298             path = [path]
0299         if type(path) == list:
0300             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
0301         elif isinstance(path, RDD):
0302             def func(iterator):
0303                 for x in iterator:
0304                     if not isinstance(x, basestring):
0305                         x = unicode(x)
0306                     if isinstance(x, unicode):
0307                         x = x.encode("utf-8")
0308                     yield x
0309             keyed = path.mapPartitions(func)
0310             keyed._bypass_serializer = True
0311             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
0312             return self._df(self._jreader.json(jrdd))
0313         else:
0314             raise TypeError("path can be only string, list or RDD")
0315 
0316     @since(1.4)
0317     def table(self, tableName):
0318         """Returns the specified table as a :class:`DataFrame`.
0319 
0320         :param tableName: string, name of the table.
0321 
0322         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
0323         >>> df.createOrReplaceTempView('tmpTable')
0324         >>> spark.read.table('tmpTable').dtypes
0325         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
0326         """
0327         return self._df(self._jreader.table(tableName))
0328 
0329     @since(1.4)
0330     def parquet(self, *paths, **options):
0331         """
0332         Loads Parquet files, returning the result as a :class:`DataFrame`.
0333 
0334         :param mergeSchema: sets whether we should merge schemas collected from all
0335                             Parquet part-files. This will override
0336                             ``spark.sql.parquet.mergeSchema``. The default value is specified in
0337                             ``spark.sql.parquet.mergeSchema``.
0338         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0339                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0340                                It does not change the behavior of `partition discovery`_.
0341         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0342                                     disables `partition discovery`_.
0343 
0344         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
0345         >>> df.dtypes
0346         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
0347         """
0348         mergeSchema = options.get('mergeSchema', None)
0349         pathGlobFilter = options.get('pathGlobFilter', None)
0350         recursiveFileLookup = options.get('recursiveFileLookup', None)
0351         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0352                        recursiveFileLookup=recursiveFileLookup)
0353         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
0354 
0355     @ignore_unicode_prefix
0356     @since(1.6)
0357     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
0358              recursiveFileLookup=None):
0359         """
0360         Loads text files and returns a :class:`DataFrame` whose schema starts with a
0361         string column named "value", and followed by partitioned columns if there
0362         are any.
0363         The text files must be encoded as UTF-8.
0364 
0365         By default, each line in the text file is a new row in the resulting DataFrame.
0366 
0367         :param paths: string, or list of strings, for input path(s).
0368         :param wholetext: if true, read each file from input path(s) as a single row.
0369         :param lineSep: defines the line separator that should be used for parsing. If None is
0370                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0371         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0372                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0373                                It does not change the behavior of `partition discovery`_.
0374         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0375                                     disables `partition discovery`_.
0376 
0377         >>> df = spark.read.text('python/test_support/sql/text-test.txt')
0378         >>> df.collect()
0379         [Row(value=u'hello'), Row(value=u'this')]
0380         >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
0381         >>> df.collect()
0382         [Row(value=u'hello\\nthis')]
0383         """
0384         self._set_opts(
0385             wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
0386             recursiveFileLookup=recursiveFileLookup)
0387         if isinstance(paths, basestring):
0388             paths = [paths]
0389         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
0390 
0391     @since(2.0)
0392     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
0393             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
0394             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
0395             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
0396             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
0397             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
0398             samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
0399             pathGlobFilter=None, recursiveFileLookup=None):
0400         r"""Loads a CSV file and returns the result as a  :class:`DataFrame`.
0401 
0402         This function will go through the input once to determine the input schema if
0403         ``inferSchema`` is enabled. To avoid going through the entire data once, disable
0404         ``inferSchema`` option or specify the schema explicitly using ``schema``.
0405 
0406         :param path: string, or list of strings, for input path(s),
0407                      or RDD of Strings storing CSV rows.
0408         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
0409                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0410         :param sep: sets a separator (one or more characters) for each field and value. If None is
0411                     set, it uses the default value, ``,``.
0412         :param encoding: decodes the CSV files by the given encoding type. If None is set,
0413                          it uses the default value, ``UTF-8``.
0414         :param quote: sets a single character used for escaping quoted values where the
0415                       separator can be part of the value. If None is set, it uses the default
0416                       value, ``"``. If you would like to turn off quotations, you need to set an
0417                       empty string.
0418         :param escape: sets a single character used for escaping quotes inside an already
0419                        quoted value. If None is set, it uses the default value, ``\``.
0420         :param comment: sets a single character used for skipping lines beginning with this
0421                         character. By default (None), it is disabled.
0422         :param header: uses the first line as names of columns. If None is set, it uses the
0423                        default value, ``false``.
0424         :param inferSchema: infers the input schema automatically from data. It requires one extra
0425                        pass over the data. If None is set, it uses the default value, ``false``.
0426         :param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
0427                               forcibly applied to datasource files, and headers in CSV files will be
0428                               ignored. If the option is set to ``false``, the schema will be
0429                               validated against all headers in CSV files or the first header in RDD
0430                               if the ``header`` option is set to ``true``. Field names in the schema
0431                               and column names in CSV headers are checked by their positions
0432                               taking into account ``spark.sql.caseSensitive``. If None is set,
0433                               ``true`` is used by default. Though the default value is ``true``,
0434                               it is recommended to disable the ``enforceSchema`` option
0435                               to avoid incorrect results.
0436         :param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
0437                                         values being read should be skipped. If None is set, it
0438                                         uses the default value, ``false``.
0439         :param ignoreTrailingWhiteSpace: A flag indicating whether or not trailing whitespaces from
0440                                          values being read should be skipped. If None is set, it
0441                                          uses the default value, ``false``.
0442         :param nullValue: sets the string representation of a null value. If None is set, it uses
0443                           the default value, empty string. Since 2.0.1, this ``nullValue`` param
0444                           applies to all supported types including the string type.
0445         :param nanValue: sets the string representation of a non-number value. If None is set, it
0446                          uses the default value, ``NaN``.
0447         :param positiveInf: sets the string representation of a positive infinity value. If None
0448                             is set, it uses the default value, ``Inf``.
0449         :param negativeInf: sets the string representation of a negative infinity value. If None
0450                             is set, it uses the default value, ``Inf``.
0451         :param dateFormat: sets the string that indicates a date format. Custom date formats
0452                            follow the formats at `datetime pattern`_.
0453                            This applies to date type. If None is set, it uses the
0454                            default value, ``yyyy-MM-dd``.
0455         :param timestampFormat: sets the string that indicates a timestamp format.
0456                                 Custom date formats follow the formats at `datetime pattern`_.
0457                                 This applies to timestamp type. If None is set, it uses the
0458                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0459         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
0460                            set, it uses the default value, ``20480``.
0461         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
0462                                   value being read. If None is set, it uses the default value,
0463                                   ``-1`` meaning unlimited length.
0464         :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
0465                                             If specified, it is ignored.
0466         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0467                      set, it uses the default value, ``PERMISSIVE``. Note that Spark tries to
0468                      parse only required columns in CSV under column pruning. Therefore, corrupt
0469                      records can be different based on required set of fields. This behavior can
0470                      be controlled by ``spark.sql.csv.parser.columnPruning.enabled``
0471                      (enabled by default).
0472 
0473                 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0474                   into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0475                   fields to ``null``. To keep corrupt records, an user can set a string type \
0476                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
0477                   schema does not have the field, it drops corrupt records during parsing. \
0478                   A record with less/more tokens than schema is not a corrupted record to CSV. \
0479                   When it meets a record having fewer tokens than the length of the schema, \
0480                   sets ``null`` to extra fields. When the record has more tokens than the \
0481                   length of the schema, it drops extra tokens.
0482                 * ``DROPMALFORMED``: ignores the whole corrupted records.
0483                 * ``FAILFAST``: throws an exception when it meets corrupted records.
0484 
0485         :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0486                                           created by ``PERMISSIVE`` mode. This overrides
0487                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0488                                           it uses the value specified in
0489                                           ``spark.sql.columnNameOfCorruptRecord``.
0490         :param multiLine: parse records, which may span multiple lines. If None is
0491                           set, it uses the default value, ``false``.
0492         :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
0493                                           the quote character. If None is set, the default value is
0494                                           escape character when escape and quote characters are
0495                                           different, ``\0`` otherwise.
0496         :param samplingRatio: defines fraction of rows used for schema inferring.
0497                               If None is set, it uses the default value, ``1.0``.
0498         :param emptyValue: sets the string representation of an empty value. If None is set, it uses
0499                            the default value, empty string.
0500         :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0501                        it uses the default value, ``en-US``. For instance, ``locale`` is used while
0502                        parsing dates and timestamps.
0503         :param lineSep: defines the line separator that should be used for parsing. If None is
0504                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0505                         Maximum length is 1 character.
0506         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0507                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0508                                It does not change the behavior of `partition discovery`_.
0509         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0510                                     disables `partition discovery`_.
0511 
0512         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
0513         >>> df.dtypes
0514         [('_c0', 'string'), ('_c1', 'string')]
0515         >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
0516         >>> df2 = spark.read.csv(rdd)
0517         >>> df2.dtypes
0518         [('_c0', 'string'), ('_c1', 'string')]
0519         """
0520         self._set_opts(
0521             schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
0522             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
0523             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
0524             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
0525             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
0526             maxCharsPerColumn=maxCharsPerColumn,
0527             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
0528             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
0529             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
0530             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
0531             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0532         if isinstance(path, basestring):
0533             path = [path]
0534         if type(path) == list:
0535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
0536         elif isinstance(path, RDD):
0537             def func(iterator):
0538                 for x in iterator:
0539                     if not isinstance(x, basestring):
0540                         x = unicode(x)
0541                     if isinstance(x, unicode):
0542                         x = x.encode("utf-8")
0543                     yield x
0544             keyed = path.mapPartitions(func)
0545             keyed._bypass_serializer = True
0546             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
0547             # see SPARK-22112
0548             # There aren't any jvm api for creating a dataframe from rdd storing csv.
0549             # We can do it through creating a jvm dataset firstly and using the jvm api
0550             # for creating a dataframe from dataset storing csv.
0551             jdataset = self._spark._ssql_ctx.createDataset(
0552                 jrdd.rdd(),
0553                 self._spark._jvm.Encoders.STRING())
0554             return self._df(self._jreader.csv(jdataset))
0555         else:
0556             raise TypeError("path can be only string, list or RDD")
0557 
0558     @since(1.5)
0559     def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
0560         """Loads ORC files, returning the result as a :class:`DataFrame`.
0561 
0562         :param mergeSchema: sets whether we should merge schemas collected from all
0563                             ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
0564                             The default value is specified in ``spark.sql.orc.mergeSchema``.
0565         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0566                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0567                                It does not change the behavior of `partition discovery`_.
0568         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0569                                     disables `partition discovery`_.
0570 
0571         >>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
0572         >>> df.dtypes
0573         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
0574         """
0575         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0576                        recursiveFileLookup=recursiveFileLookup)
0577         if isinstance(path, basestring):
0578             path = [path]
0579         return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
0580 
0581     @since(1.4)
0582     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
0583              predicates=None, properties=None):
0584         """
0585         Construct a :class:`DataFrame` representing the database table named ``table``
0586         accessible via JDBC URL ``url`` and connection ``properties``.
0587 
0588         Partitions of the table will be retrieved in parallel if either ``column`` or
0589         ``predicates`` is specified. ``lowerBound`, ``upperBound`` and ``numPartitions``
0590         is needed when ``column`` is specified.
0591 
0592         If both ``column`` and ``predicates`` are specified, ``column`` will be used.
0593 
0594         .. note:: Don't create too many partitions in parallel on a large cluster;
0595             otherwise Spark might crash your external database systems.
0596 
0597         :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
0598         :param table: the name of the table
0599         :param column: the name of a column of numeric, date, or timestamp type
0600                        that will be used for partitioning;
0601                        if this parameter is specified, then ``numPartitions``, ``lowerBound``
0602                        (inclusive), and ``upperBound`` (exclusive) will form partition strides
0603                        for generated WHERE clause expressions used to split the column
0604                        ``column`` evenly
0605         :param lowerBound: the minimum value of ``column`` used to decide partition stride
0606         :param upperBound: the maximum value of ``column`` used to decide partition stride
0607         :param numPartitions: the number of partitions
0608         :param predicates: a list of expressions suitable for inclusion in WHERE clauses;
0609                            each one defines one partition of the :class:`DataFrame`
0610         :param properties: a dictionary of JDBC database connection arguments. Normally at
0611                            least properties "user" and "password" with their corresponding values.
0612                            For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }
0613         :return: a DataFrame
0614         """
0615         if properties is None:
0616             properties = dict()
0617         jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
0618         for k in properties:
0619             jprop.setProperty(k, properties[k])
0620         if column is not None:
0621             assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"
0622             assert upperBound is not None, "upperBound can not be None when ``column`` is specified"
0623             assert numPartitions is not None, \
0624                 "numPartitions can not be None when ``column`` is specified"
0625             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
0626                                                int(numPartitions), jprop))
0627         if predicates is not None:
0628             gateway = self._spark._sc._gateway
0629             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
0630             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
0631         return self._df(self._jreader.jdbc(url, table, jprop))
0632 
0633 
0634 class DataFrameWriter(OptionUtils):
0635     """
0636     Interface used to write a :class:`DataFrame` to external storage systems
0637     (e.g. file systems, key-value stores, etc). Use :attr:`DataFrame.write`
0638     to access this.
0639 
0640     .. versionadded:: 1.4
0641     """
0642     def __init__(self, df):
0643         self._df = df
0644         self._spark = df.sql_ctx
0645         self._jwrite = df._jdf.write()
0646 
0647     def _sq(self, jsq):
0648         from pyspark.sql.streaming import StreamingQuery
0649         return StreamingQuery(jsq)
0650 
0651     @since(1.4)
0652     def mode(self, saveMode):
0653         """Specifies the behavior when data or table already exists.
0654 
0655         Options include:
0656 
0657         * `append`: Append contents of this :class:`DataFrame` to existing data.
0658         * `overwrite`: Overwrite existing data.
0659         * `error` or `errorifexists`: Throw an exception if data already exists.
0660         * `ignore`: Silently ignore this operation if data already exists.
0661 
0662         >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
0663         """
0664         # At the JVM side, the default value of mode is already set to "error".
0665         # So, if the given saveMode is None, we will not call JVM-side's mode method.
0666         if saveMode is not None:
0667             self._jwrite = self._jwrite.mode(saveMode)
0668         return self
0669 
0670     @since(1.4)
0671     def format(self, source):
0672         """Specifies the underlying output data source.
0673 
0674         :param source: string, name of the data source, e.g. 'json', 'parquet'.
0675 
0676         >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
0677         """
0678         self._jwrite = self._jwrite.format(source)
0679         return self
0680 
0681     @since(1.5)
0682     def option(self, key, value):
0683         """Adds an output option for the underlying data source.
0684 
0685         You can set the following option(s) for writing files:
0686             * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0687                 timestamps in the JSON/CSV datasources or partition values. The following
0688                 formats of `timeZone` are supported:
0689 
0690                 * Region-based zone ID: It should have the form 'area/city', such as \
0691                   'America/Los_Angeles'.
0692                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0693                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0694 
0695                 Other short names like 'CST' are not recommended to use because they can be
0696                 ambiguous. If it isn't set, the current value of the SQL config
0697                 ``spark.sql.session.timeZone`` is used by default.
0698         """
0699         self._jwrite = self._jwrite.option(key, to_str(value))
0700         return self
0701 
0702     @since(1.4)
0703     def options(self, **options):
0704         """Adds output options for the underlying data source.
0705 
0706         You can set the following option(s) for writing files:
0707             * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0708                 timestamps in the JSON/CSV datasources or partition values. The following
0709                 formats of `timeZone` are supported:
0710 
0711                 * Region-based zone ID: It should have the form 'area/city', such as \
0712                   'America/Los_Angeles'.
0713                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0714                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0715 
0716                 Other short names like 'CST' are not recommended to use because they can be
0717                 ambiguous. If it isn't set, the current value of the SQL config
0718                 ``spark.sql.session.timeZone`` is used by default.
0719         """
0720         for k in options:
0721             self._jwrite = self._jwrite.option(k, to_str(options[k]))
0722         return self
0723 
0724     @since(1.4)
0725     def partitionBy(self, *cols):
0726         """Partitions the output by the given columns on the file system.
0727 
0728         If specified, the output is laid out on the file system similar
0729         to Hive's partitioning scheme.
0730 
0731         :param cols: name of columns
0732 
0733         >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
0734         """
0735         if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
0736             cols = cols[0]
0737         self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
0738         return self
0739 
0740     @since(2.3)
0741     def bucketBy(self, numBuckets, col, *cols):
0742         """Buckets the output by the given columns.If specified,
0743         the output is laid out on the file system similar to Hive's bucketing scheme.
0744 
0745         :param numBuckets: the number of buckets to save
0746         :param col: a name of a column, or a list of names.
0747         :param cols: additional names (optional). If `col` is a list it should be empty.
0748 
0749         .. note:: Applicable for file-based data sources in combination with
0750                   :py:meth:`DataFrameWriter.saveAsTable`.
0751 
0752         >>> (df.write.format('parquet')  # doctest: +SKIP
0753         ...     .bucketBy(100, 'year', 'month')
0754         ...     .mode("overwrite")
0755         ...     .saveAsTable('bucketed_table'))
0756         """
0757         if not isinstance(numBuckets, int):
0758             raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))
0759 
0760         if isinstance(col, (list, tuple)):
0761             if cols:
0762                 raise ValueError("col is a {0} but cols are not empty".format(type(col)))
0763 
0764             col, cols = col[0], col[1:]
0765 
0766         if not all(isinstance(c, basestring) for c in cols) or not(isinstance(col, basestring)):
0767             raise TypeError("all names should be `str`")
0768 
0769         self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
0770         return self
0771 
0772     @since(2.3)
0773     def sortBy(self, col, *cols):
0774         """Sorts the output in each bucket by the given columns on the file system.
0775 
0776         :param col: a name of a column, or a list of names.
0777         :param cols: additional names (optional). If `col` is a list it should be empty.
0778 
0779         >>> (df.write.format('parquet')  # doctest: +SKIP
0780         ...     .bucketBy(100, 'year', 'month')
0781         ...     .sortBy('day')
0782         ...     .mode("overwrite")
0783         ...     .saveAsTable('sorted_bucketed_table'))
0784         """
0785         if isinstance(col, (list, tuple)):
0786             if cols:
0787                 raise ValueError("col is a {0} but cols are not empty".format(type(col)))
0788 
0789             col, cols = col[0], col[1:]
0790 
0791         if not all(isinstance(c, basestring) for c in cols) or not(isinstance(col, basestring)):
0792             raise TypeError("all names should be `str`")
0793 
0794         self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
0795         return self
0796 
0797     @since(1.4)
0798     def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
0799         """Saves the contents of the :class:`DataFrame` to a data source.
0800 
0801         The data source is specified by the ``format`` and a set of ``options``.
0802         If ``format`` is not specified, the default data source configured by
0803         ``spark.sql.sources.default`` will be used.
0804 
0805         :param path: the path in a Hadoop supported file system
0806         :param format: the format used to save
0807         :param mode: specifies the behavior of the save operation when data already exists.
0808 
0809             * ``append``: Append contents of this :class:`DataFrame` to existing data.
0810             * ``overwrite``: Overwrite existing data.
0811             * ``ignore``: Silently ignore this operation if data already exists.
0812             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
0813                 exists.
0814         :param partitionBy: names of partitioning columns
0815         :param options: all other string options
0816 
0817         >>> df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
0818         """
0819         self.mode(mode).options(**options)
0820         if partitionBy is not None:
0821             self.partitionBy(partitionBy)
0822         if format is not None:
0823             self.format(format)
0824         if path is None:
0825             self._jwrite.save()
0826         else:
0827             self._jwrite.save(path)
0828 
0829     @since(1.4)
0830     def insertInto(self, tableName, overwrite=None):
0831         """Inserts the content of the :class:`DataFrame` to the specified table.
0832 
0833         It requires that the schema of the :class:`DataFrame` is the same as the
0834         schema of the table.
0835 
0836         Optionally overwriting any existing data.
0837         """
0838         if overwrite is not None:
0839             self.mode("overwrite" if overwrite else "append")
0840         self._jwrite.insertInto(tableName)
0841 
0842     @since(1.4)
0843     def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
0844         """Saves the content of the :class:`DataFrame` as the specified table.
0845 
0846         In the case the table already exists, behavior of this function depends on the
0847         save mode, specified by the `mode` function (default to throwing an exception).
0848         When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
0849         the same as that of the existing table.
0850 
0851         * `append`: Append contents of this :class:`DataFrame` to existing data.
0852         * `overwrite`: Overwrite existing data.
0853         * `error` or `errorifexists`: Throw an exception if data already exists.
0854         * `ignore`: Silently ignore this operation if data already exists.
0855 
0856         :param name: the table name
0857         :param format: the format used to save
0858         :param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` \
0859                      (default: error)
0860         :param partitionBy: names of partitioning columns
0861         :param options: all other string options
0862         """
0863         self.mode(mode).options(**options)
0864         if partitionBy is not None:
0865             self.partitionBy(partitionBy)
0866         if format is not None:
0867             self.format(format)
0868         self._jwrite.saveAsTable(name)
0869 
0870     @since(1.4)
0871     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
0872              lineSep=None, encoding=None, ignoreNullFields=None):
0873         """Saves the content of the :class:`DataFrame` in JSON format
0874         (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
0875         specified path.
0876 
0877         :param path: the path in any Hadoop supported file system
0878         :param mode: specifies the behavior of the save operation when data already exists.
0879 
0880             * ``append``: Append contents of this :class:`DataFrame` to existing data.
0881             * ``overwrite``: Overwrite existing data.
0882             * ``ignore``: Silently ignore this operation if data already exists.
0883             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
0884                 exists.
0885         :param compression: compression codec to use when saving to file. This can be one of the
0886                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
0887                             snappy and deflate).
0888         :param dateFormat: sets the string that indicates a date format. Custom date formats
0889                            follow the formats at `datetime pattern`_.
0890                            This applies to date type. If None is set, it uses the
0891                            default value, ``yyyy-MM-dd``.
0892         :param timestampFormat: sets the string that indicates a timestamp format.
0893                                 Custom date formats follow the formats at `datetime pattern`_.
0894                                 This applies to timestamp type. If None is set, it uses the
0895                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0896         :param encoding: specifies encoding (charset) of saved json files. If None is set,
0897                         the default UTF-8 charset will be used.
0898         :param lineSep: defines the line separator that should be used for writing. If None is
0899                         set, it uses the default value, ``\\n``.
0900         :param ignoreNullFields: Whether to ignore null fields when generating JSON objects.
0901                         If None is set, it uses the default value, ``true``.
0902 
0903         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
0904         """
0905         self.mode(mode)
0906         self._set_opts(
0907             compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
0908             lineSep=lineSep, encoding=encoding, ignoreNullFields=ignoreNullFields)
0909         self._jwrite.json(path)
0910 
0911     @since(1.4)
0912     def parquet(self, path, mode=None, partitionBy=None, compression=None):
0913         """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
0914 
0915         :param path: the path in any Hadoop supported file system
0916         :param mode: specifies the behavior of the save operation when data already exists.
0917 
0918             * ``append``: Append contents of this :class:`DataFrame` to existing data.
0919             * ``overwrite``: Overwrite existing data.
0920             * ``ignore``: Silently ignore this operation if data already exists.
0921             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
0922                 exists.
0923         :param partitionBy: names of partitioning columns
0924         :param compression: compression codec to use when saving to file. This can be one of the
0925                             known case-insensitive shorten names (none, uncompressed, snappy, gzip,
0926                             lzo, brotli, lz4, and zstd). This will override
0927                             ``spark.sql.parquet.compression.codec``. If None is set, it uses the
0928                             value specified in ``spark.sql.parquet.compression.codec``.
0929 
0930         >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
0931         """
0932         self.mode(mode)
0933         if partitionBy is not None:
0934             self.partitionBy(partitionBy)
0935         self._set_opts(compression=compression)
0936         self._jwrite.parquet(path)
0937 
0938     @since(1.6)
0939     def text(self, path, compression=None, lineSep=None):
0940         """Saves the content of the DataFrame in a text file at the specified path.
0941         The text files will be encoded as UTF-8.
0942 
0943         :param path: the path in any Hadoop supported file system
0944         :param compression: compression codec to use when saving to file. This can be one of the
0945                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
0946                             snappy and deflate).
0947         :param lineSep: defines the line separator that should be used for writing. If None is
0948                         set, it uses the default value, ``\\n``.
0949 
0950         The DataFrame must have only one column that is of string type.
0951         Each row becomes a new line in the output file.
0952         """
0953         self._set_opts(compression=compression, lineSep=lineSep)
0954         self._jwrite.text(path)
0955 
0956     @since(2.0)
0957     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
0958             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
0959             timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
0960             charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None):
0961         r"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
0962 
0963         :param path: the path in any Hadoop supported file system
0964         :param mode: specifies the behavior of the save operation when data already exists.
0965 
0966             * ``append``: Append contents of this :class:`DataFrame` to existing data.
0967             * ``overwrite``: Overwrite existing data.
0968             * ``ignore``: Silently ignore this operation if data already exists.
0969             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
0970                 exists.
0971 
0972         :param compression: compression codec to use when saving to file. This can be one of the
0973                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
0974                             snappy and deflate).
0975         :param sep: sets a separator (one or more characters) for each field and value. If None is
0976                     set, it uses the default value, ``,``.
0977         :param quote: sets a single character used for escaping quoted values where the
0978                       separator can be part of the value. If None is set, it uses the default
0979                       value, ``"``. If an empty string is set, it uses ``u0000`` (null character).
0980         :param escape: sets a single character used for escaping quotes inside an already
0981                        quoted value. If None is set, it uses the default value, ``\``
0982         :param escapeQuotes: a flag indicating whether values containing quotes should always
0983                              be enclosed in quotes. If None is set, it uses the default value
0984                              ``true``, escaping all values containing a quote character.
0985         :param quoteAll: a flag indicating whether all values should always be enclosed in
0986                           quotes. If None is set, it uses the default value ``false``,
0987                           only escaping values containing a quote character.
0988         :param header: writes the names of columns as the first line. If None is set, it uses
0989                        the default value, ``false``.
0990         :param nullValue: sets the string representation of a null value. If None is set, it uses
0991                           the default value, empty string.
0992         :param dateFormat: sets the string that indicates a date format. Custom date formats follow
0993                            the formats at `datetime pattern`_.
0994                            This applies to date type. If None is set, it uses the
0995                            default value, ``yyyy-MM-dd``.
0996         :param timestampFormat: sets the string that indicates a timestamp format.
0997                                 Custom date formats follow the formats at `datetime pattern`_.
0998                                 This applies to timestamp type. If None is set, it uses the
0999                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
1000         :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
1001                                         values being written should be skipped. If None is set, it
1002                                         uses the default value, ``true``.
1003         :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
1004                                          values being written should be skipped. If None is set, it
1005                                          uses the default value, ``true``.
1006         :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
1007                                           the quote character. If None is set, the default value is
1008                                           escape character when escape and quote characters are
1009                                           different, ``\0`` otherwise..
1010         :param encoding: sets the encoding (charset) of saved csv files. If None is set,
1011                          the default UTF-8 charset will be used.
1012         :param emptyValue: sets the string representation of an empty value. If None is set, it uses
1013                            the default value, ``""``.
1014         :param lineSep: defines the line separator that should be used for writing. If None is
1015                         set, it uses the default value, ``\\n``. Maximum length is 1 character.
1016 
1017         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
1018         """
1019         self.mode(mode)
1020         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
1021                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
1022                        dateFormat=dateFormat, timestampFormat=timestampFormat,
1023                        ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
1024                        ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
1025                        charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
1026                        encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
1027         self._jwrite.csv(path)
1028 
1029     @since(1.5)
1030     def orc(self, path, mode=None, partitionBy=None, compression=None):
1031         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
1032 
1033         :param path: the path in any Hadoop supported file system
1034         :param mode: specifies the behavior of the save operation when data already exists.
1035 
1036             * ``append``: Append contents of this :class:`DataFrame` to existing data.
1037             * ``overwrite``: Overwrite existing data.
1038             * ``ignore``: Silently ignore this operation if data already exists.
1039             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
1040                 exists.
1041         :param partitionBy: names of partitioning columns
1042         :param compression: compression codec to use when saving to file. This can be one of the
1043                             known case-insensitive shorten names (none, snappy, zlib, and lzo).
1044                             This will override ``orc.compress`` and
1045                             ``spark.sql.orc.compression.codec``. If None is set, it uses the value
1046                             specified in ``spark.sql.orc.compression.codec``.
1047 
1048         >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
1049         >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
1050         """
1051         self.mode(mode)
1052         if partitionBy is not None:
1053             self.partitionBy(partitionBy)
1054         self._set_opts(compression=compression)
1055         self._jwrite.orc(path)
1056 
1057     @since(1.4)
1058     def jdbc(self, url, table, mode=None, properties=None):
1059         """Saves the content of the :class:`DataFrame` to an external database table via JDBC.
1060 
1061         .. note:: Don't create too many partitions in parallel on a large cluster;
1062             otherwise Spark might crash your external database systems.
1063 
1064         :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
1065         :param table: Name of the table in the external database.
1066         :param mode: specifies the behavior of the save operation when data already exists.
1067 
1068             * ``append``: Append contents of this :class:`DataFrame` to existing data.
1069             * ``overwrite``: Overwrite existing data.
1070             * ``ignore``: Silently ignore this operation if data already exists.
1071             * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
1072                 exists.
1073         :param properties: a dictionary of JDBC database connection arguments. Normally at
1074                            least properties "user" and "password" with their corresponding values.
1075                            For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }
1076         """
1077         if properties is None:
1078             properties = dict()
1079         jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
1080         for k in properties:
1081             jprop.setProperty(k, properties[k])
1082         self.mode(mode)._jwrite.jdbc(url, table, jprop)
1083 
1084 
def _test():
    """Run this module's doctests against a local Spark context.

    Changes the working directory to ``SPARK_HOME`` (the doctests read relative
    ``python/test_support/...`` paths), starts a ``local[4]`` SparkContext, and makes
    ``os``, ``tempfile``, ``sc``, ``spark`` and a sample parquet-backed ``df`` available
    to the doctests as globals.

    The SparkContext is stopped even if session creation, loading the sample data, or the
    doctest run itself raises. Exits the process with status ``-1`` when any doctest fails.

    :raises KeyError: if the ``SPARK_HOME`` environment variable is not set.
    """
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, Row
    import pyspark.sql.readwriter

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        try:
            spark = SparkSession.builder.getOrCreate()
        except py4j.protocol.Py4JError:
            # Fall back to building the session directly on the existing context.
            spark = SparkSession(sc)

        globs['tempfile'] = tempfile
        globs['os'] = os
        globs['sc'] = sc
        globs['spark'] = spark
        globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
        (failure_count, test_count) = doctest.testmod(
            pyspark.sql.readwriter, globs=globs,
            optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    finally:
        # Always stop the context so a failure above does not leave it running.
        sc.stop()
    if failure_count:
        sys.exit(-1)
1114 
1115 
# Run the module's doctests when this file is executed directly as a script.
if __name__ == '__main__':
    _test()