0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 import json
0020 
0021 if sys.version >= '3':
0022     basestring = str
0023 
0024 from py4j.java_gateway import java_import
0025 
0026 from pyspark import since, keyword_only
0027 from pyspark.rdd import ignore_unicode_prefix
0028 from pyspark.sql.column import _to_seq
0029 from pyspark.sql.readwriter import OptionUtils, to_str
0030 from pyspark.sql.types import *
0031 from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException
0032 
0033 __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
0034 
0035 
0036 class StreamingQuery(object):
0037     """
0038     A handle to a query that is executing continuously in the background as new data arrives.
0039     All these methods are thread-safe.
0040 
0041     .. note:: Evolving
0042 
0043     .. versionadded:: 2.0
0044     """
0045 
0046     def __init__(self, jsq):
0047         self._jsq = jsq
0048 
0049     @property
0050     @since(2.0)
0051     def id(self):
0052         """Returns the unique id of this query that persists across restarts from checkpoint data.
0053         That is, this id is generated when a query is started for the first time, and
0054         will be the same every time it is restarted from checkpoint data.
0055         There can only be one query with the same id active in a Spark cluster.
0056         Also see, `runId`.
0057         """
0058         return self._jsq.id().toString()
0059 
0060     @property
0061     @since(2.1)
0062     def runId(self):
0063         """Returns the unique id of this query that does not persist across restarts. That is, every
0064         query that is started (or restarted from checkpoint) will have a different runId.
0065         """
0066         return self._jsq.runId().toString()
0067 
0068     @property
0069     @since(2.0)
0070     def name(self):
0071         """Returns the user-specified name of the query, or null if not specified.
0072         This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
0073         as `dataframe.writeStream.queryName("query").start()`.
0074         This name, if set, must be unique across all active queries.
0075         """
0076         return self._jsq.name()
0077 
0078     @property
0079     @since(2.0)
0080     def isActive(self):
0081         """Whether this streaming query is currently active or not.
0082         """
0083         return self._jsq.isActive()
0084 
0085     @since(2.0)
0086     def awaitTermination(self, timeout=None):
0087         """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
0088         exception. If the query has terminated with an exception, then the exception will be thrown.
0089         If `timeout` is set, it returns whether the query has terminated or not within the
0090         `timeout` seconds.
0091 
0092         If the query has terminated, then all subsequent calls to this method will either return
0093         immediately (if the query was terminated by :func:`stop()`), or throw the exception
0094         immediately (if the query has terminated with exception).
0095 
0096         throws :class:`StreamingQueryException`, if this query has terminated with an exception
0097         """
0098         if timeout is not None:
0099             if not isinstance(timeout, (int, float)) or timeout < 0:
0100                 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
0101             return self._jsq.awaitTermination(int(timeout * 1000))
0102         else:
0103             return self._jsq.awaitTermination()
0104 
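# Illustrative usage sketch (editorial addition, not part of the upstream source):
# assuming a streaming DataFrame ``sdf``, calling ``awaitTermination`` with a
# timeout returns a boolean instead of blocking indefinitely:
#
#     sq = sdf.writeStream.format("console").start()
#     finished = sq.awaitTermination(timeout=10)  # wait at most 10 seconds
#     if not finished:
#         sq.stop()  # still running after the timeout; stop it explicitly
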
0105     @property
0106     @since(2.1)
0107     def status(self):
0108         """
0109         Returns the current status of the query.
0110         """
0111         return json.loads(self._jsq.status().json())
0112 
0113     @property
0114     @since(2.1)
0115     def recentProgress(self):
0116         """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
0117         The number of progress updates retained for each stream is configured by Spark session
0118         configuration `spark.sql.streaming.numRecentProgressUpdates`.
0119         """
0120         return [json.loads(p.json()) for p in self._jsq.recentProgress()]
0121 
0122     @property
0123     @since(2.1)
0124     def lastProgress(self):
0125         """
0126         Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
0127         None if there were no progress updates.
0128 
0129         :return: a dict
0130         """
0131         lastProgress = self._jsq.lastProgress()
0132         if lastProgress:
0133             return json.loads(lastProgress.json())
0134         else:
0135             return None
0136 
0137     @since(2.0)
0138     def processAllAvailable(self):
0139         """Blocks until all available data in the source has been processed and committed to the
0140         sink. This method is intended for testing.
0141 
0142         .. note:: In the case of continually arriving data, this method may block forever.
0143             Additionally, this method is only guaranteed to block until data that has been
0144             synchronously appended to a stream source prior to invocation has been processed
0145             (i.e. `getOffset` must immediately reflect the addition).
0146         """
0147         return self._jsq.processAllAvailable()
0148 
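# Illustrative usage sketch (editorial addition, not part of the upstream source):
# ``status``, ``recentProgress`` and ``lastProgress`` all return plain Python
# dicts/lists parsed from the JVM-side JSON, e.g.:
#
#     sq.processAllAvailable()
#     print(sq.status)               # e.g. {'message': ..., 'isDataAvailable': ..., 'isTriggerActive': ...}
#     print(len(sq.recentProgress))  # number of retained progress updates
#     last = sq.lastProgress
#     if last is not None:
#         print(last['batchId'], last['numInputRows'])
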
0149     @since(2.0)
0150     def stop(self):
0151         """Stop this streaming query.
0152         """
0153         self._jsq.stop()
0154 
0155     @since(2.1)
0156     def explain(self, extended=False):
0157         """Prints the (logical and physical) plans to the console for debugging purpose.
0158 
0159         :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
0160 
0161         >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
0162         >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
0163         >>> sq.explain()
0164         == Physical Plan ==
0165         ...
0166         >>> sq.explain(True)
0167         == Parsed Logical Plan ==
0168         ...
0169         == Analyzed Logical Plan ==
0170         ...
0171         == Optimized Logical Plan ==
0172         ...
0173         == Physical Plan ==
0174         ...
0175         >>> sq.stop()
0176         """
0177         # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
0178         # We should print it in the Python process.
0179         print(self._jsq.explainInternal(extended))
0180 
0181     @since(2.1)
0182     def exception(self):
0183         """
0184         :return: the StreamingQueryException if the query was terminated by an exception, or None.
0185         """
0186         if self._jsq.exception().isDefined():
0187             je = self._jsq.exception().get()
0188             msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
0189             stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
0190             return StreamingQueryException(msg, stackTrace, je.getCause())
0191         else:
0192             return None
0193 
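# Illustrative usage sketch (editorial addition, not part of the upstream source):
# a common pattern is to block on the query and inspect the failure, if any
# (``StreamingQueryException`` is importable from ``pyspark.sql.utils``):
#
#     try:
#         sq.awaitTermination()
#     except StreamingQueryException:
#         print("Query failed: %s" % sq.exception())
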
0194 
0195 class StreamingQueryManager(object):
0196     """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
0197 
0198     .. note:: Evolving
0199 
0200     .. versionadded:: 2.0
0201     """
0202 
0203     def __init__(self, jsqm):
0204         self._jsqm = jsqm
0205 
0206     @property
0207     @ignore_unicode_prefix
0208     @since(2.0)
0209     def active(self):
0210         """Returns a list of active queries associated with this SQLContext
0211 
0212         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
0213         >>> sqm = spark.streams
0214         >>> # get the list of active streaming queries
0215         >>> [q.name for q in sqm.active]
0216         [u'this_query']
0217         >>> sq.stop()
0218         """
0219         return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
0220 
0221     @ignore_unicode_prefix
0222     @since(2.0)
0223     def get(self, id):
0224         """Returns an active query from this SQLContext or throws exception if an active query
0225         with this name doesn't exist.
0226 
0227         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
0228         >>> sq.name
0229         u'this_query'
0230         >>> sq = spark.streams.get(sq.id)
0231         >>> sq.isActive
0232         True
0233         >>> sq = sqlContext.streams.get(sq.id)
0234         >>> sq.isActive
0235         True
0236         >>> sq.stop()
0237         """
0238         return StreamingQuery(self._jsqm.get(id))
0239 
0240     @since(2.0)
0241     def awaitAnyTermination(self, timeout=None):
0242         """Wait until any of the queries on the associated SQLContext has terminated since the
0243         creation of the context, or since :func:`resetTerminated()` was called. If any query was
0244         terminated with an exception, then the exception will be thrown.
0245         If `timeout` is set, it returns whether any query has terminated within the given
0246         number of seconds.
0247 
0248         If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
0249         either return immediately (if the query was terminated by :func:`query.stop()`),
0250         or throw the exception immediately (if the query was terminated with exception). Use
0251         :func:`resetTerminated()` to clear past terminations and wait for new terminations.
0252 
0253         In the case where multiple queries have terminated since :func:`resetTerminated()`
0254         was called, if any query has terminated with an exception, then :func:`awaitAnyTermination()`
0255         will throw any one of the exceptions. To handle exceptions across multiple queries
0256         correctly, users need to stop all of them after any of them terminates with an exception,
0257         and then check `query.exception()` for each query.
0258 
0259         throws :class:`StreamingQueryException`, if any query has terminated with an exception
0260         """
0261         if timeout is not None:
0262             if not isinstance(timeout, (int, float)) or timeout < 0:
0263                 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
0264             return self._jsqm.awaitAnyTermination(int(timeout * 1000))
0265         else:
0266             return self._jsqm.awaitAnyTermination()
0267 
0268     @since(2.0)
0269     def resetTerminated(self):
0270         """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
0271         again to wait for new terminations.
0272 
0273         >>> spark.streams.resetTerminated()
0274         """
0275         self._jsqm.resetTerminated()
0276 
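# Illustrative usage sketch (editorial addition, not part of the upstream source):
# the manager on ``spark.streams`` can watch several queries at once, e.g. by
# polling every 5 seconds until one of them terminates:
#
#     spark.streams.resetTerminated()  # forget queries that ended earlier
#     q1 = sdf.writeStream.format("console").start()
#     q2 = sdf.writeStream.format("memory").queryName("debug_sink").start()
#     while not spark.streams.awaitAnyTermination(timeout=5):
#         print([q.name for q in spark.streams.active])
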
0277 
0278 class DataStreamReader(OptionUtils):
0279     """
0280     Interface used to load a streaming :class:`DataFrame <pyspark.sql.DataFrame>` from external
0281     storage systems (e.g. file systems, key-value stores, etc).
0282     Use :attr:`SparkSession.readStream <pyspark.sql.SparkSession.readStream>` to access this.
0283 
0284     .. note:: Evolving.
0285 
0286     .. versionadded:: 2.0
0287     """
0288 
0289     def __init__(self, spark):
0290         self._jreader = spark._ssql_ctx.readStream()
0291         self._spark = spark
0292 
0293     def _df(self, jdf):
0294         from pyspark.sql.dataframe import DataFrame
0295         return DataFrame(jdf, self._spark)
0296 
0297     @since(2.0)
0298     def format(self, source):
0299         """Specifies the input data source format.
0300 
0301         .. note:: Evolving.
0302 
0303         :param source: string, name of the data source, e.g. 'json', 'parquet'.
0304 
0305         >>> s = spark.readStream.format("text")
0306         """
0307         self._jreader = self._jreader.format(source)
0308         return self
0309 
0310     @since(2.0)
0311     def schema(self, schema):
0312         """Specifies the input schema.
0313 
0314         Some data sources (e.g. JSON) can infer the input schema automatically from data.
0315         By specifying the schema here, the underlying data source can skip the schema
0316         inference step, and thus speed up data loading.
0317 
0318         .. note:: Evolving.
0319 
0320         :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
0321                        (For example ``col0 INT, col1 DOUBLE``).
0322 
0323         >>> s = spark.readStream.schema(sdf_schema)
0324         >>> s = spark.readStream.schema("col0 INT, col1 DOUBLE")
0325         """
0326         from pyspark.sql import SparkSession
0327         spark = SparkSession.builder.getOrCreate()
0328         if isinstance(schema, StructType):
0329             jschema = spark._jsparkSession.parseDataType(schema.json())
0330             self._jreader = self._jreader.schema(jschema)
0331         elif isinstance(schema, basestring):
0332             self._jreader = self._jreader.schema(schema)
0333         else:
0334             raise TypeError("schema should be StructType or string")
0335         return self
0336 
0337     @since(2.0)
0338     def option(self, key, value):
0339         """Adds an input option for the underlying data source.
0340 
0341         You can set the following option(s) for reading files:
0342             * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0343                 timestamps in the JSON/CSV datasources or partition values. The following
0344                 formats of `timeZone` are supported:
0345 
0346                 * Region-based zone ID: It should have the form 'area/city', such as \
0347                   'America/Los_Angeles'.
0348                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0349                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0350 
0351                 Other short names like 'CST' are not recommended to use because they can be
0352                 ambiguous. If it isn't set, the current value of the SQL config
0353                 ``spark.sql.session.timeZone`` is used by default.
0354 
0355         .. note:: Evolving.
0356 
0357         >>> s = spark.readStream.option("x", 1)
0358         """
0359         self._jreader = self._jreader.option(key, to_str(value))
0360         return self
0361 
0362     @since(2.0)
0363     def options(self, **options):
0364         """Adds input options for the underlying data source.
0365 
0366         You can set the following option(s) for reading files:
0367             * ``timeZone``: sets the string that indicates a time zone ID to be used to parse
0368                 timestamps in the JSON/CSV datasources or partition values. The following
0369                 formats of `timeZone` are supported:
0370 
0371                 * Region-based zone ID: It should have the form 'area/city', such as \
0372                   'America/Los_Angeles'.
0373                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0374                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0375 
0376                 Other short names like 'CST' are not recommended to use because they can be
0377                 ambiguous. If it isn't set, the current value of the SQL config
0378                 ``spark.sql.session.timeZone`` is used by default.
0379 
0380         .. note:: Evolving.
0381 
0382         >>> s = spark.readStream.options(x="1", y=2)
0383         """
0384         for k in options:
0385             self._jreader = self._jreader.option(k, to_str(options[k]))
0386         return self
0387 
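# Illustrative usage sketch (editorial addition, not part of the upstream source):
# options are passed through to the source as strings; for file sources the
# ``maxFilesPerTrigger`` option caps how many new files each micro-batch reads:
#
#     reader = (spark.readStream
#               .format("json")
#               .schema("col0 INT, col1 DOUBLE")
#               .option("maxFilesPerTrigger", 1))
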
0388     @since(2.0)
0389     def load(self, path=None, format=None, schema=None, **options):
0390         """Loads a data stream from a data source and returns it as a
0391         :class:`DataFrame <pyspark.sql.DataFrame>`.
0392 
0393         .. note:: Evolving.
0394 
0395         :param path: optional string for file-system backed data sources.
0396         :param format: optional string for format of the data source. Defaults to 'parquet'.
0397         :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema
0398                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0399         :param options: all other string options
0400 
0401         >>> json_sdf = spark.readStream.format("json") \\
0402         ...     .schema(sdf_schema) \\
0403         ...     .load(tempfile.mkdtemp())
0404         >>> json_sdf.isStreaming
0405         True
0406         >>> json_sdf.schema == sdf_schema
0407         True
0408         """
0409         if format is not None:
0410             self.format(format)
0411         if schema is not None:
0412             self.schema(schema)
0413         self.options(**options)
0414         if path is not None:
0415             if type(path) != str or len(path.strip()) == 0:
0416                 raise ValueError("If the path is provided for stream, it needs to be a " +
0417                                  "non-empty string. List of paths are not supported.")
0418             return self._df(self._jreader.load(path))
0419         else:
0420             return self._df(self._jreader.load())
0421 
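# Illustrative usage sketch (editorial addition, not part of the upstream source):
# the built-in "rate" test source needs no path or schema, so it is convenient
# for trying out ``load`` end to end:
#
#     rate_sdf = (spark.readStream
#                 .format("rate")
#                 .option("rowsPerSecond", 10)
#                 .load())
#     rate_sdf.printSchema()  # columns: timestamp (timestamp), value (long)
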
0422     @since(2.0)
0423     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
0424              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
0425              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
0426              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
0427              multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
0428              dropFieldIfAllNull=None, encoding=None, pathGlobFilter=None,
0429              recursiveFileLookup=None):
0430         """
0431         Loads a JSON file stream and returns the results as a :class:`DataFrame`.
0432 
0433         `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
0434         For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
0435 
0436         If the ``schema`` parameter is not specified, this function goes
0437         through the input once to determine the input schema.
0438 
0439         .. note:: Evolving.
0440 
0441         :param path: string representing the path to the JSON dataset. Only a single
0442                      path string is supported by the streaming reader.
0443         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
0444                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0445         :param primitivesAsString: infers all primitive values as a string type. If None is set,
0446                                    it uses the default value, ``false``.
0447         :param prefersDecimal: infers all floating-point values as a decimal type. If the values
0448                                do not fit in decimal, then it infers them as doubles. If None is
0449                                set, it uses the default value, ``false``.
0450         :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
0451                               it uses the default value, ``false``.
0452         :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
0453                                         it uses the default value, ``false``.
0454         :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
0455                                         set, it uses the default value, ``true``.
0456         :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
0457                                         set, it uses the default value, ``false``.
0458         :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
0459                                                    using backslash quoting mechanism. If None is
0460                                                    set, it uses the default value, ``false``.
0461         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0462                      set, it uses the default value, ``PERMISSIVE``.
0463 
0464                 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0465                   into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0466                   fields to ``null``. To keep corrupt records, a user can set a string type \
0467                   field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
0468                   schema does not have the field, it drops corrupt records during parsing. \
0469                   When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
0470                   field in an output schema.
0471                 * ``DROPMALFORMED``: ignores the whole corrupted records.
0472                 * ``FAILFAST``: throws an exception when it meets corrupted records.
0473 
0474         :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0475                                           created by ``PERMISSIVE`` mode. This overrides
0476                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0477                                           it uses the value specified in
0478                                           ``spark.sql.columnNameOfCorruptRecord``.
0479         :param dateFormat: sets the string that indicates a date format. Custom date formats
0480                            follow the formats at `datetime pattern`_.
0481                            This applies to date type. If None is set, it uses the
0482                            default value, ``yyyy-MM-dd``.
0483         :param timestampFormat: sets the string that indicates a timestamp format.
0484                                 Custom date formats follow the formats at `datetime pattern`_.
0485                                 This applies to timestamp type. If None is set, it uses the
0486                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0487         :param multiLine: parse one record, which may span multiple lines, per file. If None is
0488                           set, it uses the default value, ``false``.
0489         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
0490                                           characters (ASCII characters with value less than 32,
0491                                           including tab and line feed characters) or not.
0492         :param lineSep: defines the line separator that should be used for parsing. If None is
0493                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0494         :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0495                        it uses the default value, ``en-US``. For instance, ``locale`` is used while
0496                        parsing dates and timestamps.
0497         :param dropFieldIfAllNull: whether to ignore column of all null values or empty
0498                                    array/struct during schema inference. If None is set, it
0499                                    uses the default value, ``false``.
0500         :param encoding: forcibly sets one of the standard basic or extended encodings for
0501                          the JSON files, for example UTF-16BE or UTF-32LE. If None is set,
0502                          the encoding of input JSON will be detected automatically
0503                          when the multiLine option is set to ``true``.
0504         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0505                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0506                                It does not change the behavior of `partition discovery`_.
0507         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0508                                     disables `partition discovery`_.
0509 
0510         .. _partition discovery:
0511           https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
0512         .. _datetime pattern: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
0513 
0514         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
0515         >>> json_sdf.isStreaming
0516         True
0517         >>> json_sdf.schema == sdf_schema
0518         True
0519         """
0520         self._set_opts(
0521             schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
0522             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
0523             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
0524             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
0525             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
0526             timestampFormat=timestampFormat, multiLine=multiLine,
0527             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
0528             dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
0529             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0530         if isinstance(path, basestring):
0531             return self._df(self._jreader.json(path))
0532         else:
0533             raise TypeError("path can be only a single string")
0534 
0535     @since(2.3)
0536     def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
0537         """Loads a ORC file stream, returning the result as a :class:`DataFrame`.
0538 
0539         .. note:: Evolving.
0540 
0541         :param mergeSchema: sets whether we should merge schemas collected from all
0542                             ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
0543                             The default value is specified in ``spark.sql.orc.mergeSchema``.
0544         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0545                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0546                                It does not change the behavior of `partition discovery`_.
0547         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0548                                     disables `partition discovery`_.
0549 
0550         >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
0551         >>> orc_sdf.isStreaming
0552         True
0553         >>> orc_sdf.schema == sdf_schema
0554         True
0555         """
0556         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0557                        recursiveFileLookup=recursiveFileLookup)
0558         if isinstance(path, basestring):
0559             return self._df(self._jreader.orc(path))
0560         else:
0561             raise TypeError("path can be only a single string")
0562 
0563     @since(2.0)
0564     def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
0565         """
0566         Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
0567 
0568         .. note:: Evolving.
0569 
0570         :param mergeSchema: sets whether we should merge schemas collected from all
0571                             Parquet part-files. This will override
0572                             ``spark.sql.parquet.mergeSchema``. The default value is specified in
0573                             ``spark.sql.parquet.mergeSchema``.
0574         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0575                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0576                                It does not change the behavior of `partition discovery`_.
0577         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0578                                     disables `partition discovery`_.
0579 
0580         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
0581         >>> parquet_sdf.isStreaming
0582         True
0583         >>> parquet_sdf.schema == sdf_schema
0584         True
0585         """
0586         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
0587                        recursiveFileLookup=recursiveFileLookup)
0588         if isinstance(path, basestring):
0589             return self._df(self._jreader.parquet(path))
0590         else:
0591             raise TypeError("path can be only a single string")
0592 
0593     @ignore_unicode_prefix
0594     @since(2.0)
0595     def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None,
0596              recursiveFileLookup=None):
0597         """
0598         Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
0599         string column named "value", followed by partitioned columns if there
0600         are any.
0601         The text files must be encoded as UTF-8.
0602 
0603         By default, each line in the text file is a new row in the resulting DataFrame.
0604 
0605         .. note:: Evolving.
0606 
0607         :param path: string for the input path.
0608         :param wholetext: if true, read each file from input path(s) as a single row.
0609         :param lineSep: defines the line separator that should be used for parsing. If None is
0610                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0611         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0612                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0613                                It does not change the behavior of `partition discovery`_.
0614         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0615                                     disables `partition discovery`_.
0616 
0617         >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
0618         >>> text_sdf.isStreaming
0619         True
0620         >>> "value" in str(text_sdf.schema)
0621         True
0622         """
0623         self._set_opts(
0624             wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
0625             recursiveFileLookup=recursiveFileLookup)
0626         if isinstance(path, basestring):
0627             return self._df(self._jreader.text(path))
0628         else:
0629             raise TypeError("path can be only a single string")
0630 
0631     @since(2.0)
0632     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
0633             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
0634             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
0635             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
0636             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
0637             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
0638             enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
0639             pathGlobFilter=None, recursiveFileLookup=None):
0640         r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
0641 
0642         This function will go through the input once to determine the input schema if
0643         ``inferSchema`` is enabled. To avoid going through the entire data once, disable
0644         ``inferSchema`` option or specify the schema explicitly using ``schema``.
0645 
0646         .. note:: Evolving.
0647 
0648         :param path: string for the input path.
0649         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
0650                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
0651         :param sep: sets a separator (one or more characters) for each field and value. If None is
0652                     set, it uses the default value, ``,``.
0653         :param encoding: decodes the CSV files by the given encoding type. If None is set,
0654                          it uses the default value, ``UTF-8``.
0655         :param quote: sets a single character used for escaping quoted values where the
0656                       separator can be part of the value. If None is set, it uses the default
0657                       value, ``"``. If you would like to turn off quotations, you need to set an
0658                       empty string.
0659         :param escape: sets a single character used for escaping quotes inside an already
0660                        quoted value. If None is set, it uses the default value, ``\``.
0661         :param comment: sets a single character used for skipping lines beginning with this
0662                         character. By default (None), it is disabled.
0663         :param header: uses the first line as names of columns. If None is set, it uses the
0664                        default value, ``false``.
0665         :param inferSchema: infers the input schema automatically from data. It requires one extra
0666                        pass over the data. If None is set, it uses the default value, ``false``.
0667         :param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
0668                               forcibly applied to datasource files, and headers in CSV files will be
0669                               ignored. If the option is set to ``false``, the schema will be
0670                               validated against all headers in CSV files or the first header in RDD
0671                               if the ``header`` option is set to ``true``. Field names in the schema
0672                               and column names in CSV headers are checked by their positions
0673                               taking into account ``spark.sql.caseSensitive``. If None is set,
0674                               ``true`` is used by default. Though the default value is ``true``,
0675                               it is recommended to disable the ``enforceSchema`` option
0676                               to avoid incorrect results.
0677         :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
0678                                         values being read should be skipped. If None is set, it
0679                                         uses the default value, ``false``.
0680         :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
0681                                          values being read should be skipped. If None is set, it
0682                                          uses the default value, ``false``.
0683         :param nullValue: sets the string representation of a null value. If None is set, it uses
0684                           the default value, empty string. Since 2.0.1, this ``nullValue`` param
0685                           applies to all supported types including the string type.
0686         :param nanValue: sets the string representation of a non-number value. If None is set, it
0687                          uses the default value, ``NaN``.
0688         :param positiveInf: sets the string representation of a positive infinity value. If None
0689                             is set, it uses the default value, ``Inf``.
0690         :param negativeInf: sets the string representation of a negative infinity value. If None
0691                             is set, it uses the default value, ``-Inf``.
0692         :param dateFormat: sets the string that indicates a date format. Custom date formats
0693                            follow the formats at `datetime pattern`_.
0694                            This applies to date type. If None is set, it uses the
0695                            default value, ``yyyy-MM-dd``.
0696         :param timestampFormat: sets the string that indicates a timestamp format.
0697                                 Custom date formats follow the formats at `datetime pattern`_.
0698                                 This applies to timestamp type. If None is set, it uses the
0699                                 default value, ``yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]``.
0700         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
0701                            set, it uses the default value, ``20480``.
0702         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
0703                                   value being read. If None is set, it uses the default value,
0704                                   ``-1`` meaning unlimited length.
0705         :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
0706                                             If specified, it is ignored.
0707         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
0708                      set, it uses the default value, ``PERMISSIVE``.
0709 
0710                 * ``PERMISSIVE``: when it meets a corrupted record, puts the malformed string \
0711                   into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
0712                   fields to ``null``. To keep corrupt records, an user can set a string type \
0713                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
0714                   schema does not have the field, it drops corrupt records during parsing. \
0715                   A record with less/more tokens than schema is not a corrupted record to CSV. \
0716                   When it meets a record having fewer tokens than the length of the schema, \
0717                   sets ``null`` to extra fields. When the record has more tokens than the \
0718                   length of the schema, it drops extra tokens.
0719                 * ``DROPMALFORMED``: ignores the whole corrupted records.
0720                 * ``FAILFAST``: throws an exception when it meets corrupted records.
0721 
0722         :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
0723                                           created by ``PERMISSIVE`` mode. This overrides
0724                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
0725                                           it uses the value specified in
0726                                           ``spark.sql.columnNameOfCorruptRecord``.
0727         :param multiLine: parse one record, which may span multiple lines. If None is
0728                           set, it uses the default value, ``false``.
0729         :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
0730                                           the quote character. If None is set, the default value is
0731                                           escape character when escape and quote characters are
0732                                           different, ``\0`` otherwise.
0733         :param emptyValue: sets the string representation of an empty value. If None is set, it uses
0734                            the default value, empty string.
0735         :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
0736                        it uses the default value, ``en-US``. For instance, ``locale`` is used while
0737                        parsing dates and timestamps.
0738         :param lineSep: defines the line separator that should be used for parsing. If None is
0739                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
0740                         Maximum length is 1 character.
0741         :param pathGlobFilter: an optional glob pattern to only include files with paths matching
0742                                the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
0743                                It does not change the behavior of `partition discovery`_.
0744         :param recursiveFileLookup: recursively scan a directory for files. Using this option
0745                                     disables `partition discovery`_.
0746 
0747         >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
0748         >>> csv_sdf.isStreaming
0749         True
0750         >>> csv_sdf.schema == sdf_schema
0751         True
0752         """
0753         self._set_opts(
0754             schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
0755             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
0756             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
0757             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
0758             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
0759             maxCharsPerColumn=maxCharsPerColumn,
0760             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
0761             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
0762             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
0763             emptyValue=emptyValue, locale=locale, lineSep=lineSep,
0764             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
0765         if isinstance(path, basestring):
0766             return self._df(self._jreader.csv(path))
0767         else:
0768             raise TypeError("path can be only a single string")
0769 
0770 
0771 class DataStreamWriter(object):
0772     """
0773     Interface used to write a streaming :class:`DataFrame <pyspark.sql.DataFrame>` to external
0774     storage systems (e.g. file systems, key-value stores, etc).
0775     Use :attr:`DataFrame.writeStream <pyspark.sql.DataFrame.writeStream>`
0776     to access this.
0777 
0778     .. note:: Evolving.
0779 
0780     .. versionadded:: 2.0
0781     """
0782 
0783     def __init__(self, df):
0784         self._df = df
0785         self._spark = df.sql_ctx
0786         self._jwrite = df._jdf.writeStream()
0787 
0788     def _sq(self, jsq):
0789         from pyspark.sql.streaming import StreamingQuery
0790         return StreamingQuery(jsq)
0791 
0792     @since(2.0)
0793     def outputMode(self, outputMode):
0794         """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
0795 
0796         Options include:
0797 
0798         * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to
0799            the sink
0800         * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the sink
0801            every time there are some updates
0802         * `update`: only the rows that were updated in the streaming DataFrame/Dataset will be
0803            written to the sink every time there are some updates. If the query doesn't contain
0804            aggregations, it will be equivalent to `append` mode.
0805 
0806         .. note:: Evolving.
0807 
0808         >>> writer = sdf.writeStream.outputMode('append')
0809         """
0810         if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
0811             raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
0812         self._jwrite = self._jwrite.outputMode(outputMode)
0813         return self
0814 
0815     @since(2.0)
0816     def format(self, source):
0817         """Specifies the underlying output data source.
0818 
0819         .. note:: Evolving.
0820 
0821         :param source: string, name of the data source, which for now can be 'parquet'.
0822 
0823         >>> writer = sdf.writeStream.format('json')
0824         """
0825         self._jwrite = self._jwrite.format(source)
0826         return self
0827 
0828     @since(2.0)
0829     def option(self, key, value):
0830         """Adds an output option for the underlying data source.
0831 
0832         You can set the following option(s) for writing files:
0833             * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0834                 timestamps in the JSON/CSV datasources or partition values. The following
0835                 formats of `timeZone` are supported:
0836 
0837                 * Region-based zone ID: It should have the form 'area/city', such as \
0838                   'America/Los_Angeles'.
0839                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0840                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0841 
0842                 Other short names like 'CST' are not recommended to use because they can be
0843                 ambiguous. If it isn't set, the current value of the SQL config
0844                 ``spark.sql.session.timeZone`` is used by default.
0845 
0846         .. note:: Evolving.
0847         """
0848         self._jwrite = self._jwrite.option(key, to_str(value))
0849         return self
0850 
0851     @since(2.0)
0852     def options(self, **options):
0853         """Adds output options for the underlying data source.
0854 
0855         You can set the following option(s) for writing files:
0856             * ``timeZone``: sets the string that indicates a time zone ID to be used to format
0857                 timestamps in the JSON/CSV datasources or partition values. The following
0858                 formats of `timeZone` are supported:
0859 
0860                 * Region-based zone ID: It should have the form 'area/city', such as \
0861                   'America/Los_Angeles'.
0862                 * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or \
0863                  '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
0864 
0865                 Other short names like 'CST' are not recommended to use because they can be
0866                 ambiguous. If it isn't set, the current value of the SQL config
0867                 ``spark.sql.session.timeZone`` is used by default.
0868 
0869         .. note:: Evolving.
0870         """
0871         for k in options:
0872             self._jwrite = self._jwrite.option(k, to_str(options[k]))
0873         return self
0874 
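# Illustrative usage sketch (editorial addition, not part of the upstream source):
# the write option most queries need is ``checkpointLocation``, which
# fault-tolerant sinks use to store recovery metadata; paths are placeholders:
#
#     writer = (sdf.writeStream
#               .format("parquet")
#               .option("checkpointLocation", "/tmp/checkpoints/my_query")
#               .option("path", "/tmp/output/my_query"))
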
0875     @since(2.0)
0876     def partitionBy(self, *cols):
0877         """Partitions the output by the given columns on the file system.
0878 
0879         If specified, the output is laid out on the file system similar
0880         to Hive's partitioning scheme.
0881 
0882         .. note:: Evolving.
0883 
0884         :param cols: name of columns
0885 
0886         """
0887         if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
0888             cols = cols[0]
0889         self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
0890         return self
0891 
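# Illustrative usage sketch (editorial addition, not part of the upstream source):
# assuming the streaming DataFrame has ``year`` and ``month`` columns, the file
# sink lays output out in Hive-style year=.../month=... directories:
#
#     writer = (sdf.writeStream
#               .format("parquet")
#               .partitionBy("year", "month")
#               .option("checkpointLocation", "/tmp/checkpoints/partitioned"))
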
0892     @since(2.0)
0893     def queryName(self, queryName):
0894         """Specifies the name of the :class:`StreamingQuery` that can be started with
0895         :func:`start`. This name must be unique among all the currently active queries
0896         in the associated SparkSession.
0897 
0898         .. note:: Evolving.
0899 
0900         :param queryName: unique name for the query
0901 
0902         >>> writer = sdf.writeStream.queryName('streaming_query')
0903         """
0904         if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
0905             raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
0906         self._jwrite = self._jwrite.queryName(queryName)
0907         return self
0908 
0909     @keyword_only
0910     @since(2.0)
0911     def trigger(self, processingTime=None, once=None, continuous=None):
0912         """Set the trigger for the stream query. If this is not set it will run the query as fast
0913         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
0914 
0915         .. note:: Evolving.
0916 
0917         :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
0918                                Set a trigger that runs a microbatch query periodically based on the
0919                                processing time. Only one trigger can be set.
0920         :param once: if set to True, set a trigger that processes only one batch of data in a
0921                      streaming query then terminates the query. Only one trigger can be set.
0922         :param continuous: a time interval as a string, e.g. '5 seconds', '1 minute'.
0923                            Set a trigger that runs a continuous query with a given checkpoint
0924                            interval. Only one trigger can be set.
0925 
0926         >>> # trigger the query for execution every 5 seconds
0927         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
0928         >>> # trigger the query to process just one batch of data
0929         >>> writer = sdf.writeStream.trigger(once=True)
0930         >>> # trigger the query for execution every 5 seconds
0931         >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
0932         """
0933         params = [processingTime, once, continuous]
0934 
0935         if params.count(None) == 3:
0936             raise ValueError('No trigger provided')
0937         elif params.count(None) < 2:
0938             raise ValueError('Multiple triggers not allowed.')
0939 
0940         jTrigger = None
0941         if processingTime is not None:
0942             if type(processingTime) != str or len(processingTime.strip()) == 0:
0943                 raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
0944                                  processingTime)
0945             interval = processingTime.strip()
0946             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
0947                 interval)
0948 
0949         elif once is not None:
0950             if once is not True:
0951                 raise ValueError('Value for once must be True. Got: %s' % once)
0952             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
0953 
0954         else:
0955             if type(continuous) != str or len(continuous.strip()) == 0:
0956                 raise ValueError('Value for continuous must be a non empty string. Got: %s' %
0957                                  continuous)
0958             interval = continuous.strip()
0959             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
0960                 interval)
0961 
0962         self._jwrite = self._jwrite.trigger(jTrigger)
0963         return self
0964 
0965     @since(2.4)
0966     def foreach(self, f):
0967         """
0968         Sets the output of the streaming query to be processed using the provided writer ``f``.
0969         This is often used to write the output of a streaming query to arbitrary storage systems.
0970         The processing logic can be specified in two ways.
0971 
0972         #. A **function** that takes a row as input.
0973             This is a simple way to express your processing logic. Note that this does
0974             not allow you to deduplicate generated data when failures cause reprocessing of
0975             some input data. That would require you to specify the processing logic using the
0976             second approach below.
0977 
0978         #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
0979             The object can have the following methods.
0980 
0981             * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing
0982                 (for example, open a connection, start a transaction, etc). Additionally, you can
0983                 use the `partition_id` and `epoch_id` to deduplicate regenerated data
0984                 (discussed later).
0985 
0986             * ``process(row)``: *Non-optional* method that processes each :class:`Row`.
0987 
0988             * ``close(error)``: *Optional* method that finalizes and cleans up (for example,
0989                 close connection, commit transaction, etc.) after all rows have been processed.
0990 
0991             The object will be used by Spark in the following way.
0992 
0993             * A single copy of this object is responsible for all the data generated by a
0994                 single task in a query. In other words, one instance is responsible for
0995                 processing one partition of the data generated in a distributed manner.
0996 
0997             * This object must be serializable because each task will get a fresh
0998                 serialized-deserialized copy of the provided object. Hence, it is strongly
0999                 recommended that any initialization for writing data (e.g. opening a
1000                 connection or starting a transaction) is done after the `open(...)`
1001                 method has been called, which signifies that the task is ready to generate data.
1002 
1003             * The lifecycle of the methods is as follows.
1004 
1005                 For each partition with ``partition_id``:
1006 
1007                 ... For each batch/epoch of streaming data with ``epoch_id``:
1008 
1009                 ....... Method ``open(partitionId, epochId)`` is called.
1010 
1011                 ....... If ``open(...)`` returns true, for each row in the partition and
1012                         batch/epoch, method ``process(row)`` is called.
1013 
1014                 ....... Method ``close(errorOrNull)`` is called with error (if any) seen while
1015                         processing rows.
1016 
1017             Important points to note:
1018 
1019             * The `partitionId` and `epochId` can be used to deduplicate generated data when
1020                 failures cause reprocessing of some input data. This depends on the execution
1021                 mode of the query. If the streaming query is being executed in the micro-batch
1022                 mode, then every partition represented by a unique tuple (partition_id, epoch_id)
1023                 is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used
1024                 to deduplicate and/or transactionally commit data and achieve exactly-once
1025                 guarantees. However, if the streaming query is being executed in the continuous
1026                 mode, then this guarantee does not hold and therefore should not be used for
1027                 deduplication.
1028 
1029             * The ``close()`` method (if it exists) will be called if the `open()` method exists
1030                 and returns successfully (irrespective of the return value), except if the Python
1031                 process crashes in the middle.
1032 
1033         .. note:: Evolving.
1034 
1035         >>> # Print every row using a function
1036         >>> def print_row(row):
1037         ...     print(row)
1038         ...
1039         >>> writer = sdf.writeStream.foreach(print_row)
1040         >>> # Print every row using an object with a process() method
1041         >>> class RowPrinter:
1042         ...     def open(self, partition_id, epoch_id):
1043         ...         print("Opened %d, %d" % (partition_id, epoch_id))
1044         ...         return True
1045         ...     def process(self, row):
1046         ...         print(row)
1047         ...     def close(self, error):
1048         ...         print("Closed with error: %s" % str(error))
1049         ...
1050         >>> writer = sdf.writeStream.foreach(RowPrinter())
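
             As a rough sketch of the deduplication idea above, ``(partition_id, epoch_id)``
             could be checked against an external commit log before processing; the commit
             log itself is hypothetical here and is not provided by Spark.

             >>> # Skip work for a (partition_id, epoch_id) pair that was already committed
             >>> class DedupWriter:
             ...     def open(self, partition_id, epoch_id):
             ...         # Hypothetical lookup in your sink's commit log; this sketch always
             ...         # processes the partition.
             ...         self._ids = (partition_id, epoch_id)
             ...         return True
             ...     def process(self, row):
             ...         print(row)
             ...     def close(self, error):
             ...         # On success, record self._ids in the commit log so a retried
             ...         # (partition_id, epoch_id) can be skipped by open().
             ...         pass
             ...
             >>> writer = sdf.writeStream.foreach(DedupWriter())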
1051         """
1052 
1053         from pyspark.rdd import _wrap_function
1054         from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
1055         from pyspark.taskcontext import TaskContext
1056 
1057         if callable(f):
1058             # The provided object is a callable function that is supposed to be called on each row.
1059             # Construct a function that takes an iterator and calls the provided function on each
1060             # row.
1061             def func_without_process(_, iterator):
1062                 for x in iterator:
1063                     f(x)
1064                 return iter([])
1065 
1066             func = func_without_process
1067 
1068         else:
1069             # The provided object is not a callable function. Then it is expected to have a
1070             # 'process(row)' method, and optional 'open(partition_id, epoch_id)' and
1071             # 'close(error)' methods.
1072 
1073             if not hasattr(f, 'process'):
1074                 raise Exception("Provided object does not have a 'process' method")
1075 
1076             if not callable(getattr(f, 'process')):
1077                 raise Exception("Attribute 'process' in provided object is not callable")
1078 
1079             def doesMethodExist(method_name):
1080                 exists = hasattr(f, method_name)
1081                 if exists and not callable(getattr(f, method_name)):
1082                     raise Exception(
1083                         "Attribute '%s' in provided object is not callable" % method_name)
1084                 return exists
1085 
1086             open_exists = doesMethodExist('open')
1087             close_exists = doesMethodExist('close')
1088 
1089             def func_with_open_process_close(partition_id, iterator):
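                     # The current micro-batch/epoch id is published by the JVM streaming
                     # execution as a task-local property; it is needed to honor the
                     # open(partition_id, epoch_id) contract described in the docstring.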
1090                 epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
1091                 if epoch_id:
1092                     epoch_id = int(epoch_id)
1093                 else:
1094                     raise Exception("Could not get batch id from TaskContext")
1095 
1096                 # Check if the data should be processed
1097                 should_process = True
1098                 if open_exists:
1099                     should_process = f.open(partition_id, epoch_id)
1100 
1101                 error = None
1102 
1103                 try:
1104                     if should_process:
1105                         for x in iterator:
1106                             f.process(x)
1107                 except Exception as ex:
1108                     error = ex
1109                 finally:
1110                     if close_exists:
1111                         f.close(error)
1112                     if error:
1113                         raise error
1114 
1115                 return iter([])
1116 
1117             func = func_with_open_process_close
1118 
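             # Pickle the per-partition function and hand it to the JVM-side
             # PythonForeachWriter, which runs it over each partition of the streaming output.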
1119         serializer = AutoBatchedSerializer(PickleSerializer())
1120         wrapped_func = _wrap_function(self._spark._sc, func, serializer, serializer)
1121         jForeachWriter = \
1122             self._spark._sc._jvm.org.apache.spark.sql.execution.python.PythonForeachWriter(
1123                 wrapped_func, self._df._jdf.schema())
1124         self._jwrite.foreach(jForeachWriter)
1125         return self
1126 
1127     @since(2.4)
1128     def foreachBatch(self, func):
1129         """
1130         Sets the output of the streaming query to be processed using the provided
1131         function. This is supported only in the micro-batch execution mode (that is, when the
1132         trigger is not continuous). The provided function will be called in every micro-batch
1133         with (i) the output rows as a DataFrame and (ii) the batch identifier.
1134         The batchId can be used to deduplicate and transactionally write the output
1135         (that is, the provided DataFrame) to external systems. The output DataFrame is
1136         guaranteed to be exactly the same for the same batchId (assuming all operations are
1137         deterministic in the query).
1138 
1139         .. note:: Evolving.
1140 
1141         >>> def func(batch_df, batch_id):
1142         ...     batch_df.collect()
1143         ...
1144         >>> writer = sdf.writeStream.foreachBatch(func)
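
             As a hedged sketch, the batch id can drive an idempotent write; the
             ``is_committed`` check below is hypothetical and would be backed by whatever
             commit log the external sink provides.

             >>> def write_batch_idempotently(batch_df, batch_id):
             ...     # Hypothetical lookup of batch_id in the sink's commit log.
             ...     is_committed = False
             ...     if not is_committed:
             ...         batch_df.collect()  # replace with a write to the external system
             ...
             >>> writer = sdf.writeStream.foreachBatch(write_batch_idempotently)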
1145         """
1146 
1147         from pyspark.java_gateway import ensure_callback_server_started
1148         gw = self._spark._sc._gateway
1149         java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
1150 
1151         wrapped_func = ForeachBatchFunction(self._spark, func)
1152         gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
1153         ensure_callback_server_started(gw)
1154         return self
1155 
1156     @ignore_unicode_prefix
1157     @since(2.0)
1158     def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
1159               **options):
1160         """Streams the contents of the :class:`DataFrame` to a data source.
1161 
1162         The data source is specified by the ``format`` and a set of ``options``.
1163         If ``format`` is not specified, the default data source configured by
1164         ``spark.sql.sources.default`` will be used.
1165 
1166         .. note:: Evolving.
1167 
1168         :param path: the path in a Hadoop supported file system
1169         :param format: the format used to save
1170         :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
1171                            streaming sink.
1172 
1173             * `append`: Only the new rows in the streaming DataFrame/Dataset will be written to the
1174               sink
1175             * `complete`: All the rows in the streaming DataFrame/Dataset will be written to the
1176               sink every time there are some updates
1177             * `update`: Only the rows that were updated in the streaming DataFrame/Dataset will be
1178               written to the sink every time there are some updates. If the query doesn't contain
1179               aggregations, it will be equivalent to `append` mode.
1180         :param partitionBy: names of partitioning columns
1181         :param queryName: unique name for the query
1182         :param options: All other string options. You may want to provide a `checkpointLocation`
1183                         for most streams; however, it is not required for a `memory` stream.
1184 
1185         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
1186         >>> sq.isActive
1187         True
1188         >>> sq.name
1189         u'this_query'
1190         >>> sq.stop()
1191         >>> sq.isActive
1192         False
1193         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
1194         ...     queryName='that_query', outputMode="append", format='memory')
1195         >>> sq.name
1196         u'that_query'
1197         >>> sq.isActive
1198         True
1199         >>> sq.stop()
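
             As a sketch of passing ``options``, a durable sink such as ``parquet`` would
             typically be given an explicit `checkpointLocation` (temporary directories are
             used here only for illustration).

             >>> chk_dir = tempfile.mkdtemp()
             >>> out_dir = tempfile.mkdtemp()
             >>> sq = sdf.writeStream.format('parquet').option(
             ...     'checkpointLocation', chk_dir).start(out_dir)
             >>> sq.stop()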
1200         """
1201         self.options(**options)
1202         if outputMode is not None:
1203             self.outputMode(outputMode)
1204         if partitionBy is not None:
1205             self.partitionBy(partitionBy)
1206         if format is not None:
1207             self.format(format)
1208         if queryName is not None:
1209             self.queryName(queryName)
1210         if path is None:
1211             return self._sq(self._jwrite.start())
1212         else:
1213             return self._sq(self._jwrite.start(path))
1214 
1215 
1216 def _test():
1217     import doctest
1218     import os
1219     import tempfile
         import py4j
         from pyspark.context import SparkContext
1220     from pyspark.sql import Row, SparkSession, SQLContext
1221     import pyspark.sql.streaming
1222 
1223     os.chdir(os.environ["SPARK_HOME"])
1224 
1225     globs = pyspark.sql.streaming.__dict__.copy()
         # Create a local SparkContext so the except branch below has a context to fall back on.
         sc = SparkContext('local[4]', 'PythonTest')
1226     try:
1227         spark = SparkSession.builder.getOrCreate()
1228     except py4j.protocol.Py4JError:
1229         spark = SparkSession(sc)
1230 
1231     globs['tempfile'] = tempfile
1232     globs['os'] = os
1233     globs['spark'] = spark
1234     globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
1235     globs['sdf'] = \
1236         spark.readStream.format('text').load('python/test_support/sql/streaming')
1237     globs['sdf_schema'] = StructType([StructField("data", StringType(), True)])
1238     globs['df'] = \
1239         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
1240 
1241     (failure_count, test_count) = doctest.testmod(
1242         pyspark.sql.streaming, globs=globs,
1243         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
1244     globs['spark'].stop()
1245 
1246     if failure_count:
1247         sys.exit(-1)
1248 
1249 
1250 if __name__ == "__main__":
1251     _test()