import sys
import operator
import time
from itertools import chain
from datetime import datetime

if sys.version < "3":
    from itertools import imap as map, ifilter as filter
else:
    long = int

from py4j.protocol import Py4JJavaError

from pyspark import RDD
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.util import rddToFileName, TransformFunction
from pyspark.rdd import portable_hash
from pyspark.resultiterable import ResultIterable

__all__ = ["DStream"]


class DStream(object):
    """
    A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
    is a continuous sequence of RDDs (of the same type) representing a
    continuous stream of data (see :class:`RDD` in the Spark core documentation
    for more details on RDDs).

    DStreams can either be created from live data (such as data from TCP
    sockets) using a :class:`StreamingContext`, or generated by transforming
    existing DStreams using operations such as `map`, `window` and
    `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
    DStream periodically generates an RDD, either from live data or by
    transforming the RDD generated by a parent DStream.

    Internally, a DStream is characterized by a few basic properties:
    - A list of other DStreams that the DStream depends on
    - A time interval at which the DStream generates an RDD
    - A function that is used to generate an RDD after each time interval
    """
    def __init__(self, jdstream, ssc, jrdd_deserializer):
        self._jdstream = jdstream
        self._ssc = ssc
        self._sc = ssc._sc
        self._jrdd_deserializer = jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False

    def context(self):
        """
        Return the StreamingContext associated with this DStream.
        """
        return self._ssc

    def count(self):
        """
        Return a new DStream in which each RDD has a single element
        generated by counting each RDD of this DStream.
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add)

    def filter(self, f):
        """
        Return a new DStream containing only the elements that satisfy the
        predicate.
        """
        def func(iterator):
            return filter(f, iterator)
        return self.mapPartitions(func, True)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new DStream by applying a function to all elements of
        this DStream, and then flattening the results.
        """
        def func(s, iterator):
            return chain.from_iterable(map(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)
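
    # Illustrative sketch of flatMap (assumes `lines` is a DStream of text
    # lines, e.g. from ssc.socketTextStream()):
    #
    #     words = lines.flatMap(lambda line: line.split(" "))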

    def map(self, f, preservesPartitioning=False):
        """
        Return a new DStream by applying a function to each element of this
        DStream.
        """
        def func(iterator):
            return map(f, iterator)
        return self.mapPartitions(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitions() to each RDD of this DStream.
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitionsWithIndex() to each RDD of this DStream.
        """
        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))

    def reduce(self, func):
        """
        Return a new DStream in which each RDD has a single element
        generated by reducing each RDD of this DStream.
        """
        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])

    def reduceByKey(self, func, numPartitions=None):
        """
        Return a new DStream by applying reduceByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.combineByKey(lambda x: x, func, func, numPartitions)
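
    # Illustrative sketch: per-batch word counts (assumes `words` is a
    # DStream of strings):
    #
    #     counts = words.map(lambda w: (w, 1)).reduceByKey(operator.add)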

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Return a new DStream by applying combineByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        def func(rdd):
            return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)
        return self.transform(func)

    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
        """
        Return a copy of the DStream in which each RDD is partitioned
        using the specified partitioner.
        """
        return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))

    def foreachRDD(self, func):
        """
        Apply a function to each RDD in this DStream.
        """
        if func.__code__.co_argcount == 1:
            old_func = func
            func = lambda t, rdd: old_func(rdd)
        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
        api = self._ssc._jvm.PythonDStream
        api.callForeachRDD(self._jdstream, jfunc)
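
    # Illustrative sketch: `func` may take either the RDD alone, or the batch
    # time and the RDD. Collecting to the driver is only sensible for small
    # test streams:
    #
    #     def handle(time, rdd):
    #         print("batch at %s: %s" % (time, rdd.collect()))
    #
    #     counts.foreachRDD(handle)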

    def pprint(self, num=10):
        """
        Print the first `num` elements of each RDD generated in this DStream.

        :param num: the number of elements to print from the start of each RDD.
        """
        def takeAndPrint(time, rdd):
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)

    def mapValues(self, f):
        """
        Return a new DStream by applying a map function to the value of
        each key-value pair in this DStream without changing the key.
        """
        map_values_fn = lambda kv: (kv[0], f(kv[1]))
        return self.map(map_values_fn, preservesPartitioning=True)

    def flatMapValues(self, f):
        """
        Return a new DStream by applying a flatMap function to the value
        of each key-value pair in this DStream without changing the key.
        """
        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def glom(self):
        """
        Return a new DStream in which each RDD is generated by applying
        glom() to each RDD of this DStream.
        """
        def func(iterator):
            yield list(iterator)
        return self.mapPartitions(func)

    def cache(self):
        """
        Persist the RDDs of this DStream with the default storage level
        (`MEMORY_ONLY`).
        """
        self.is_cached = True
        self.persist(StorageLevel.MEMORY_ONLY)
        return self

    def persist(self, storageLevel):
        """
        Persist the RDDs of this DStream with the given storage level.
        """
        self.is_cached = True
        javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
        self._jdstream.persist(javaStorageLevel)
        return self

    def checkpoint(self, interval):
        """
        Enable periodic checkpointing of RDDs of this DStream.

        :param interval: time in seconds, after each period of which the
                         generated RDDs will be checkpointed
        """
        self.is_checkpointed = True
        self._jdstream.checkpoint(self._ssc._jduration(interval))
        return self
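
    # Illustrative sketch (assumes `checkpoint_dir` is a fault-tolerant
    # directory of your choice, registered via ssc.checkpoint(checkpoint_dir)):
    #
    #     stream.checkpoint(10)  # checkpoint generated RDDs every 10 seconds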

    def groupByKey(self, numPartitions=None):
        """
        Return a new DStream by applying groupByKey on each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transform(lambda rdd: rdd.groupByKey(numPartitions))

    def countByValue(self):
        """
        Return a new DStream in which each RDD contains the counts of each
        distinct value in each RDD of this DStream.
        """
        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def saveAsTextFiles(self, prefix, suffix=None):
        """
        Save each RDD in this DStream as a text file, using the string
        representation of elements.
        """
        def saveAsTextFile(t, rdd):
            path = rddToFileName(prefix, suffix, t)
            try:
                rdd.saveAsTextFile(path)
            except Py4JJavaError as e:
                # ignore the error if the output directory already exists
                # (e.g., the batch was re-run after recovering from checkpoint)
                if 'FileAlreadyExistsException' not in str(e):
                    raise
        return self.foreachRDD(saveAsTextFile)

    def transform(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a
        function on each RDD of this DStream.

        `func` can take one argument (`rdd`) or two arguments
        (`time`, `rdd`).
        """
        if func.__code__.co_argcount == 1:
            oldfunc = func
            func = lambda t, rdd: oldfunc(rdd)
        assert func.__code__.co_argcount == 2, "func should take one or two arguments"
        return TransformedDStream(self, func)
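
    # Illustrative sketch: `transform` exposes the underlying RDDs, so any RDD
    # operation can be applied per batch. For example, sorting each batch of
    # (key, count) pairs by count (assumes a DStream `counts`):
    #
    #     sorted_counts = counts.transform(
    #         lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))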

    def transformWith(self, func, other, keepSerializer=False):
        """
        Return a new DStream in which each RDD is generated by applying a
        function on each RDD of this DStream and the `other` DStream.

        `func` can take two arguments (`rdd_a`, `rdd_b`) or three arguments
        (`time`, `rdd_a`, `rdd_b`).
        """
        if func.__code__.co_argcount == 2:
            oldfunc = func
            func = lambda t, a, b: oldfunc(a, b)
        assert func.__code__.co_argcount == 3, "func should take two or three arguments"
        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer)
        dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
                                                          other._jdstream.dstream(), jfunc)
        jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer
        return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)

    def repartition(self, numPartitions):
        """
        Return a new DStream with an increased or decreased level of parallelism.
        """
        return self.transform(lambda rdd: rdd.repartition(numPartitions))

    @property
    def _slideDuration(self):
        """
        Return the slideDuration in seconds of this DStream.
        """
        return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0

    def union(self, other):
        """
        Return a new DStream by unifying data of another DStream with this DStream.

        :param other: Another DStream having the same interval (i.e., slideDuration)
                      as this DStream.
        """
        if self._slideDuration != other._slideDuration:
            raise ValueError("the two DStreams should have the same slide duration")
        return self.transformWith(lambda a, b: a.union(b), other, True)

    def cogroup(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'cogroup' between RDDs of this
        DStream and `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)

    def join(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.join(b, numPartitions), other)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'left outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'right outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'full outer join' between RDDs of this DStream and
        `other` DStream.

        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)

    def _jtime(self, timestamp):
        """ Convert a datetime or unix timestamp into a JVM Time
        """
        if isinstance(timestamp, datetime):
            timestamp = time.mktime(timestamp.timetuple())
        return self._sc._jvm.Time(long(timestamp * 1000))

    def slice(self, begin, end):
        """
        Return all the RDDs between 'begin' and 'end' (both inclusive).

        `begin` and `end` can be datetime.datetime objects or unix timestamps.
        """
        jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
        return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
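
    # Illustrative sketch (assumes the stream has been running and remembers
    # enough batches; see StreamingContext.remember()):
    #
    #     from datetime import datetime, timedelta
    #     end = datetime.now()
    #     rdds = stream.slice(end - timedelta(seconds=60), end)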

    def _validate_window_param(self, window, slide):
        duration = self._jdstream.dstream().slideDuration().milliseconds()
        if int(window * 1000) % duration != 0:
            raise ValueError("windowDuration must be a multiple of the slide duration (%d ms)"
                             % duration)
        if slide and int(slide * 1000) % duration != 0:
            raise ValueError("slideDuration must be a multiple of the slide duration (%d ms)"
                             % duration)

    def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        :param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        :param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
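
    # Illustrative sketch (assumes a 1-second batch interval): every 10
    # seconds, produce an RDD with the last 30 seconds of data:
    #
    #     windowed = stream.window(30, 10)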

    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.

        If `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value:

        1. reduce the new values that entered the window (e.g., adding new counts)

        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

        This is more efficient than when `invReduceFunc` is None.

        :param reduceFunc: associative and commutative reduce function
        :param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y,
                              and invertible x:
                              `invReduceFunc(reduceFunc(x, y), x) = y`
        :param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        :param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                             windowDuration, slideDuration, 1)
        return reduced.map(lambda kv: kv[1])
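
    # Illustrative sketch: a running sum over the last 60 seconds, updated
    # every 10 seconds (assumes `nums` is a DStream of numbers):
    #
    #     totals = nums.reduceByWindow(operator.add, operator.sub, 60, 10)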

    def countByWindow(self, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated
        by counting the number of elements in a window over this DStream.
        windowDuration and slideDuration are as defined in the window() operation.

        This is equivalent to window(windowDuration, slideDuration).count(),
        but will be more efficient if the window is large.
        """
        return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
                                                    windowDuration, slideDuration)

    def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of each
        distinct value seen in a sliding window over this DStream.

        :param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        :param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        :param numPartitions: number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0)

    def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream by applying `groupByKey` over a sliding window.
        Similar to `DStream.groupByKey()`, but applies it over a sliding window.

        :param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        :param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        :param numPartitions: number of partitions of each RDD in the new DStream.
        """
        ls = self.mapValues(lambda x: [x])
        grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
                                          windowDuration, slideDuration, numPartitions)
        return grouped.mapValues(ResultIterable)

    def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
                             numPartitions=None, filterFunc=None):
        """
        Return a new DStream by applying incremental `reduceByKey` over a sliding window.

        The reduced value for a new window is calculated using the old window's
        reduced value:
        1. reduce the new values that entered the window (e.g., adding new counts)
        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

        `invFunc` can be None; in that case all the RDDs in the window are
        reduced from scratch, which can be slower than providing `invFunc`.

        :param func: associative and commutative reduce function
        :param invFunc: inverse function of `func`
        :param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        :param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        :param numPartitions: number of partitions of each RDD in the new DStream.
        :param filterFunc: function to filter expired key-value pairs;
                           only pairs that satisfy the function are retained;
                           set this to None if you do not want to filter
        """
        self._validate_window_param(windowDuration, slideDuration)
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        reduced = self.reduceByKey(func, numPartitions)

        if invFunc:
            def reduceFunc(t, a, b):
                b = b.reduceByKey(func, numPartitions)
                r = a.union(b).reduceByKey(func, numPartitions) if a else b
                if filterFunc:
                    r = r.filter(filterFunc)
                return r

            def invReduceFunc(t, a, b):
                b = b.reduceByKey(func, numPartitions)
                joined = a.leftOuterJoin(b, numPartitions)
                return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
                                        if kv[1] is not None else kv[0])

            jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
            jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
            if slideDuration is None:
                slideDuration = self._slideDuration
            dstream = self._sc._jvm.PythonReducedWindowedDStream(
                reduced._jdstream.dstream(),
                jreduceFunc, jinvReduceFunc,
                self._ssc._jduration(windowDuration),
                self._ssc._jduration(slideDuration))
            return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
        else:
            return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
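
    # Illustrative sketch: word counts over the last 30 seconds, updated
    # incrementally every 10 seconds (assumes `pairs` is a DStream of
    # (word, 1) tuples and a checkpoint directory is set, as required for
    # the incremental path):
    #
    #     windowed = pairs.reduceByKeyAndWindow(operator.add, operator.sub,
    #                                           30, 10)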

    def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
        """
        Return a new "state" DStream where the state for each key is updated by applying
        the given function on the previous state of the key and the new values of the key.

        :param updateFunc: state update function: given the list of new values and the
                           previous state of a key, it returns the new state. If this
                           function returns None, the corresponding state key-value pair
                           will be eliminated.
        :param numPartitions: number of partitions of each RDD in the new DStream.
        :param initialRDD: optional initial state, as an RDD (or an iterable) of
                           (key, state) pairs.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        if initialRDD and not isinstance(initialRDD, RDD):
            initialRDD = self._sc.parallelize(initialRDD)

        def reduceFunc(t, a, b):
            if a is None:
                g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
            else:
                g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
                g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
            state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
            return state.filter(lambda k_v: k_v[1] is not None)

        jreduceFunc = TransformFunction(self._sc, reduceFunc,
                                        self._sc.serializer, self._jrdd_deserializer)
        if initialRDD:
            initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
            dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
                                                       initialRDD._jrdd)
        else:
            dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)

        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
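
    # Illustrative sketch: a running count per key (assumes `pairs` is a
    # DStream of (word, 1) tuples and a checkpoint directory is set, as
    # stateful operations require):
    #
    #     def update(new_values, last_sum):
    #         return sum(new_values) + (last_sum or 0)
    #
    #     running_counts = pairs.updateStateByKey(update)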


class TransformedDStream(DStream):
    """
    TransformedDStream is a DStream generated by a Python function
    transforming each RDD of a DStream into another RDD.

    Multiple continuous transformations of a DStream can be combined into
    one transformation.
    """
    def __init__(self, prev, func):
        self._ssc = prev._ssc
        self._sc = self._ssc._sc
        self._jrdd_deserializer = self._sc.serializer
        self.is_cached = False
        self.is_checkpointed = False
        self._jdstream_val = None

        # If the parent is itself an uncached, uncheckpointed
        # TransformedDStream, fold the two functions together so that
        # consecutive transform() calls are evaluated in a single pass.
        if (type(prev) is TransformedDStream and
                not prev.is_cached and not prev.is_checkpointed):
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev
        else:
            self.prev = prev
            self.func = func

    @property
    def _jdstream(self):
        if self._jdstream_val is not None:
            return self._jdstream_val

        jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
        dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
        self._jdstream_val = dstream.asJavaDStream()
        return self._jdstream_val