import sys

if sys.version >= '3':
    basestring = unicode = str

from py4j.java_gateway import JavaClass

from pyspark import RDD, since
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
from pyspark.sql import utils
from pyspark.sql.utils import to_str

__all__ = ["DataFrameReader", "DataFrameWriter"]


class OptionUtils(object):

    def _set_opts(self, schema=None, **options):
        """
        Set named options, filtering out those whose value is None.
        """
        if schema is not None:
            self.schema(schema)
        for k, v in options.items():
            if v is not None:
                self.option(k, v)


class DataFrameReader(OptionUtils):
    """
    Interface used to load a :class:`DataFrame` from external storage systems
    (e.g. file systems, key-value stores, etc). Use :attr:`SparkSession.read`
    to access this.

    .. versionadded:: 1.4
    """

    def __init__(self, spark):
        self._jreader = spark._ssql_ctx.read()
        self._spark = spark

    def _df(self, jdf):
        from pyspark.sql.dataframe import DataFrame
        return DataFrame(jdf, self._spark)

    @since(1.4)
    def format(self, source):
        """Specifies the input data source format.

        :param source: string, name of the data source, e.g. 'json', 'parquet'.

        >>> df = spark.read.format('json').load('python/test_support/sql/people.json')
        >>> df.dtypes
        [('age', 'bigint'), ('name', 'string')]

        """
        self._jreader = self._jreader.format(source)
        return self

    @since(1.4)
    def schema(self, schema):
        """Specifies the input schema.

        Some data sources (e.g. JSON) can infer the input schema automatically from data.
        By specifying the schema here, the underlying data source can skip the schema
        inference step, and thus speed up data loading.

        :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
                       (For example ``col0 INT, col1 DOUBLE``).

        >>> s = spark.read.schema("col0 INT, col1 DOUBLE")
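
        Equivalently, a :class:`StructType` can be passed directly (a minimal sketch
        of the same schema):

        >>> from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
        >>> s = spark.read.schema(StructType([
        ...     StructField("col0", IntegerType()), StructField("col1", DoubleType())]))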
        """
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.getOrCreate()
        if isinstance(schema, StructType):
            jschema = spark._jsparkSession.parseDataType(schema.json())
            self._jreader = self._jreader.schema(jschema)
        elif isinstance(schema, basestring):
            self._jreader = self._jreader.schema(schema)
        else:
            raise TypeError("schema should be StructType or string")
        return self

    @since(1.5)
    def option(self, key, value):
        """Adds an input option for the underlying data source.

        You can set the following option(s) for reading files:
            * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
                timestamps in the JSON/CSV datasources or partition values. The following
                formats of `timeZone` are supported:

                * Region-based zone ID: It should have the form 'area/city', such as \
                  'America/Los_Angeles'.
                * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.

                Other short names like 'CST' are not recommended to use because they can be
                ambiguous. If it isn't set, the current value of the SQL config
                ``spark.sql.session.timeZone`` is used by default.
            * ``pathGlobFilter``: an optional glob pattern to only include files with paths
                matching the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                It does not change the behavior of partition discovery.
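
        For example, to set the time zone used when parsing timestamps (``option``
        returns the reader itself, so no output is expected):

        >>> r = spark.read.option("timeZone", "America/Los_Angeles")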
        """
        self._jreader = self._jreader.option(key, to_str(value))
        return self

    @since(1.4)
    def options(self, **options):
        """Adds input options for the underlying data source.

        You can set the following option(s) for reading files:
            * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
                timestamps in the JSON/CSV datasources or partition values. The following
                formats of `timeZone` are supported:

                * Region-based zone ID: It should have the form 'area/city', such as \
                  'America/Los_Angeles'.
                * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.

                Other short names like 'CST' are not recommended to use because they can be
                ambiguous. If it isn't set, the current value of the SQL config
                ``spark.sql.session.timeZone`` is used by default.
            * ``pathGlobFilter``: an optional glob pattern to only include files with paths
                matching the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                It does not change the behavior of partition discovery.
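
        For example, to set several input options at once (no output is expected):

        >>> r = spark.read.options(timeZone="America/Los_Angeles", pathGlobFilter="*.json")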
        """
        for k in options:
            self._jreader = self._jreader.option(k, to_str(options[k]))
        return self

    @since(1.4)
    def load(self, path=None, format=None, schema=None, **options):
        """Loads data from a data source and returns it as a :class:`DataFrame`.

        :param path: optional string or a list of string for file-system backed data sources.
        :param format: optional string for format of the data source. Default to 'parquet'.
        :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema
                       or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
        :param options: all other string options

        >>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_partitioned',
        ...     opt1=True, opt2=1, opt3='str')
        >>> df.dtypes
        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

        >>> df = spark.read.format('json').load(['python/test_support/sql/people.json',
        ...     'python/test_support/sql/people1.json'])
        >>> df.dtypes
        [('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
        """
        if format is not None:
            self.format(format)
        if schema is not None:
            self.schema(schema)
        self.options(**options)
        if isinstance(path, basestring):
            return self._df(self._jreader.load(path))
        elif path is not None:
            if type(path) != list:
                path = [path]
            return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
        else:
            return self._df(self._jreader.load())

    @since(1.4)
    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
             multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
             dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
             recursiveFileLookup=None):
        """
        Loads JSON files and returns the results as a :class:`DataFrame`.

        `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
        For JSON (one record per file), set the ``multiLine`` parameter to ``true``.

        If the ``schema`` parameter is not specified, this function goes
        through the input once to determine the input schema.

        :param path: string represents path to the JSON dataset, or a list of paths,
                     or RDD of Strings storing JSON objects.
        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema or
                       a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
        :param primitivesAsString: infers all primitive values as a string type. If None is set,
                                   it uses the default value, ``false``.
        :param prefersDecimal: infers all floating-point values as a decimal type. If the values
                               do not fit in decimal, then it infers them as doubles. If None is
                               set, it uses the default value, ``false``.
        :param allowComments: ignores Java/C++ style comments in JSON records. If None is set,
                              it uses the default value, ``false``.
        :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
                                        it uses the default value, ``false``.
        :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
                                  set, it uses the default value, ``true``.
        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
                                        set, it uses the default value, ``false``.
        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all characters
                                                   using the backslash quoting mechanism. If None
                                                   is set, it uses the default value, ``false``.
        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                     set, it uses the default value, ``PERMISSIVE``.

            * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
              into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
              fields to ``null``. To keep corrupt records, a user can set a string type \
              field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
              schema does not have the field, it drops corrupt records during parsing. \
              When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
              field in an output schema.
            * ``DROPMALFORMED``: ignores the whole corrupted records.
            * ``FAILFAST``: throws an exception when it meets corrupted records.

        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
                                          created by ``PERMISSIVE`` mode. This overrides
                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                          it uses the value specified in
                                          ``spark.sql.columnNameOfCorruptRecord``.
        :param dateFormat: sets the string that indicates a date format. Custom date formats
                           follow the formats at `datetime pattern`_.
                           This applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format.
                                Custom date formats follow the formats at `datetime pattern`_.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
        :param multiLine: parse one record, which may span multiple lines, per file. If None is
                          set, it uses the default value, ``false``.
        :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                          characters (ASCII characters with value less than 32,
                                          including tab and line feed characters) or not.
        :param encoding: allows forcibly setting one of the standard basic or extended encodings
                         for the JSON files, for example UTF-16BE or UTF-32LE. If None is set,
                         the encoding of the input JSON will be detected automatically
                         when the multiLine option is set to ``true``.
        :param lineSep: defines the line separator that should be used for parsing. If None is
                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
        :param samplingRatio: defines fraction of input JSON objects used for schema inferring.
                              If None is set, it uses the default value, ``1.0``.
        :param dropFieldIfAllNull: whether to ignore column of all null values or empty
                                   array/struct during schema inference. If None is set, it
                                   uses the default value, ``false``.
        :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
                       it uses the default value, ``en-US``. For instance, ``locale`` is used while
                       parsing dates and timestamps.
        :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                               the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                               It does not change the behavior of `partition discovery`_.
        :param recursiveFileLookup: recursively scan a directory for files. Using this option
                                    disables `partition discovery`_.

        .. _partition discovery:
            https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
        .. _datetime pattern: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

        >>> df1 = spark.read.json('python/test_support/sql/people.json')
        >>> df1.dtypes
        [('age', 'bigint'), ('name', 'string')]
        >>> rdd = sc.textFile('python/test_support/sql/people.json')
        >>> df2 = spark.read.json(rdd)
        >>> df2.dtypes
        [('age', 'bigint'), ('name', 'string')]

        """
        self._set_opts(
            schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
            allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
            allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
            allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
            timestampFormat=timestampFormat, multiLine=multiLine,
            allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
            samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
            locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
        if isinstance(path, basestring):
            path = [path]
        if type(path) == list:
            return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
        elif isinstance(path, RDD):
            def func(iterator):
                for x in iterator:
                    if not isinstance(x, basestring):
                        x = unicode(x)
                    if isinstance(x, unicode):
                        x = x.encode("utf-8")
                    yield x
            keyed = path.mapPartitions(func)
            keyed._bypass_serializer = True
            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
            return self._df(self._jreader.json(jrdd))
        else:
            raise TypeError("path can be only string, list or RDD")

    @since(1.4)
    def table(self, tableName):
        """Returns the specified table as a :class:`DataFrame`.

        :param tableName: string, name of the table.

        >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
        >>> df.createOrReplaceTempView('tmpTable')
        >>> spark.read.table('tmpTable').dtypes
        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
        """
        return self._df(self._jreader.table(tableName))

    @since(1.4)
    def parquet(self, *paths, **options):
        """
        Loads Parquet files, returning the result as a :class:`DataFrame`.

        :param mergeSchema: sets whether we should merge schemas collected from all
                            Parquet part-files. This will override
                            ``spark.sql.parquet.mergeSchema``. The default value is specified in
                            ``spark.sql.parquet.mergeSchema``.
        :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                               the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                               It does not change the behavior of `partition discovery`_.
        :param recursiveFileLookup: recursively scan a directory for files. Using this option
                                    disables `partition discovery`_.

        >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
        >>> df.dtypes
        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
        """
        mergeSchema = options.get('mergeSchema', None)
        pathGlobFilter = options.get('pathGlobFilter', None)
        recursiveFileLookup = options.get('recursiveFileLookup', None)
        self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
                       recursiveFileLookup=recursiveFileLookup)
        return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

    @ignore_unicode_prefix
    @since(1.6)
    def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
             recursiveFileLookup=None):
        """
        Loads text files and returns a :class:`DataFrame` whose schema starts with a
        string column named "value", and followed by partitioned columns if there
        are any.
        The text files must be encoded as UTF-8.

        By default, each line in the text file is a new row in the resulting DataFrame.

        :param paths: string, or list of strings, for input path(s).
        :param wholetext: if true, read each file from input path(s) as a single row.
        :param lineSep: defines the line separator that should be used for parsing. If None is
                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
        :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                               the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                               It does not change the behavior of `partition discovery`_.
        :param recursiveFileLookup: recursively scan a directory for files. Using this option
                                    disables `partition discovery`_.

        >>> df = spark.read.text('python/test_support/sql/text-test.txt')
        >>> df.collect()
        [Row(value=u'hello'), Row(value=u'this')]
        >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
        >>> df.collect()
        [Row(value=u'hello\\nthis')]
        """
        self._set_opts(
            wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
            recursiveFileLookup=recursiveFileLookup)
        if isinstance(paths, basestring):
            paths = [paths]
        return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))

    @since(2.0)
    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
            comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
            ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
            columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
            samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
            pathGlobFilter=None, recursiveFileLookup=None):
        r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

        This function will go through the input once to determine the input schema if
        ``inferSchema`` is enabled. To avoid going through the entire data once, disable
        the ``inferSchema`` option or specify the schema explicitly using ``schema``.

        :param path: string, or list of strings, for input path(s),
                     or RDD of Strings storing CSV rows.
        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                       or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
        :param sep: sets a separator (one or more characters) for each field and value. If None is
                    set, it uses the default value, ``,``.
        :param encoding: decodes the CSV files by the given encoding type. If None is set,
                         it uses the default value, ``UTF-8``.
        :param quote: sets a single character used for escaping quoted values where the
                      separator can be part of the value. If None is set, it uses the default
                      value, ``"``. If you would like to turn off quotations, you need to set an
                      empty string.
        :param escape: sets a single character used for escaping quotes inside an already
                       quoted value. If None is set, it uses the default value, ``\``.
        :param comment: sets a single character used for skipping lines beginning with this
                        character. By default (None), it is disabled.
        :param header: uses the first line as names of columns. If None is set, it uses the
                       default value, ``false``.
        :param inferSchema: infers the input schema automatically from data. It requires one extra
                            pass over the data. If None is set, it uses the default value,
                            ``false``.
        :param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
                              forcibly applied to datasource files, and headers in CSV files will
                              be ignored. If the option is set to ``false``, the schema will be
                              validated against all headers in CSV files or the first header in RDD
                              if the ``header`` option is set to ``true``. Field names in the schema
                              and column names in CSV headers are checked by their positions
                              taking into account ``spark.sql.caseSensitive``. If None is set,
                              ``true`` is used by default. Though the default value is ``true``,
                              it is recommended to disable the ``enforceSchema`` option
                              to avoid incorrect results.
        :param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
                                        values being read should be skipped. If None is set, it
                                        uses the default value, ``false``.
        :param ignoreTrailingWhiteSpace: A flag indicating whether or not trailing whitespaces from
                                         values being read should be skipped. If None is set, it
                                         uses the default value, ``false``.
        :param nullValue: sets the string representation of a null value. If None is set, it uses
                          the default value, empty string. Since 2.0.1, this ``nullValue`` param
                          applies to all supported types including the string type.
        :param nanValue: sets the string representation of a non-number value. If None is set, it
                         uses the default value, ``NaN``.
        :param positiveInf: sets the string representation of a positive infinity value. If None
                            is set, it uses the default value, ``Inf``.
        :param negativeInf: sets the string representation of a negative infinity value. If None
                            is set, it uses the default value, ``-Inf``.
        :param dateFormat: sets the string that indicates a date format. Custom date formats
                           follow the formats at `datetime pattern`_.
                           This applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format.
                                Custom date formats follow the formats at `datetime pattern`_.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                           set, it uses the default value, ``20480``.
        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                  value being read. If None is set, it uses the default value,
                                  ``-1`` meaning unlimited length.
        :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
                                            If specified, it is ignored.
        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                     set, it uses the default value, ``PERMISSIVE``. Note that Spark tries to
                     parse only required columns in CSV under column pruning. Therefore, corrupt
                     records can be different based on required set of fields. This behavior can
                     be controlled by ``spark.sql.csv.parser.columnPruning.enabled``
                     (enabled by default).

            * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
              into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
              fields to ``null``. To keep corrupt records, a user can set a string type \
              field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
              schema does not have the field, it drops corrupt records during parsing. \
              A record with less/more tokens than schema is not a corrupted record to CSV. \
              When it meets a record having fewer tokens than the length of the schema, \
              sets ``null`` to extra fields. When the record has more tokens than the \
              length of the schema, it drops extra tokens.
            * ``DROPMALFORMED``: ignores the whole corrupted records.
            * ``FAILFAST``: throws an exception when it meets corrupted records.

        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
                                          created by ``PERMISSIVE`` mode. This overrides
                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                          it uses the value specified in
                                          ``spark.sql.columnNameOfCorruptRecord``.
        :param multiLine: parse records, which may span multiple lines. If None is
                          set, it uses the default value, ``false``.
        :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
                                          the quote character. If None is set, the default value is
                                          escape character when escape and quote characters are
                                          different, ``\0`` otherwise.
        :param samplingRatio: defines fraction of rows used for schema inferring.
                              If None is set, it uses the default value, ``1.0``.
        :param emptyValue: sets the string representation of an empty value. If None is set, it
                           uses the default value, empty string.
        :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
                       it uses the default value, ``en-US``. For instance, ``locale`` is used while
                       parsing dates and timestamps.
        :param lineSep: defines the line separator that should be used for parsing. If None is
                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
                        Maximum length is 1 character.
        :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                               the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                               It does not change the behavior of `partition discovery`_.
        :param recursiveFileLookup: recursively scan a directory for files. Using this option
                                    disables `partition discovery`_.

        >>> df = spark.read.csv('python/test_support/sql/ages.csv')
        >>> df.dtypes
        [('_c0', 'string'), ('_c1', 'string')]
        >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
        >>> df2 = spark.read.csv(rdd)
        >>> df2.dtypes
        [('_c0', 'string'), ('_c1', 'string')]
        """
        self._set_opts(
            schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
            header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
            nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
            maxCharsPerColumn=maxCharsPerColumn,
            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
            columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
            enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
        if isinstance(path, basestring):
            path = [path]
        if type(path) == list:
            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
        elif isinstance(path, RDD):
            def func(iterator):
                for x in iterator:
                    if not isinstance(x, basestring):
                        x = unicode(x)
                    if isinstance(x, unicode):
                        x = x.encode("utf-8")
                    yield x
            keyed = path.mapPartitions(func)
            keyed._bypass_serializer = True
            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
            # There is no JVM API for creating a DataFrame directly from an RDD of
            # CSV strings, so first build a JVM Dataset[String] from the RDD and
            # then let the JVM CSV reader parse that dataset.
            jdataset = self._spark._ssql_ctx.createDataset(
                jrdd.rdd(),
                self._spark._jvm.Encoders.STRING())
            return self._df(self._jreader.csv(jdataset))
        else:
            raise TypeError("path can be only string, list or RDD")

    @since(1.5)
    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
        """Loads ORC files, returning the result as a :class:`DataFrame`.

        :param mergeSchema: sets whether we should merge schemas collected from all
                            ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
                            The default value is specified in ``spark.sql.orc.mergeSchema``.
        :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                               the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
                               It does not change the behavior of `partition discovery`_.
        :param recursiveFileLookup: recursively scan a directory for files. Using this option
                                    disables `partition discovery`_.

        >>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
        >>> df.dtypes
        [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
        """
        self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
                       recursiveFileLookup=recursiveFileLookup)
        if isinstance(path, basestring):
            path = [path]
        return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))

    @since(1.4)
    def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
             predicates=None, properties=None):
        """
        Construct a :class:`DataFrame` representing the database table named ``table``
        accessible via JDBC URL ``url`` and connection ``properties``.

        Partitions of the table will be retrieved in parallel if either ``column`` or
        ``predicates`` is specified. ``lowerBound``, ``upperBound`` and ``numPartitions``
        are needed when ``column`` is specified.

        If both ``column`` and ``predicates`` are specified, ``column`` will be used.

        .. note:: Don't create too many partitions in parallel on a large cluster;
            otherwise Spark might crash your external database systems.

        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
        :param table: the name of the table
        :param column: the name of a column of numeric, date, or timestamp type
                       that will be used for partitioning;
                       if this parameter is specified, then ``numPartitions``, ``lowerBound``
                       (inclusive), and ``upperBound`` (exclusive) will form partition strides
                       for generated WHERE clause expressions used to split the column
                       ``column`` evenly
        :param lowerBound: the minimum value of ``column`` used to decide partition stride
        :param upperBound: the maximum value of ``column`` used to decide partition stride
        :param numPartitions: the number of partitions
        :param predicates: a list of expressions suitable for inclusion in WHERE clauses;
                           each one defines one partition of the :class:`DataFrame`
        :param properties: a dictionary of JDBC database connection arguments. Normally at
                           least properties "user" and "password" with their corresponding values.
                           For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }
        :return: a DataFrame
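
        A hypothetical usage sketch (the URL, table name, and credentials below are
        placeholders, not a real database):

        >>> df = spark.read.jdbc(  # doctest: +SKIP
        ...     "jdbc:postgresql://localhost/test", "people",
        ...     properties={'user': 'SYSTEM', 'password': 'mypassword'})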
        """
        if properties is None:
            properties = dict()
        jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
        for k in properties:
            jprop.setProperty(k, properties[k])
        if column is not None:
            assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"
            assert upperBound is not None, "upperBound can not be None when ``column`` is specified"
            assert numPartitions is not None, \
                "numPartitions can not be None when ``column`` is specified"
            return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                               int(upperBound), int(numPartitions), jprop))
        if predicates is not None:
            gateway = self._spark._sc._gateway
            jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
            return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
        return self._df(self._jreader.jdbc(url, table, jprop))


class DataFrameWriter(OptionUtils):
    """
    Interface used to write a :class:`DataFrame` to external storage systems
    (e.g. file systems, key-value stores, etc). Use :attr:`DataFrame.write`
    to access this.

    .. versionadded:: 1.4
    """
    def __init__(self, df):
        self._df = df
        self._spark = df.sql_ctx
        self._jwrite = df._jdf.write()

    def _sq(self, jsq):
        from pyspark.sql.streaming import StreamingQuery
        return StreamingQuery(jsq)

    @since(1.4)
    def mode(self, saveMode):
        """Specifies the behavior when data or table already exists.

        Options include:

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error` or `errorifexists`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.

        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        # On the JVM side the default save mode is already "error", so when the
        # given saveMode is None we simply skip calling the JVM mode() method.
        if saveMode is not None:
            self._jwrite = self._jwrite.mode(saveMode)
        return self

    @since(1.4)
    def format(self, source):
        """Specifies the underlying output data source.

        :param source: string, name of the data source, e.g. 'json', 'parquet'.

        >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self._jwrite = self._jwrite.format(source)
        return self

    @since(1.5)
    def option(self, key, value):
        """Adds an output option for the underlying data source.

        You can set the following option(s) for writing files:
            * ``timeZone``: sets the string that indicates a time zone ID to be used to format
                timestamps in the JSON/CSV datasources or partition values. The following
                formats of `timeZone` are supported:

                * Region-based zone ID: It should have the form 'area/city', such as \
                  'America/Los_Angeles'.
                * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.

                Other short names like 'CST' are not recommended to use because they can be
                ambiguous. If it isn't set, the current value of the SQL config
                ``spark.sql.session.timeZone`` is used by default.
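
        For example, to set the time zone used when formatting timestamps on write
        (``option`` returns the writer itself, so no output is expected):

        >>> w = df.write.option("timeZone", "America/Los_Angeles")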
        """
        self._jwrite = self._jwrite.option(key, to_str(value))
        return self

    @since(1.4)
    def options(self, **options):
        """Adds output options for the underlying data source.

        You can set the following option(s) for writing files:
            * ``timeZone``: sets the string that indicates a time zone ID to be used to format
                timestamps in the JSON/CSV datasources or partition values. The following
                formats of `timeZone` are supported:

                * Region-based zone ID: It should have the form 'area/city', such as \
                  'America/Los_Angeles'.
                * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.

                Other short names like 'CST' are not recommended to use because they can be
                ambiguous. If it isn't set, the current value of the SQL config
                ``spark.sql.session.timeZone`` is used by default.
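
        For example, to set several output options at once (no output is expected):

        >>> w = df.write.options(timeZone="America/Los_Angeles")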
        """
        for k in options:
            self._jwrite = self._jwrite.option(k, to_str(options[k]))
        return self

    @since(1.4)
    def partitionBy(self, *cols):
        """Partitions the output by the given columns on the file system.

        If specified, the output is laid out on the file system similar
        to Hive's partitioning scheme.

        :param cols: name of columns

        >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.3)
    def bucketBy(self, numBuckets, col, *cols):
        """Buckets the output by the given columns. If specified,
        the output is laid out on the file system similar to Hive's bucketing scheme.

        :param numBuckets: the number of buckets to save
        :param col: a name of a column, or a list of names.
        :param cols: additional names (optional). If `col` is a list it should be empty.

        .. note:: Applicable for file-based data sources in combination with
                  :py:meth:`DataFrameWriter.saveAsTable`.

        >>> (df.write.format('parquet')  # doctest: +SKIP
        ...     .bucketBy(100, 'year', 'month')
        ...     .mode("overwrite")
        ...     .saveAsTable('bucketed_table'))
        """
        if not isinstance(numBuckets, int):
            raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))

        if isinstance(col, (list, tuple)):
            if cols:
                raise ValueError("col is a {0} but cols are not empty".format(type(col)))

            col, cols = col[0], col[1:]

        if not all(isinstance(c, basestring) for c in cols) or not isinstance(col, basestring):
            raise TypeError("all names should be `str`")

        self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
        return self

    @since(2.3)
    def sortBy(self, col, *cols):
        """Sorts the output in each bucket by the given columns on the file system.

        :param col: a name of a column, or a list of names.
        :param cols: additional names (optional). If `col` is a list it should be empty.

        >>> (df.write.format('parquet')  # doctest: +SKIP
        ...     .bucketBy(100, 'year', 'month')
        ...     .sortBy('day')
        ...     .mode("overwrite")
        ...     .saveAsTable('sorted_bucketed_table'))
        """
        if isinstance(col, (list, tuple)):
            if cols:
                raise ValueError("col is a {0} but cols are not empty".format(type(col)))

            col, cols = col[0], col[1:]

        if not all(isinstance(c, basestring) for c in cols) or not isinstance(col, basestring):
            raise TypeError("all names should be `str`")

        self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
        return self

    @since(1.4)
    def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
        """Saves the contents of the :class:`DataFrame` to a data source.

        The data source is specified by the ``format`` and a set of ``options``.
        If ``format`` is not specified, the default data source configured by
        ``spark.sql.sources.default`` will be used.

        :param path: the path in a Hadoop supported file system
        :param format: the format used to save
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.
        :param partitionBy: names of partitioning columns
        :param options: all other string options

        >>> df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode).options(**options)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        if format is not None:
            self.format(format)
        if path is None:
            self._jwrite.save()
        else:
            self._jwrite.save(path)

    @since(1.4)
    def insertInto(self, tableName, overwrite=None):
        """Inserts the content of the :class:`DataFrame` to the specified table.

        It requires that the schema of the :class:`DataFrame` is the same as the
        schema of the table. If ``overwrite`` is set to ``True``, any existing data
        is overwritten.
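
        A hypothetical sketch (``my_table`` is a placeholder for an existing table):

        >>> df.write.insertInto("my_table", overwrite=True)  # doctest: +SKIP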
        """
        if overwrite is not None:
            self.mode("overwrite" if overwrite else "append")
        self._jwrite.insertInto(tableName)

    @since(1.4)
    def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
        """Saves the content of the :class:`DataFrame` as the specified table.

        In the case the table already exists, behavior of this function depends on the
        save mode, specified by the `mode` function (default to throwing an exception).
        When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
        the same as that of the existing table.

        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error` or `errorifexists`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.

        :param name: the table name
        :param format: the format used to save
        :param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` \
                     (default: error)
        :param partitionBy: names of partitioning columns
        :param options: all other string options
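
        A hypothetical example (``tbl`` is a placeholder table name):

        >>> df.write.saveAsTable('tbl', format='parquet', mode='overwrite')  # doctest: +SKIP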
        """
        self.mode(mode).options(**options)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        if format is not None:
            self.format(format)
        self._jwrite.saveAsTable(name)

    @since(1.4)
    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
             lineSep=None, encoding=None, ignoreNullFields=None):
        """Saves the content of the :class:`DataFrame` in JSON format
        (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
        specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shortened names (none, bzip2, gzip, lz4,
                            snappy and deflate).
        :param dateFormat: sets the string that indicates a date format. Custom date formats
                           follow the formats at `datetime pattern`_.
                           This applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format.
                                Custom date formats follow the formats at `datetime pattern`_.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
        :param encoding: specifies encoding (charset) of saved json files. If None is set,
                         the default UTF-8 charset will be used.
        :param lineSep: defines the line separator that should be used for writing. If None is
                        set, it uses the default value, ``\\n``.
        :param ignoreNullFields: whether to ignore null fields when generating JSON objects.
                                 If None is set, it uses the default value, ``true``.

        >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        self._set_opts(
            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
            lineSep=lineSep, encoding=encoding, ignoreNullFields=ignoreNullFields)
        self._jwrite.json(path)

    @since(1.4)
    def parquet(self, path, mode=None, partitionBy=None, compression=None):
        """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.
        :param partitionBy: names of partitioning columns
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shortened names (none, uncompressed, snappy,
                            gzip, lzo, brotli, lz4, and zstd). This will override
                            ``spark.sql.parquet.compression.codec``. If None is set, it uses the
                            value specified in ``spark.sql.parquet.compression.codec``.

        >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        self._set_opts(compression=compression)
        self._jwrite.parquet(path)

    @since(1.6)
    def text(self, path, compression=None, lineSep=None):
        """Saves the content of the DataFrame in a text file at the specified path.
        The text files will be encoded as UTF-8.

        :param path: the path in any Hadoop supported file system
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shortened names (none, bzip2, gzip, lz4,
                            snappy and deflate).
        :param lineSep: defines the line separator that should be used for writing. If None is
                        set, it uses the default value, ``\\n``.

        The DataFrame must have only one column that is of string type.
        Each row becomes a new line in the output file.
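
        For example, writing a single string column (here the ``name`` column of the
        example DataFrame used throughout these docs):

        >>> df.select(df.name).write.text(os.path.join(tempfile.mkdtemp(), 'data'))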
        """
        self._set_opts(compression=compression, lineSep=lineSep)
        self._jwrite.text(path)

    @since(2.0)
    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
            timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
            charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None):
        r"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.

        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shortened names (none, bzip2, gzip, lz4,
                            snappy and deflate).
        :param sep: sets a separator (one or more characters) for each field and value. If None is
                    set, it uses the default value, ``,``.
        :param quote: sets a single character used for escaping quoted values where the
                      separator can be part of the value. If None is set, it uses the default
                      value, ``"``. If an empty string is set, it uses ``u0000`` (null character).
        :param escape: sets a single character used for escaping quotes inside an already
                       quoted value. If None is set, it uses the default value, ``\``
        :param escapeQuotes: a flag indicating whether values containing quotes should always
                             be enclosed in quotes. If None is set, it uses the default value
                             ``true``, escaping all values containing a quote character.
        :param quoteAll: a flag indicating whether all values should always be enclosed in
                         quotes. If None is set, it uses the default value ``false``,
                         only escaping values containing a quote character.
        :param header: writes the names of columns as the first line. If None is set, it uses
                       the default value, ``false``.
        :param nullValue: sets the string representation of a null value. If None is set, it uses
                          the default value, empty string.
        :param dateFormat: sets the string that indicates a date format. Custom date formats follow
                           the formats at `datetime pattern`_.
                           This applies to date type. If None is set, it uses the
                           default value, ``yyyy-MM-dd``.
        :param timestampFormat: sets the string that indicates a timestamp format.
                                Custom date formats follow the formats at `datetime pattern`_.
                                This applies to timestamp type. If None is set, it uses the
                                default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
        :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
                                        values being written should be skipped. If None is set, it
                                        uses the default value, ``true``.
        :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
                                         values being written should be skipped. If None is set, it
                                         uses the default value, ``true``.
        :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
                                          the quote character. If None is set, the default value is
                                          escape character when escape and quote characters are
                                          different, ``\0`` otherwise.
        :param encoding: sets the encoding (charset) of saved csv files. If None is set,
                         the default UTF-8 charset will be used.
        :param emptyValue: sets the string representation of an empty value. If None is set, it
                           uses the default value, ``""``.
        :param lineSep: defines the line separator that should be used for writing. If None is
                        set, it uses the default value, ``\\n``. Maximum length is 1 character.

        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
                       dateFormat=dateFormat, timestampFormat=timestampFormat,
                       ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
                       ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
                       encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
        self._jwrite.csv(path)

    @since(1.5)
    def orc(self, path, mode=None, partitionBy=None, compression=None):
        """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.
        :param partitionBy: names of partitioning columns
        :param compression: compression codec to use when saving to file. This can be one of the
                            known case-insensitive shortened names (none, snappy, zlib, and lzo).
                            This will override ``orc.compress`` and
                            ``spark.sql.orc.compression.codec``. If None is set, it uses the value
                            specified in ``spark.sql.orc.compression.codec``.

        >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)
        if partitionBy is not None:
            self.partitionBy(partitionBy)
        self._set_opts(compression=compression)
        self._jwrite.orc(path)

    @since(1.4)
    def jdbc(self, url, table, mode=None, properties=None):
        """Saves the content of the :class:`DataFrame` to an external database table via JDBC.

        .. note:: Don't create too many partitions in parallel on a large cluster;
            otherwise Spark might crash your external database systems.

        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
        :param table: Name of the table in the external database.
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
                exists.
        :param properties: a dictionary of JDBC database connection arguments. Normally at
                           least properties "user" and "password" with their corresponding values.
                           For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }
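
        A hypothetical sketch (the URL, table name, and credentials below are placeholders):

        >>> df.write.jdbc(  # doctest: +SKIP
        ...     "jdbc:postgresql://localhost/test", "people", mode="append",
        ...     properties={'user': 'SYSTEM', 'password': 'mypassword'})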
        """
        if properties is None:
            properties = dict()
        jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
        for k in properties:
            jprop.setProperty(k, properties[k])
        self.mode(mode)._jwrite.jdbc(url, table, jprop)


def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, Row
    import pyspark.sql.readwriter

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        spark = SparkSession.builder.getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.readwriter, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    sc.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()