0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import sys
0019 import json
0020
0021 if sys.version >= '3':
0022 basestring = str
0023
0024 from py4j.java_gateway import java_import
0025
0026 from pyspark import since, keyword_only
0027 from pyspark.rdd import ignore_unicode_prefix
0028 from pyspark.sql.column import _to_seq
0029 from pyspark.sql.readwriter import OptionUtils, to_str
0030 from pyspark.sql.types import *
0031 from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException
0032
0033 __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
0034
0035
0036 class StreamingQuery(object):
0037 """
0038 A handle to a query that is executing continuously in the background as new data arrives.
0039 All these methods are thread-safe.
0040
0041 .. note:: Evolving
0042
0043 .. versionadded:: 2.0
0044 """
0045
0046 def __init__(self, jsq):
0047 self._jsq = jsq
0048
0049 @property
0050 @since(2.0)
0051 def id(self):
0052 """Returns the unique id of this query that persists across restarts from checkpoint data.
0053 That is, this id is generated when a query is started for the first time, and
0054 will be the same every time it is restarted from checkpoint data.
0055 There can only be one query with the same id active in a Spark cluster.
0056 Also see, `runId`.
0057 """
0058 return self._jsq.id().toString()
0059
0060 @property
0061 @since(2.1)
0062 def runId(self):
0063 """Returns the unique id of this query that does not persist across restarts. That is, every
0064 query that is started (or restarted from checkpoint) will have a different runId.
0065 """
0066 return self._jsq.runId().toString()
0067
0068 @property
0069 @since(2.0)
0070 def name(self):
0071 """Returns the user-specified name of the query, or null if not specified.
0072 This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
0073 as `dataframe.writeStream.queryName("query").start()`.
0074 This name, if set, must be unique across all active queries.
0075 """
0076 return self._jsq.name()
0077
0078 @property
0079 @since(2.0)
0080 def isActive(self):
0081 """Whether this streaming query is currently active or not.
0082 """
0083 return self._jsq.isActive()
0084
0085 @since(2.0)
0086 def awaitTermination(self, timeout=None):
0087 """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
0088 exception. If the query has terminated with an exception, then the exception will be thrown.
0089 If `timeout` is set, it returns whether the query has terminated or not within the
0090 `timeout` seconds.
0091
0092 If the query has terminated, then all subsequent calls to this method will either return
0093 immediately (if the query was terminated by :func:`stop()`), or throw the exception
0094 immediately (if the query has terminated with exception).
0095
0096 throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
0097 """
0098 if timeout is not None:
0099 if not isinstance(timeout, (int, float)) or timeout < 0:
0100 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
0101 return self._jsq.awaitTermination(int(timeout * 1000))
0102 else:
0103 return self._jsq.awaitTermination()
0104
0105 @property
0106 @since(2.1)
0107 def status(self):
0108 """
0109 Returns the current status of the query.
0110 """
0111 return json.loads(self._jsq.status().json())
0112
0113 @property
0114 @since(2.1)
0115 def recentProgress(self):
0116 """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
0117 The number of progress updates retained for each stream is configured by Spark session
0118 configuration `spark.sql.streaming.numRecentProgressUpdates`.
0119 """
0120 return [json.loads(p.json()) for p in self._jsq.recentProgress()]
0121
0122 @property
0123 @since(2.1)
0124 def lastProgress(self):
0125 """
0126 Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
0127 None if there were no progress updates
0128
0129 :return: a map
0130 """
0131 lastProgress = self._jsq.lastProgress()
0132 if lastProgress:
0133 return json.loads(lastProgress.json())
0134 else:
0135 return None
0136
0137 @since(2.0)
0138 def processAllAvailable(self):
0139 """Blocks until all available data in the source has been processed and committed to the
0140 sink. This method is intended for testing.
0141
0142 .. note:: In the case of continually arriving data, this method may block forever.
0143 Additionally, this method is only guaranteed to block until data that has been
0144 synchronously appended data to a stream source prior to invocation.
0145 (i.e. `getOffset` must immediately reflect the addition).
0146 """
0147 return self._jsq.processAllAvailable()
0148
0149 @since(2.0)
0150 def stop(self):
0151 """Stop this streaming query.
0152 """
0153 self._jsq.stop()
0154
0155 @since(2.1)
0156 def explain(self, extended=False):
0157 """Prints the (logical and physical) plans to the console for debugging purpose.
0158
0159 :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
0160
0161 >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
0162 >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
0163 >>> sq.explain()
0164 == Physical Plan ==
0165 ...
0166 >>> sq.explain(True)
0167 == Parsed Logical Plan ==
0168 ...
0169 == Analyzed Logical Plan ==
0170 ...
0171 == Optimized Logical Plan ==
0172 ...
0173 == Physical Plan ==
0174 ...
0175 >>> sq.stop()
0176 """
0177
0178
0179 print(self._jsq.explainInternal(extended))
0180
0181 @since(2.1)
0182 def exception(self):
0183 """
0184 :return: the StreamingQueryException if the query was terminated by an exception, or None.
0185 """
0186 if self._jsq.exception().isDefined():
0187 je = self._jsq.exception().get()
0188 msg = je.toString().split(': ', 1)[1]
0189 stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
0190 return StreamingQueryException(msg, stackTrace, je.getCause())
0191 else:
0192 return None
0193
0194
0195 class StreamingQueryManager(object):
0196 """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
0197
0198 .. note:: Evolving
0199
0200 .. versionadded:: 2.0
0201 """
0202
0203 def __init__(self, jsqm):
0204 self._jsqm = jsqm
0205
0206 @property
0207 @ignore_unicode_prefix
0208 @since(2.0)
0209 def active(self):
0210 """Returns a list of active queries associated with this SQLContext
0211
0212 >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
0213 >>> sqm = spark.streams
0214 >>> # get the list of active streaming queries
0215 >>> [q.name for q in sqm.active]
0216 [u'this_query']
0217 >>> sq.stop()
0218 """
0219 return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
0220
0221 @ignore_unicode_prefix
0222 @since(2.0)
0223 def get(self, id):
0224 """Returns an active query from this SQLContext or throws exception if an active query
0225 with this name doesn't exist.
0226
0227 >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
0228 >>> sq.name
0229 u'this_query'
0230 >>> sq = spark.streams.get(sq.id)
0231 >>> sq.isActive
0232 True
0233 >>> sq = sqlContext.streams.get(sq.id)
0234 >>> sq.isActive
0235 True
0236 >>> sq.stop()
0237 """
0238 return StreamingQuery(self._jsqm.get(id))
0239
0240 @since(2.0)
0241 def awaitAnyTermination(self, timeout=None):
0242 """Wait until any of the queries on the associated SQLContext has terminated since the
0243 creation of the context, or since :func:`resetTerminated()` was called. If any query was
0244 terminated with an exception, then the exception will be thrown.
0245 If `timeout` is set, it returns whether the query has terminated or not within the
0246 `timeout` seconds.
0247
0248 If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
0249 either return immediately (if the query was terminated by :func:`query.stop()`),
0250 or throw the exception immediately (if the query was terminated with exception). Use
0251 :func:`resetTerminated()` to clear past terminations and wait for new terminations.
0252
0253 In the case where multiple queries have terminated since :func:`resetTermination()`
0254 was called, if any query has terminated with exception, then :func:`awaitAnyTermination()`
0255 will throw any of the exception. For correctly documenting exceptions across multiple
0256 queries, users need to stop all of them after any of them terminates with exception, and
0257 then check the `query.exception()` for each query.
0258
0259 throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
0260 """
0261 if timeout is not None:
0262 if not isinstance(timeout, (int, float)) or timeout < 0:
0263 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
0264 return self._jsqm.awaitAnyTermination(int(timeout * 1000))
0265 else:
0266 return self._jsqm.awaitAnyTermination()
0267
0268 @since(2.0)
0269 def resetTerminated(self):
0270 """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
0271 again to wait for new terminations.
0272
0273 >>> spark.streams.resetTerminated()
0274 """
0275 self._jsqm.resetTerminated()
0276
0277
0278 class DataStreamReader(OptionUtils):
0279 """
0280 Interface used to load a streaming :class:`DataFrame <pyspark.sql.DataFrame>` from external
0281 storage systems (e.g. file systems, key-value stores, etc).
0282 Use :attr:`SparkSession.readStream <pyspark.sql.SparkSession.readStream>` to access this.
0283
0284 .. note:: Evolving.
0285
0286 .. versionadded:: 2.0
0287 """
0288
0289 def __init__(self, spark):
0290 self._jreader = spark._ssql_ctx.readStream()
0291 self._spark = spark
0292
0293 def _df(self, jdf):
0294 from pyspark.sql.dataframe import DataFrame
0295 return DataFrame(jdf, self._spark)
0296
0297 @since(2.0)
0298 def format(self, source):
0299 """Specifies the input data source format.
0300
0301 .. note:: Evolving.
0302
0303 :param source: string, name of the data source, e.g. 'json', 'parquet'.
0304
0305 >>> s = spark.readStream.format("text")
0306 """
0307 self._jreader = self._jreader.format(source)
0308 return self
0309
0310 @since(2.0)
0311 def schema(self, schema):
0312 """Specifies the input schema.
0313
0314 Some data sources (e.g. JSON) can infer the input schema automatically from data.
0315 By specifying the schema here, the underlying data source can skip the schema
0316 inference step, and thus speed up data loading.
0317
0318 .. note:: Evolving.
0319
0320 :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
0321 (For example ``col0 INT, col1 DOUBLE``).
0322
0323 >>> s = spark.readStream.schema(sdf_schema)
0324 >>> s = spark.readStream.schema("col0 INT, col1 DOUBLE")
0325 """
0326 from pyspark.sql import SparkSession
0327 spark = SparkSession.builder.getOrCreate()
0328 if isinstance(schema, StructType):
0329 jschema = spark._jsparkSession.parseDataType(schema.json())
0330 self._jreader = self._jreader.schema(jschema)
0331 elif isinstance(schema, basestring):
0332 self._jreader = self._jreader.schema(schema)
0333 else:
0334 raise TypeError("schema should be StructType or string")
0335 return self
0336
0337 @since(2.0)
0338 def option(self, key, value):
0339 """Adds an input option for the underlying data source.
0340
0341 You can set the following option(s) for reading files:
0342 * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0343 timestamps in the JSON/CSV datasources or partition values. The following
0344 formats of `timeZone` are supported:
0345
0346 * Region-based zone ID: It should have the form 'area/city', such as \
0347 'America/Los_Angeles'.
0348 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0349 '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0350
0351 Other short names like 'CST' are not recommended to use because they can be
0352 ambiguous. If it isn't set, the current value of the SQL config
0353 ``spark.sql.session.timeZone`` is used by default.
0354
0355 .. note:: Evolving.
0356
0357 >>> s = spark.readStream.option("x", 1)
0358 """
0359 self._jreader = self._jreader.option(key, to_str(value))
0360 return self
0361
0362 @since(2.0)
0363 def options(self, **options):
0364 """Adds input options for the underlying data source.
0365
0366 You can set the following option(s) for reading files:
0367 * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0368 timestamps in the JSON/CSV datasources or partition values. The following
0369 formats of `timeZone` are supported:
0370
0371 * Region-based zone ID: It should have the form 'area/city', such as \
0372 'America/Los_Angeles'.
0373 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0374 '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0375
0376 Other short names like 'CST' are not recommended to use because they can be
0377 ambiguous. If it isn't set, the current value of the SQL config
0378 ``spark.sql.session.timeZone`` is used by default.
0379
0380 .. note:: Evolving.
0381
0382 >>> s = spark.readStream.options(x="1", y=2)
0383 """
0384 for k in options:
0385 self._jreader = self._jreader.option(k, to_str(options[k]))
0386 return self
0387
0388 @since(2.0)
0389 def load(self, path=None, format=None, schema=None, **options):
0390 """Loads a data stream from a data source and returns it as a
0391 :class:`DataFrame <pyspark.sql.DataFrame>`.
0392
0393 .. note:: Evolving.
0394
0395 :param path: optional string for file-system backed data sources.
0396 :param format: optional string for format of the data source. Default to 'parquet'.
0397 :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema
0398 or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0399 :param options: all other string options
0400
0401 >>> json_sdf = spark.readStream.format("json") \\
0402 ... .schema(sdf_schema) \\
0403 ... .load(tempfile.mkdtemp())
0404 >>> json_sdf.isStreaming
0405 True
0406 >>> json_sdf.schema == sdf_schema
0407 True
0408 """
0409 if format is not None:
0410 self.format(format)
0411 if schema is not None:
0412 self.schema(schema)
0413 self.options(**options)
0414 if path is not None:
0415 if type(path) != str or len(path.strip()) == 0:
0416 raise ValueError("If the path is provided for stream, it needs to be a " +
0417 "non-empty string. List of paths are not supported.")
0418 return self._df(self._jreader.load(path))
0419 else:
0420 return self._df(self._jreader.load())
0421
0422 @since(2.0)
0423 def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
0424 allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
0425 allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
0426 mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
0427 multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
0428 dropFieldIfAllNull=None, encoding=None, pathGlobFilter=None,
0429 recursiveFileLookup=None):
0430 """
0431 Loads a JSON file stream and returns the results as a :class:`DataFrame`.
0432
0433 `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
0434 For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
0435
0436 If the ``schema`` parameter is not specified, this function goes
0437 through the input once to determine the input schema.
0438
0439 .. note:: Evolving.
0440
0441 :param path: string represents path to the JSON dataset,
0442 or RDD of Strings storing JSON objects.
0443 :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
0444 or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0445 :param primitivesAsString: infers all primitive values as a string type. If None is set,
0446 it uses the default value, ``false``.
0447 :param prefersDecimal: infers all floating-point values as a decimal type. If the values
0448 do not fit in decimal, then it infers them as doubles. If None is
0449 set, it uses the default value, ``false``.
0450 :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
0451 it uses the default value, ``false``.
0452 :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
0453 it uses the default value, ``false``.
0454 :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
0455 set, it uses the default value, ``true``.
0456 :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
0457 set, it uses the default value, ``false``.
0458 :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
0459 using backslash quoting mechanism. If None is
0460 set, it uses the default value, ``false``.
0461 :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0462 set, it uses the default value, ``PERMISSIVE``.
0463
0464 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0465 into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0466 fields to ``null``. To keep corrupt records, an user can set a string type \
0467 field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
0468 schema does not have the field, it drops corrupt records during parsing. \
0469 When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
0470 field in an output schema.
0471 * ``DROPMALFORMED``: ignores the whole corrupted records.
0472 * ``FAILFAST``: throws an exception when it meets corrupted records.
0473
0474 :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0475 created by ``PERMISSIVE`` mode. This overrides
0476 ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0477 it uses the value specified in
0478 ``spark.sql.columnNameOfCorruptRecord``.
0479 :param dateFormat: sets the string that indicates a date format. Custom date formats
0480 follow the formats at `datetime pattern`_.
0481 This applies to date type. If None is set, it uses the
0482 default value, ``yyyy-MM-dd``.
0483 :param timestampFormat: sets the string that indicates a timestamp format.
0484 Custom date formats follow the formats at `datetime pattern`_.
0485 This applies to timestamp type. If None is set, it uses the
0486 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0487 :param multiLine: parse one record, which may span multiple lines, per file. If None is
0488 set, it uses the default value, ``false``.
0489 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
0490 characters (ASCII characters with value less than 32,
0491 including tab and line feed characters) or not.
0492 :param lineSep: defines the line separator that should be used for parsing. If None is
0493 set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0494 :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0495 it uses the default value, ``en-US``. For instance, ``locale`` is used while
0496 parsing dates and timestamps.
0497 :param dropFieldIfAllNull: whether to ignore column of all null values or empty
0498 array/struct during schema inference. If None is set, it
0499 uses the default value, ``false``.
0500 :param encoding: allows to forcibly set one of standard basic or extended encoding for
0501 the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
0502 the encoding of input JSON will be detected automatically
0503 when the multiLine option is set to ``true``.
0504 :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0505 the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0506 It does not change the behavior of `partition discovery`_.
0507 :param recursiveFileLookup: recursively scan a directory for files. Using this option
0508 disables `partition discovery`_.
0509
0510 .. _partition discovery:
0511 https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
0512 .. _datetime pattern: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
0513
0514 >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
0515 >>> json_sdf.isStreaming
0516 True
0517 >>> json_sdf.schema == sdf_schema
0518 True
0519 """
0520 self._set_opts(
0521 schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
0522 allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
0523 allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
0524 allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
0525 mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
0526 timestampFormat=timestampFormat, multiLine=multiLine,
0527 allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
0528 dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
0529 pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0530 if isinstance(path, basestring):
0531 return self._df(self._jreader.json(path))
0532 else:
0533 raise TypeError("path can be only a single string")
0534
0535 @since(2.3)
0536 def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
0537 """Loads a ORC file stream, returning the result as a :class:`DataFrame`.
0538
0539 .. note:: Evolving.
0540
0541 :param mergeSchema: sets whether we should merge schemas collected from all
0542 ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
0543 The default value is specified in ``spark.sql.orc.mergeSchema``.
0544 :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0545 the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0546 It does not change the behavior of `partition discovery`_.
0547 :param recursiveFileLookup: recursively scan a directory for files. Using this option
0548 disables `partition discovery`_.
0549
0550 >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
0551 >>> orc_sdf.isStreaming
0552 True
0553 >>> orc_sdf.schema == sdf_schema
0554 True
0555 """
0556 self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0557 recursiveFileLookup=recursiveFileLookup)
0558 if isinstance(path, basestring):
0559 return self._df(self._jreader.orc(path))
0560 else:
0561 raise TypeError("path can be only a single string")
0562
0563 @since(2.0)
0564 def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
0565 """
0566 Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
0567
0568 .. note:: Evolving.
0569
0570 :param mergeSchema: sets whether we should merge schemas collected from all
0571 Parquet part-files. This will override
0572 ``spark.sql.parquet.mergeSchema``. The default value is specified in
0573 ``spark.sql.parquet.mergeSchema``.
0574 :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0575 the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0576 It does not change the behavior of `partition discovery`_.
0577 :param recursiveFileLookup: recursively scan a directory for files. Using this option
0578 disables `partition discovery`_.
0579
0580 >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
0581 >>> parquet_sdf.isStreaming
0582 True
0583 >>> parquet_sdf.schema == sdf_schema
0584 True
0585 """
0586 self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0587 recursiveFileLookup=recursiveFileLookup)
0588 if isinstance(path, basestring):
0589 return self._df(self._jreader.parquet(path))
0590 else:
0591 raise TypeError("path can be only a single string")
0592
0593 @ignore_unicode_prefix
0594 @since(2.0)
0595 def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None,
0596 recursiveFileLookup=None):
0597 """
0598 Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
0599 string column named "value", and followed by partitioned columns if there
0600 are any.
0601 The text files must be encoded as UTF-8.
0602
0603 By default, each line in the text file is a new row in the resulting DataFrame.
0604
0605 .. note:: Evolving.
0606
0607 :param paths: string, or list of strings, for input path(s).
0608 :param wholetext: if true, read each file from input path(s) as a single row.
0609 :param lineSep: defines the line separator that should be used for parsing. If None is
0610 set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0611 :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0612 the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0613 It does not change the behavior of `partition discovery`_.
0614 :param recursiveFileLookup: recursively scan a directory for files. Using this option
0615 disables `partition discovery`_.
0616
0617 >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
0618 >>> text_sdf.isStreaming
0619 True
0620 >>> "value" in str(text_sdf.schema)
0621 True
0622 """
0623 self._set_opts(
0624 wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
0625 recursiveFileLookup=recursiveFileLookup)
0626 if isinstance(path, basestring):
0627 return self._df(self._jreader.text(path))
0628 else:
0629 raise TypeError("path can be only a single string")
0630
0631 @since(2.0)
0632 def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
0633 comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
0634 ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
0635 negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
0636 maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
0637 columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
0638 enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
0639 pathGlobFilter=None, recursiveFileLookup=None):
0640 r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
0641
0642 This function will go through the input once to determine the input schema if
0643 ``inferSchema`` is enabled. To avoid going through the entire data once, disable
0644 ``inferSchema`` option or specify the schema explicitly using ``schema``.
0645
0646 .. note:: Evolving.
0647
0648 :param path: string, or list of strings, for input path(s).
0649 :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
0650 or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0651 :param sep: sets a separator (one or more characters) for each field and value. If None is
0652 set, it uses the default value, ``,``.
0653 :param encoding: decodes the CSV files by the given encoding type. If None is set,
0654 it uses the default value, ``UTF-8``.
0655 :param quote: sets a single character used for escaping quoted values where the
0656 separator can be part of the value. If None is set, it uses the default
0657 value, ``"``. If you would like to turn off quotations, you need to set an
0658 empty string.
0659 :param escape: sets a single character used for escaping quotes inside an already
0660 quoted value. If None is set, it uses the default value, ``\``.
0661 :param comment: sets a single character used for skipping lines beginning with this
0662 character. By default (None), it is disabled.
0663 :param header: uses the first line as names of columns. If None is set, it uses the
0664 default value, ``false``.
0665 :param inferSchema: infers the input schema automatically from data. It requires one extra
0666 pass over the data. If None is set, it uses the default value, ``false``.
0667 :param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
0668 forcibly applied to datasource files, and headers in CSV files will be
0669 ignored. If the option is set to ``false``, the schema will be
0670 validated against all headers in CSV files or the first header in RDD
0671 if the ``header`` option is set to ``true``. Field names in the schema
0672 and column names in CSV headers are checked by their positions
0673 taking into account ``spark.sql.caseSensitive``. If None is set,
0674 ``true`` is used by default. Though the default value is ``true``,
0675 it is recommended to disable the ``enforceSchema`` option
0676 to avoid incorrect results.
0677 :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
0678 values being read should be skipped. If None is set, it
0679 uses the default value, ``false``.
0680 :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
0681 values being read should be skipped. If None is set, it
0682 uses the default value, ``false``.
0683 :param nullValue: sets the string representation of a null value. If None is set, it uses
0684 the default value, empty string. Since 2.0.1, this ``nullValue`` param
0685 applies to all supported types including the string type.
0686 :param nanValue: sets the string representation of a non-number value. If None is set, it
0687 uses the default value, ``NaN``.
0688 :param positiveInf: sets the string representation of a positive infinity value. If None
0689 is set, it uses the default value, ``Inf``.
0690 :param negativeInf: sets the string representation of a negative infinity value. If None
0691 is set, it uses the default value, ``Inf``.
0692 :param dateFormat: sets the string that indicates a date format. Custom date formats
0693 follow the formats at `datetime pattern`_.
0694 This applies to date type. If None is set, it uses the
0695 default value, ``yyyy-MM-dd``.
0696 :param timestampFormat: sets the string that indicates a timestamp format.
0697 Custom date formats follow the formats at `datetime pattern`_.
0698 This applies to timestamp type. If None is set, it uses the
0699 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0700 :param maxColumns: defines a hard limit of how many columns a record can have. If None is
0701 set, it uses the default value, ``20480``.
0702 :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
0703 value being read. If None is set, it uses the default value,
0704 ``-1`` meaning unlimited length.
0705 :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
0706 If specified, it is ignored.
0707 :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0708 set, it uses the default value, ``PERMISSIVE``.
0709
0710 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0711 into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0712 fields to ``null``. To keep corrupt records, an user can set a string type \
0713 field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
0714 schema does not have the field, it drops corrupt records during parsing. \
0715 A record with less/more tokens than schema is not a corrupted record to CSV. \
0716 When it meets a record having fewer tokens than the length of the schema, \
0717 sets ``null`` to extra fields. When the record has more tokens than the \
0718 length of the schema, it drops extra tokens.
0719 * ``DROPMALFORMED``: ignores the whole corrupted records.
0720 * ``FAILFAST``: throws an exception when it meets corrupted records.
0721
0722 :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0723 created by ``PERMISSIVE`` mode. This overrides
0724 ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0725 it uses the value specified in
0726 ``spark.sql.columnNameOfCorruptRecord``.
0727 :param multiLine: parse one record, which may span multiple lines. If None is
0728 set, it uses the default value, ``false``.
0729 :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
0730 the quote character. If None is set, the default value is
0731 escape character when escape and quote characters are
0732 different, ``\0`` otherwise..
0733 :param emptyValue: sets the string representation of an empty value. If None is set, it uses
0734 the default value, empty string.
0735 :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0736 it uses the default value, ``en-US``. For instance, ``locale`` is used while
0737 parsing dates and timestamps.
0738 :param lineSep: defines the line separator that should be used for parsing. If None is
0739 set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0740 Maximum length is 1 character.
0741 :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0742 the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0743 It does not change the behavior of `partition discovery`_.
0744 :param recursiveFileLookup: recursively scan a directory for files. Using this option
0745 disables `partition discovery`_.
0746
0747 >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
0748 >>> csv_sdf.isStreaming
0749 True
0750 >>> csv_sdf.schema == sdf_schema
0751 True
0752 """
0753 self._set_opts(
0754 schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
0755 header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
0756 ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
0757 nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
0758 dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
0759 maxCharsPerColumn=maxCharsPerColumn,
0760 maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
0761 columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
0762 charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
0763 emptyValue=emptyValue, locale=locale, lineSep=lineSep,
0764 pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0765 if isinstance(path, basestring):
0766 return self._df(self._jreader.csv(path))
0767 else:
0768 raise TypeError("path can be only a single string")
0769
0770
0771 class DataStreamWriter(object):
0772 """
0773 Interface used to write a streaming :class:`DataFrame <pyspark.sql.DataFrame>` to external
0774 storage systems (e.g. file systems, key-value stores, etc).
0775 Use :attr:`DataFrame.writeStream <pyspark.sql.DataFrame.writeStream>`
0776 to access this.
0777
0778 .. note:: Evolving.
0779
0780 .. versionadded:: 2.0
0781 """
0782
0783 def __init__(self, df):
0784 self._df = df
0785 self._spark = df.sql_ctx
0786 self._jwrite = df._jdf.writeStream()
0787
0788 def _sq(self, jsq):
0789 from pyspark.sql.streaming import StreamingQuery
0790 return StreamingQuery(jsq)
0791
0792 @since(2.0)
0793 def outputMode(self, outputMode):
0794 """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
0795
0796 Options include:
0797
0798 * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
0799 the sink
0800 * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
0801 every time these is some updates
0802 * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be
0803 written to the sink every time there are some updates. If the query doesn't contain
0804 aggregations, it will be equivalent to `append` mode.
0805
0806 .. note:: Evolving.
0807
0808 >>> writer = sdf.writeStream.outputMode('append')
0809 """
0810 if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
0811 raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
0812 self._jwrite = self._jwrite.outputMode(outputMode)
0813 return self
0814
0815 @since(2.0)
0816 def format(self, source):
0817 """Specifies the underlying output data source.
0818
0819 .. note:: Evolving.
0820
0821 :param source: string, name of the data source, which for now can be 'parquet'.
0822
0823 >>> writer = sdf.writeStream.format('json')
0824 """
0825 self._jwrite = self._jwrite.format(source)
0826 return self
0827
0828 @since(2.0)
0829 def option(self, key, value):
0830 """Adds an output option for the underlying data source.
0831
0832 You can set the following option(s) for writing files:
0833 * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0834 timestamps in the JSON/CSV datasources or partition values. The following
0835 formats of `timeZone` are supported:
0836
0837 * Region-based zone ID: It should have the form 'area/city', such as \
0838 'America/Los_Angeles'.
0839 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0840 '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0841
0842 Other short names like 'CST' are not recommended to use because they can be
0843 ambiguous. If it isn't set, the current value of the SQL config
0844 ``spark.sql.session.timeZone`` is used by default.
0845
0846 .. note:: Evolving.
0847 """
0848 self._jwrite = self._jwrite.option(key, to_str(value))
0849 return self
0850
0851 @since(2.0)
0852 def options(self, **options):
0853 """Adds output options for the underlying data source.
0854
0855 You can set the following option(s) for writing files:
0856 * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0857 timestamps in the JSON/CSV datasources or partition values. The following
0858 formats of `timeZone` are supported:
0859
0860 * Region-based zone ID: It should have the form 'area/city', such as \
0861 'America/Los_Angeles'.
0862 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0863 '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0864
0865 Other short names like 'CST' are not recommended to use because they can be
0866 ambiguous. If it isn't set, the current value of the SQL config
0867 ``spark.sql.session.timeZone`` is used by default.
0868
0869 .. note:: Evolving.
0870 """
0871 for k in options:
0872 self._jwrite = self._jwrite.option(k, to_str(options[k]))
0873 return self
0874
0875 @since(2.0)
0876 def partitionBy(self, *cols):
0877 """Partitions the output by the given columns on the file system.
0878
0879 If specified, the output is laid out on the file system similar
0880 to Hive's partitioning scheme.
0881
0882 .. note:: Evolving.
0883
0884 :param cols: name of columns
0885
0886 """
0887 if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
0888 cols = cols[0]
0889 self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
0890 return self
0891
0892 @since(2.0)
0893 def queryName(self, queryName):
0894 """Specifies the name of the :class:`StreamingQuery` that can be started with
0895 :func:`start`. This name must be unique among all the currently active queries
0896 in the associated SparkSession.
0897
0898 .. note:: Evolving.
0899
0900 :param queryName: unique name for the query
0901
0902 >>> writer = sdf.writeStream.queryName('streaming_query')
0903 """
0904 if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
0905 raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
0906 self._jwrite = self._jwrite.queryName(queryName)
0907 return self
0908
0909 @keyword_only
0910 @since(2.0)
0911 def trigger(self, processingTime=None, once=None, continuous=None):
0912 """Set the trigger for the stream query. If this is not set it will run the query as fast
0913 as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
0914
0915 .. note:: Evolving.
0916
0917 :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
0918 Set a trigger that runs a microbatch query periodically based on the
0919 processing time. Only one trigger can be set.
0920 :param once: if set to True, set a trigger that processes only one batch of data in a
0921 streaming query then terminates the query. Only one trigger can be set.
0922 :param continuous: a time interval as a string, e.g. '5 seconds', '1 minute'.
0923 Set a trigger that runs a continuous query with a given checkpoint
0924 interval. Only one trigger can be set.
0925
0926 >>> # trigger the query for execution every 5 seconds
0927 >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
0928 >>> # trigger the query for just once batch of data
0929 >>> writer = sdf.writeStream.trigger(once=True)
0930 >>> # trigger the query for execution every 5 seconds
0931 >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
0932 """
0933 params = [processingTime, once, continuous]
0934
0935 if params.count(None) == 3:
0936 raise ValueError('No trigger provided')
0937 elif params.count(None) < 2:
0938 raise ValueError('Multiple triggers not allowed.')
0939
0940 jTrigger = None
0941 if processingTime is not None:
0942 if type(processingTime) != str or len(processingTime.strip()) == 0:
0943 raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
0944 processingTime)
0945 interval = processingTime.strip()
0946 jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
0947 interval)
0948
0949 elif once is not None:
0950 if once is not True:
0951 raise ValueError('Value for once must be True. Got: %s' % once)
0952 jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
0953
0954 else:
0955 if type(continuous) != str or len(continuous.strip()) == 0:
0956 raise ValueError('Value for continuous must be a non empty string. Got: %s' %
0957 continuous)
0958 interval = continuous.strip()
0959 jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
0960 interval)
0961
0962 self._jwrite = self._jwrite.trigger(jTrigger)
0963 return self
0964
0965 @since(2.4)
0966 def foreach(self, f):
0967 """
0968 Sets the output of the streaming query to be processed using the provided writer ``f``.
0969 This is often used to write the output of a streaming query to arbitrary storage systems.
0970 The processing logic can be specified in two ways.
0971
0972 #. A **function** that takes a row as input.
0973 This is a simple way to express your processing logic. Note that this does
0974 not allow you to deduplicate generated data when failures cause reprocessing of
0975 some input data. That would require you to specify the processing logic in the next
0976 way.
0977
0978 #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
0979 The object can have the following methods.
0980
0981 * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing
0982 (for example, open a connection, start a transaction, etc). Additionally, you can
0983 use the `partition_id` and `epoch_id` to deduplicate regenerated data
0984 (discussed later).
0985
0986 * ``process(row)``: *Non-optional* method that processes each :class:`Row`.
0987
0988 * ``close(error)``: *Optional* method that finalizes and cleans up (for example,
0989 close connection, commit transaction, etc.) after all rows have been processed.
0990
0991 The object will be used by Spark in the following way.
0992
0993 * A single copy of this object is responsible of all the data generated by a
0994 single task in a query. In other words, one instance is responsible for
0995 processing one partition of the data generated in a distributed manner.
0996
0997 * This object must be serializable because each task will get a fresh
0998 serialized-deserialized copy of the provided object. Hence, it is strongly
0999 recommended that any initialization for writing data (e.g. opening a
1000 connection or starting a transaction) is done after the `open(...)`
1001 method has been called, which signifies that the task is ready to generate data.
1002
1003 * The lifecycle of the methods are as follows.
1004
1005 For each partition with ``partition_id``:
1006
1007 ... For each batch/epoch of streaming data with ``epoch_id``:
1008
1009 ....... Method ``open(partitionId, epochId)`` is called.
1010
1011 ....... If ``open(...)`` returns true, for each row in the partition and
1012 batch/epoch, method ``process(row)`` is called.
1013
1014 ....... Method ``close(errorOrNull)`` is called with error (if any) seen while
1015 processing rows.
1016
1017 Important points to note:
1018
1019 * The `partitionId` and `epochId` can be used to deduplicate generated data when
1020 failures cause reprocessing of some input data. This depends on the execution
1021 mode of the query. If the streaming query is being executed in the micro-batch
1022 mode, then every partition represented by a unique tuple (partition_id, epoch_id)
1023 is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used
1024 to deduplicate and/or transactionally commit data and achieve exactly-once
1025 guarantees. However, if the streaming query is being executed in the continuous
1026 mode, then this guarantee does not hold and therefore should not be used for
1027 deduplication.
1028
1029 * The ``close()`` method (if exists) will be called if `open()` method exists and
1030 returns successfully (irrespective of the return value), except if the Python
1031 crashes in the middle.
1032
1033 .. note:: Evolving.
1034
1035 >>> # Print every row using a function
1036 >>> def print_row(row):
1037 ... print(row)
1038 ...
1039 >>> writer = sdf.writeStream.foreach(print_row)
1040 >>> # Print every row using a object with process() method
1041 >>> class RowPrinter:
1042 ... def open(self, partition_id, epoch_id):
1043 ... print("Opened %d, %d" % (partition_id, epoch_id))
1044 ... return True
1045 ... def process(self, row):
1046 ... print(row)
1047 ... def close(self, error):
1048 ... print("Closed with error: %s" % str(error))
1049 ...
1050 >>> writer = sdf.writeStream.foreach(RowPrinter())
1051 """
1052
1053 from pyspark.rdd import _wrap_function
1054 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
1055 from pyspark.taskcontext import TaskContext
1056
1057 if callable(f):
1058
1059
1060
1061 def func_without_process(_, iterator):
1062 for x in iterator:
1063 f(x)
1064 return iter([])
1065
1066 func = func_without_process
1067
1068 else:
1069
1070
1071
1072
1073 if not hasattr(f, 'process'):
1074 raise Exception("Provided object does not have a 'process' method")
1075
1076 if not callable(getattr(f, 'process')):
1077 raise Exception("Attribute 'process' in provided object is not callable")
1078
1079 def doesMethodExist(method_name):
1080 exists = hasattr(f, method_name)
1081 if exists and not callable(getattr(f, method_name)):
1082 raise Exception(
1083 "Attribute '%s' in provided object is not callable" % method_name)
1084 return exists
1085
1086 open_exists = doesMethodExist('open')
1087 close_exists = doesMethodExist('close')
1088
1089 def func_with_open_process_close(partition_id, iterator):
1090 epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
1091 if epoch_id:
1092 epoch_id = int(epoch_id)
1093 else:
1094 raise Exception("Could not get batch id from TaskContext")
1095
1096
1097 should_process = True
1098 if open_exists:
1099 should_process = f.open(partition_id, epoch_id)
1100
1101 error = None
1102
1103 try:
1104 if should_process:
1105 for x in iterator:
1106 f.process(x)
1107 except Exception as ex:
1108 error = ex
1109 finally:
1110 if close_exists:
1111 f.close(error)
1112 if error:
1113 raise error
1114
1115 return iter([])
1116
1117 func = func_with_open_process_close
1118
1119 serializer = AutoBatchedSerializer(PickleSerializer())
1120 wrapped_func = _wrap_function(self._spark._sc, func, serializer, serializer)
1121 jForeachWriter = \
1122 self._spark._sc._jvm.org.apache.spark.sql.execution.python.PythonForeachWriter(
1123 wrapped_func, self._df._jdf.schema())
1124 self._jwrite.foreach(jForeachWriter)
1125 return self
1126
1127 @since(2.4)
1128 def foreachBatch(self, func):
1129 """
1130 Sets the output of the streaming query to be processed using the provided
1131 function. This is supported only the in the micro-batch execution modes (that is, when the
1132 trigger is not continuous). In every micro-batch, the provided function will be called in
1133 every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.
1134 The batchId can be used deduplicate and transactionally write the output
1135 (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
1136 to exactly same for the same batchId (assuming all operations are deterministic in the
1137 query).
1138
1139 .. note:: Evolving.
1140
1141 >>> def func(batch_df, batch_id):
1142 ... batch_df.collect()
1143 ...
1144 >>> writer = sdf.writeStream.foreachBatch(func)
1145 """
1146
1147 from pyspark.java_gateway import ensure_callback_server_started
1148 gw = self._spark._sc._gateway
1149 java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
1150
1151 wrapped_func = ForeachBatchFunction(self._spark, func)
1152 gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
1153 ensure_callback_server_started(gw)
1154 return self
1155
1156 @ignore_unicode_prefix
1157 @since(2.0)
1158 def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
1159 **options):
1160 """Streams the contents of the :class:`DataFrame` to a data source.
1161
1162 The data source is specified by the ``format`` and a set of ``options``.
1163 If ``format`` is not specified, the default data source configured by
1164 ``spark.sql.sources.default`` will be used.
1165
1166 .. note:: Evolving.
1167
1168 :param path: the path in a Hadoop supported file system
1169 :param format: the format used to save
1170 :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
1171 streaming sink.
1172
1173 * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the
1174 sink
1175 * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the
1176 sink every time these is some updates
1177 * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be
1178 written to the sink every time there are some updates. If the query doesn't contain
1179 aggregations, it will be equivalent to `append` mode.
1180 :param partitionBy: names of partitioning columns
1181 :param queryName: unique name for the query
1182 :param options: All other string options. You may want to provide a `checkpointLocation`
1183 for most streams, however it is not required for a `memory` stream.
1184
1185 >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
1186 >>> sq.isActive
1187 True
1188 >>> sq.name
1189 u'this_query'
1190 >>> sq.stop()
1191 >>> sq.isActive
1192 False
1193 >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
1194 ... queryName='that_query', outputMode="append", format='memory')
1195 >>> sq.name
1196 u'that_query'
1197 >>> sq.isActive
1198 True
1199 >>> sq.stop()
1200 """
1201 self.options(**options)
1202 if outputMode is not None:
1203 self.outputMode(outputMode)
1204 if partitionBy is not None:
1205 self.partitionBy(partitionBy)
1206 if format is not None:
1207 self.format(format)
1208 if queryName is not None:
1209 self.queryName(queryName)
1210 if path is None:
1211 return self._sq(self._jwrite.start())
1212 else:
1213 return self._sq(self._jwrite.start(path))
1214
1215
1216 def _test():
1217 import doctest
1218 import os
1219 import tempfile
1220 from pyspark.sql import Row, SparkSession, SQLContext
1221 import pyspark.sql.streaming
1222
1223 os.chdir(os.environ["SPARK_HOME"])
1224
1225 globs = pyspark.sql.streaming.__dict__.copy()
1226 try:
1227 spark = SparkSession.builder.getOrCreate()
1228 except py4j.protocol.Py4JError:
1229 spark = SparkSession(sc)
1230
1231 globs['tempfile'] = tempfile
1232 globs['os'] = os
1233 globs['spark'] = spark
1234 globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
1235 globs['sdf'] = \
1236 spark.readStream.format('text').load('python/test_support/sql/streaming')
1237 globs['sdf_schema'] = StructType([StructField("data", StringType(), True)])
1238 globs['df'] = \
1239 globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
1240
1241 (failure_count, test_count) = doctest.testmod(
1242 pyspark.sql.streaming, globs=globs,
1243 optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
1244 globs['spark'].stop()
1245
1246 if failure_count:
1247 sys.exit(-1)
1248
1249
1250 if __name__ == "__main__":
1251 _test()