#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import operator
import os
import platform
import shutil
import tempfile
import time
import unittest
from functools import reduce
from itertools import chain

from pyspark import SparkConf, SparkContext, RDD
from pyspark.streaming import StreamingContext
from pyspark.testing.streamingutils import PySparkStreamingTestCase


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage reporting is used.")
class BasicOperationTests(PySparkStreamingTestCase):
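    """Tests for the basic transformations offered by DStream.

    These tests lean on the ``_test_func`` helper inherited from
    PySparkStreamingTestCase. A rough sketch of its assumed contract:
    it builds a queue stream from ``input`` (and from ``input2`` when
    given), applies ``func``, collects the output of each batch, and
    compares the collected batches against ``expected``, sorting each
    batch first when ``sort=True``.
    """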

    def test_map(self):
        """Basic operation test for DStream.map."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.map(str)
        expected = [list(map(str, x)) for x in input]
        self._test_func(input, func, expected)

    def test_flatMap(self):
        """Basic operation test for DStream.flatMap."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.flatMap(lambda x: (x, x * 2))
        expected = [list(chain.from_iterable(map(lambda y: [y, y * 2], x)))
                    for x in input]
        self._test_func(input, func, expected)

    def test_filter(self):
        """Basic operation test for DStream.filter."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.filter(lambda x: x % 2 == 0)
        expected = [[y for y in x if y % 2 == 0] for x in input]
        self._test_func(input, func, expected)

    def test_count(self):
        """Basic operation test for DStream.count."""
        input = [range(5), range(10), range(20)]

        def func(dstream):
            return dstream.count()
        expected = [[len(x)] for x in input]
        self._test_func(input, func, expected)

    def test_slice(self):
        """Basic operation test for DStream.slice."""
        import datetime as dt
        self.ssc = StreamingContext(self.sc, 1.0)
        self.ssc.remember(4.0)
        input = [[1], [2], [3], [4]]
        stream = self.ssc.queueStream([self.sc.parallelize(d, 1) for d in input])

        time_vals = []

        def get_times(t, rdd):
            if rdd and len(time_vals) < len(input):
                time_vals.append(t)

        stream.foreachRDD(get_times)

        self.ssc.start()
        self.wait_for(time_vals, 4)
        begin_time = time_vals[0]

        def get_sliced(begin_delta, end_delta):
            begin = begin_time + dt.timedelta(seconds=begin_delta)
            end = begin_time + dt.timedelta(seconds=end_delta)
            rdds = stream.slice(begin, end)
            result_list = [rdd.collect() for rdd in rdds]
            return [r for result in result_list for r in result]

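        # slice() is inclusive at both ends, so a zero-length window still
        # returns the batch at `begin`, and [1, 2] covers the second and
        # third batches.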
        self.assertEqual(set([1]), set(get_sliced(0, 0)))
        self.assertEqual(set([2, 3]), set(get_sliced(1, 2)))
        self.assertEqual(set([2, 3, 4]), set(get_sliced(1, 4)))
        self.assertEqual(set([1, 2, 3, 4]), set(get_sliced(0, 4)))

    def test_reduce(self):
        """Basic operation test for DStream.reduce."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.reduce(operator.add)
        expected = [[reduce(operator.add, x)] for x in input]
        self._test_func(input, func, expected)

    def test_reduceByKey(self):
        """Basic operation test for DStream.reduceByKey."""
        input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
                 [("", 1), ("", 1), ("", 1), ("", 1)],
                 [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]

        def func(dstream):
            return dstream.reduceByKey(operator.add)
        expected = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3, 1)]]
        self._test_func(input, func, expected, sort=True)

    def test_mapValues(self):
        """Basic operation test for DStream.mapValues."""
        input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                 [(0, 4), (1, 1), (2, 2), (3, 3)],
                 [(1, 1), (2, 1), (3, 1), (4, 1)]]

        def func(dstream):
            return dstream.mapValues(lambda x: x + 10)
        expected = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
                    [(0, 14), (1, 11), (2, 12), (3, 13)],
                    [(1, 11), (2, 11), (3, 11), (4, 11)]]
        self._test_func(input, func, expected, sort=True)

    def test_flatMapValues(self):
        """Basic operation test for DStream.flatMapValues."""
        input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                 [(0, 4), (1, 1), (2, 1), (3, 1)],
                 [(1, 1), (2, 1), (3, 1), (4, 1)]]

        def func(dstream):
            return dstream.flatMapValues(lambda x: (x, x + 10))
        expected = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
                     ("c", 1), ("c", 11), ("d", 1), ("d", 11)],
                    [(0, 4), (0, 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
                    [(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]]
        self._test_func(input, func, expected)

    def test_glom(self):
        """Basic operation test for DStream.glom."""
        input = [range(1, 5), range(5, 9), range(9, 13)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            return dstream.glom()
        expected = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]]
        self._test_func(rdds, func, expected)

    def test_mapPartitions(self):
        """Basic operation test for DStream.mapPartitions."""
        input = [range(1, 5), range(5, 9), range(9, 13)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            def f(iterator):
                yield sum(iterator)
            return dstream.mapPartitions(f)
        expected = [[3, 7], [11, 15], [19, 23]]
        self._test_func(rdds, func, expected)

    def test_countByValue(self):
        """Basic operation test for DStream.countByValue."""
        input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]]

        def func(dstream):
            return dstream.countByValue()
        expected = [[(1, 2), (2, 2), (3, 2), (4, 2)],
                    [(5, 2), (6, 2), (7, 1), (8, 1)],
                    [("a", 2), ("b", 1), ("", 1)]]
        self._test_func(input, func, expected, sort=True)

    def test_groupByKey(self):
        """Basic operation test for DStream.groupByKey."""
        input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

        def func(dstream):
            return dstream.groupByKey().mapValues(list)

        expected = [[(1, [1]), (2, [1]), (3, [1]), (4, [1])],
                    [(1, [1, 1, 1]), (2, [1, 1]), (3, [1])],
                    [("a", [1, 1]), ("b", [1]), ("", [1, 1, 1])]]
        self._test_func(input, func, expected, sort=True)

    def test_combineByKey(self):
        """Basic operation test for DStream.combineByKey."""
        input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

        def func(dstream):
            def add(a, b):
                return a + str(b)
            return dstream.combineByKey(str, add, add)
        expected = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")],
                    [(1, "111"), (2, "11"), (3, "1")],
                    [("a", "11"), ("b", "1"), ("", "111")]]
        self._test_func(input, func, expected, sort=True)

    def test_repartition(self):
        input = [range(1, 5), range(5, 9)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            return dstream.repartition(1).glom()
        expected = [[[1, 2, 3, 4]], [[5, 6, 7, 8]]]
        self._test_func(rdds, func, expected)

    def test_union(self):
        input1 = [range(3), range(5), range(6)]
        input2 = [range(3, 6), range(5, 6)]

        def func(d1, d2):
            return d1.union(d2)

        expected = [list(range(6)), list(range(6)), list(range(6))]
        self._test_func(input1, func, expected, input2=input2)

    def test_cogroup(self):
        input = [[(1, 1), (2, 1), (3, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1)]]
        input2 = [[(1, 2)],
                  [(4, 1)],
                  [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 2)]]

        def func(d1, d2):
            return d1.cogroup(d2).mapValues(lambda vs: tuple(map(list, vs)))

        expected = [[(1, ([1], [2])), (2, ([1], [])), (3, ([1], []))],
                    [(1, ([1, 1, 1], [])), (2, ([1], [])), (4, ([], [1]))],
                    [("a", ([1, 1], [1, 1])), ("b", ([1], [1])), ("", ([1, 1], [1, 2]))]]
        self._test_func(input, func, expected, sort=True, input2=input2)

    def test_join(self):
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.join(b)

        expected = [[('b', (2, 3))]]
        self._test_func(input, func, expected, True, input2)

    def test_left_outer_join(self):
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.leftOuterJoin(b)

        expected = [[('a', (1, None)), ('b', (2, 3))]]
        self._test_func(input, func, expected, True, input2)

    def test_right_outer_join(self):
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.rightOuterJoin(b)

        expected = [[('b', (2, 3)), ('c', (None, 4))]]
        self._test_func(input, func, expected, True, input2)

    def test_full_outer_join(self):
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.fullOuterJoin(b)

        expected = [[('a', (1, None)), ('b', (2, 3)), ('c', (None, 4))]]
        self._test_func(input, func, expected, True, input2)

    def test_update_state_by_key(self):

        def updater(vs, s):
            # Accumulate every value seen so far for the key.
            if not s:
                s = []
            s.extend(vs)
            return s

        input = [[('k', i)] for i in range(5)]

        def func(dstream):
            return dstream.updateStateByKey(updater)

        expected = [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
        expected = [[('k', v)] for v in expected]
        self._test_func(input, func, expected)

    def test_update_state_by_key_initial_rdd(self):

        def updater(vs, s):
            if not s:
                s = []
            s.extend(vs)
            return s

        initial = [('k', [0, 1])]
        initial = self.sc.parallelize(initial, 1)

        input = [[('k', i)] for i in range(2, 5)]

        def func(dstream):
            return dstream.updateStateByKey(updater, initialRDD=initial)

        expected = [[0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
        expected = [[('k', v)] for v in expected]
        self._test_func(input, func, expected)

    def test_failed_func(self):
        # An error raised inside the transformation should be surfaced to
        # the driver by awaitTerminationOrTimeout().
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream = self.ssc.queueStream(input)

        def failed_func(i):
            raise ValueError("This is a special error")

        input_stream.map(failed_func).pprint()
        self.ssc.start()
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")

    def test_failed_func2(self):
        # Same as test_failed_func, but with a binary transform function
        # applied via transformWith().
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream1 = self.ssc.queueStream(input)
        input_stream2 = self.ssc.queueStream(input)

        def failed_func(rdd1, rdd2):
            raise ValueError("This is a special error")

        input_stream1.transformWith(failed_func, input_stream2, True).pprint()
        self.ssc.start()
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")

    def test_failed_func_with_resetting_failure(self):
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream = self.ssc.queueStream(input)

        def failed_func(i):
            if i == 1:
                # Fail in the second batch only.
                raise ValueError("This is a special error")
            else:
                return i

        # The failure in the second batch should not prevent the results of
        # the remaining batches from being collected.
        expected = [[0], [2], [3]]
        self.assertEqual(expected, self._collect(input_stream.map(failed_func), 3))
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage reporting is used.")
class WindowFunctionTests(PySparkStreamingTestCase):

    timeout = 15
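
    # Note on the arithmetic in the tests below: window and slide durations
    # are given in seconds, and the expectations assume the base test case
    # uses a 0.5 second batch duration, so e.g. a 1.5s window spans three
    # batches and a 0.5s slide emits one result per batch.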

    def test_window(self):
        input = [range(1), range(2), range(3), range(4), range(5)]

        def func(dstream):
            return dstream.window(1.5, .5).count()

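        # With batch sizes 1..5 and a three-batch window sliding one batch
        # at a time, the counts ramp up (1, 1+2, 1+2+3), slide (2+3+4,
        # 3+4+5), and then drain (4+5, 5).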
        expected = [[1], [3], [6], [9], [12], [9], [5]]
        self._test_func(input, func, expected)

    def test_count_by_window(self):
        input = [range(1), range(2), range(3), range(4), range(5)]

        def func(dstream):
            return dstream.countByWindow(1.5, .5)

        # Same window arithmetic as test_window above.
        expected = [[1], [3], [6], [9], [12], [9], [5]]
        self._test_func(input, func, expected)

    def test_count_by_window_large(self):
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.countByWindow(2.5, .5)

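        # A 2.5s window spans five batches, so with batch sizes 1..6 the
        # counts ramp up (1, 3, 6, 10, 15), peak at 2+3+4+5+6 = 20, and
        # then drain (18, 15, 11, 6).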
        expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]]
        self._test_func(input, func, expected)

    def test_count_by_value_and_window(self):
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.countByValueAndWindow(2.5, .5)

        expected = [[(0, 1)],
                    [(0, 2), (1, 1)],
                    [(0, 3), (1, 2), (2, 1)],
                    [(0, 4), (1, 3), (2, 2), (3, 1)],
                    [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)],
                    [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)],
                    [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)],
                    [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)],
                    [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)],
                    [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]]
        self._test_func(input, func, expected)

    def test_group_by_key_and_window(self):
        input = [[('a', i)] for i in range(5)]

        def func(dstream):
            return dstream.groupByKeyAndWindow(1.5, .5).mapValues(list)

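        # The three-batch window over the single-element batches ('a', 0)
        # through ('a', 4) always groups the last (up to) three values
        # seen, ramping up and draining at the edges.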
        expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                    [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
        self._test_func(input, func, expected)

    def test_reduce_by_invalid_window(self):
        input1 = [range(3), range(5), range(1), range(6)]
        d1 = self.ssc.queueStream(input1)
        # Window and slide durations must be multiples of the batch
        # duration, so both of these should be rejected.
        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))

    def test_reduce_by_key_and_window_with_none_invFunc(self):
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.map(lambda x: (x, 1))\
                .reduceByKeyAndWindow(operator.add, None, 5, 1)\
                .filter(lambda kv: kv[1] > 0).count()

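        # With invFunc=None the window is recomputed from scratch on each
        # slide rather than updated incrementally. Assuming 0.5s batches,
        # the 5s window spans ten batches and the 1s slide emits every
        # other batch, so the distinct-key count ramps 2, 4, 6 and then
        # stays at 6 while the window still covers all of the input.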
        expected = [[2], [4], [6], [6], [6], [6]]
        self._test_func(input, func, expected)


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage reporting is used.")
class CheckpointTests(unittest.TestCase):

    setupCalled = False

    @staticmethod
    def tearDownClass():
        # Clean up in the JVM just in case there were issues in the Python API.
        if SparkContext._jvm is not None:
            jStreamingContextOption = \
                SparkContext._jvm.org.apache.spark.streaming.StreamingContext.getActive()
            if jStreamingContextOption.nonEmpty():
                jStreamingContextOption.get().stop()

    def setUp(self):
        self.ssc = None
        self.sc = None
        self.cpd = None

    def tearDown(self):
        if self.ssc is not None:
            self.ssc.stop(True)
        if self.sc is not None:
            self.sc.stop()
        if self.cpd is not None:
            shutil.rmtree(self.cpd)

    def test_transform_function_serializer_failure(self):
        inputd = tempfile.mkdtemp()
        self.cpd = tempfile.mkdtemp("test_transform_function_serializer_failure")

        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 0.5)

            # A function that cannot be serialized because it closes over
            # the SparkContext.
            def process(time, rdd):
                sc.parallelize(range(1, 10))

            ssc.textFileStream(inputd).foreachRDD(process)
            return ssc

        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        try:
            self.ssc.start()
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue(
                "It appears that you are attempting to reference SparkContext" in failure)
            return

        self.fail("using SparkContext in process should fail because it's not Serializable")

    def test_get_or_create_and_get_active_or_create(self):
        inputd = tempfile.mkdtemp()
        outputd = tempfile.mkdtemp() + "/"

        def updater(vs, s):
            # Keep a running total per key across batches (and restarts).
            return sum(vs, s or 0)

        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 2)
            dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
            wc = dstream.updateStateByKey(updater)
            wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
            wc.checkpoint(2)
            self.setupCalled = True
            return ssc

        # Verify that getOrCreate() calls setup() in the absence of checkpoint files
        self.cpd = tempfile.mkdtemp("test_streaming_cps")
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertTrue(self.setupCalled)

        self.ssc.start()

        def check_output(n):
            # Wait until the stream has produced at least one output batch,
            # then write a new input file of ten identical lines.
            while not os.listdir(outputd):
                if self.ssc.awaitTerminationOrTimeout(0.5):
                    raise Exception("ssc stopped")
            time.sleep(1)  # make sure the mtime is larger than the previous one
            with open(os.path.join(inputd, str(n)), 'w') as f:
                f.writelines(["%d\n" % i for i in range(10)])

            # Poll the newest output batch until it reflects the new input.
            while True:
                if self.ssc.awaitTerminationOrTimeout(0.5):
                    raise Exception("ssc stopped")
                p = os.path.join(outputd, max(os.listdir(outputd)))
                if '_SUCCESS' not in os.listdir(p):
                    # not finished yet
                    continue
                ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                d = ordd.values().map(int).collect()
                if not d:
                    continue
                self.assertEqual(10, len(d))
                s = set(d)
                self.assertEqual(1, len(s))
                m = s.pop()
                if n > m:
                    continue
                self.assertEqual(n, m)
                break

        check_output(1)
        check_output(2)

        # Verify that getOrCreate() recovers from the checkpoint files
        self.ssc.stop(True, True)
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.ssc.start()
        check_output(3)

        # Verify that getOrCreate() uses the existing SparkContext
        self.ssc.stop(True, True)
        time.sleep(1)
        self.sc = SparkContext(conf=SparkConf())
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.assertTrue(self.ssc.sparkContext == self.sc)

        # Verify that getActiveOrCreate() recovers from the checkpoint files
        self.ssc.stop(True, True)
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.ssc.start()
        check_output(4)

        # Verify that getActiveOrCreate() returns the active context
        self.setupCalled = False
        self.assertEqual(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc)
        self.assertFalse(self.setupCalled)

        # Verify that getActiveOrCreate() uses the existing SparkContext
        self.ssc.stop(True, True)
        time.sleep(1)
        self.sc = SparkContext(conf=SparkConf())
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.assertTrue(self.ssc.sparkContext == self.sc)

        # Verify that getActiveOrCreate() calls setup() in the absence of checkpoint files
        self.ssc.stop(True, True)
        shutil.rmtree(self.cpd)  # delete the checkpoint directory
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertTrue(self.setupCalled)

        # Stop everything
        self.ssc.stop(True, True)


if __name__ == "__main__":
    from pyspark.streaming.tests.test_dstream import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)