#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import operator
import os
import shutil
import tempfile
import time
import unittest
from functools import reduce
from itertools import chain
import platform

from pyspark import SparkConf, SparkContext, RDD
from pyspark.streaming import StreamingContext
from pyspark.testing.streamingutils import PySparkStreamingTestCase


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage is enabled.")
class BasicOperationTests(PySparkStreamingTestCase):

    def test_map(self):
        """Basic operation test for DStream.map."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.map(str)
        expected = [list(map(str, x)) for x in input]
        self._test_func(input, func, expected)

    def test_flatMap(self):
        """Basic operation test for DStream.flatMap."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.flatMap(lambda x: (x, x * 2))
        expected = [list(chain.from_iterable((map(lambda y: [y, y * 2], x))))
                    for x in input]
        self._test_func(input, func, expected)

    def test_filter(self):
        """Basic operation test for DStream.filter."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.filter(lambda x: x % 2 == 0)
        expected = [[y for y in x if y % 2 == 0] for x in input]
        self._test_func(input, func, expected)

    def test_count(self):
        """Basic operation test for DStream.count."""
        input = [range(5), range(10), range(20)]

        def func(dstream):
            return dstream.count()
        expected = [[len(x)] for x in input]
        self._test_func(input, func, expected)

    def test_slice(self):
        """Basic operation test for DStream.slice."""
        import datetime as dt
        self.ssc = StreamingContext(self.sc, 1.0)
        self.ssc.remember(4.0)
        input = [[1], [2], [3], [4]]
        stream = self.ssc.queueStream([self.sc.parallelize(d, 1) for d in input])

        time_vals = []

        def get_times(t, rdd):
            if rdd and len(time_vals) < len(input):
                time_vals.append(t)

        stream.foreachRDD(get_times)

        self.ssc.start()
        self.wait_for(time_vals, 4)
        begin_time = time_vals[0]

        def get_sliced(begin_delta, end_delta):
            begin = begin_time + dt.timedelta(seconds=begin_delta)
            end = begin_time + dt.timedelta(seconds=end_delta)
            rdds = stream.slice(begin, end)
            result_list = [rdd.collect() for rdd in rdds]
            return [r for result in result_list for r in result]

        self.assertEqual(set([1]), set(get_sliced(0, 0)))
        self.assertEqual(set([2, 3]), set(get_sliced(1, 2)))
        self.assertEqual(set([2, 3, 4]), set(get_sliced(1, 4)))
        self.assertEqual(set([1, 2, 3, 4]), set(get_sliced(0, 4)))

    def test_reduce(self):
        """Basic operation test for DStream.reduce."""
        input = [range(1, 5), range(5, 9), range(9, 13)]

        def func(dstream):
            return dstream.reduce(operator.add)
        expected = [[reduce(operator.add, x)] for x in input]
        self._test_func(input, func, expected)

    def test_reduceByKey(self):
        """Basic operation test for DStream.reduceByKey."""
        input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
                 [("", 1), ("", 1), ("", 1), ("", 1)],
                 [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]

        def func(dstream):
            return dstream.reduceByKey(operator.add)
        expected = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3, 1)]]
        self._test_func(input, func, expected, sort=True)

    def test_mapValues(self):
        """Basic operation test for DStream.mapValues."""
        input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                 [(0, 4), (1, 1), (2, 2), (3, 3)],
                 [(1, 1), (2, 1), (3, 1), (4, 1)]]

        def func(dstream):
            return dstream.mapValues(lambda x: x + 10)
        expected = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
                    [(0, 14), (1, 11), (2, 12), (3, 13)],
                    [(1, 11), (2, 11), (3, 11), (4, 11)]]
        self._test_func(input, func, expected, sort=True)

    def test_flatMapValues(self):
        """Basic operation test for DStream.flatMapValues."""
        input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                 [(0, 4), (1, 1), (2, 1), (3, 1)],
                 [(1, 1), (2, 1), (3, 1), (4, 1)]]

        def func(dstream):
            return dstream.flatMapValues(lambda x: (x, x + 10))
        expected = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
                     ("c", 1), ("c", 11), ("d", 1), ("d", 11)],
                    [(0, 4), (0, 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
                    [(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]]
        self._test_func(input, func, expected)

    def test_glom(self):
        """Basic operation test for DStream.glom."""
        input = [range(1, 5), range(5, 9), range(9, 13)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            return dstream.glom()
        expected = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]]
        self._test_func(rdds, func, expected)

    def test_mapPartitions(self):
        """Basic operation test for DStream.mapPartitions."""
        input = [range(1, 5), range(5, 9), range(9, 13)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            def f(iterator):
                yield sum(iterator)
            return dstream.mapPartitions(f)
        expected = [[3, 7], [11, 15], [19, 23]]
        self._test_func(rdds, func, expected)

    def test_countByValue(self):
        """Basic operation test for DStream.countByValue."""
        input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]]

        def func(dstream):
            return dstream.countByValue()
        expected = [[(1, 2), (2, 2), (3, 2), (4, 2)],
                    [(5, 2), (6, 2), (7, 1), (8, 1)],
                    [("a", 2), ("b", 1), ("", 1)]]
        self._test_func(input, func, expected, sort=True)

    def test_groupByKey(self):
        """Basic operation test for DStream.groupByKey."""
        input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

        def func(dstream):
            return dstream.groupByKey().mapValues(list)

        expected = [[(1, [1]), (2, [1]), (3, [1]), (4, [1])],
                    [(1, [1, 1, 1]), (2, [1, 1]), (3, [1])],
                    [("a", [1, 1]), ("b", [1]), ("", [1, 1, 1])]]
        self._test_func(input, func, expected, sort=True)

    def test_combineByKey(self):
        """Basic operation test for DStream.combineByKey."""
        input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

        def func(dstream):
            def add(a, b):
                return a + str(b)
            return dstream.combineByKey(str, add, add)
        expected = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")],
                    [(1, "111"), (2, "11"), (3, "1")],
                    [("a", "11"), ("b", "1"), ("", "111")]]
        self._test_func(input, func, expected, sort=True)

    def test_repartition(self):
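        """Basic operation test for DStream.repartition."""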
        input = [range(1, 5), range(5, 9)]
        rdds = [self.sc.parallelize(r, 2) for r in input]

        def func(dstream):
            return dstream.repartition(1).glom()
        expected = [[[1, 2, 3, 4]], [[5, 6, 7, 8]]]
        self._test_func(rdds, func, expected)

    def test_union(self):
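        """Basic operation test for DStream.union."""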
        input1 = [range(3), range(5), range(6)]
        input2 = [range(3, 6), range(5, 6)]

        def func(d1, d2):
            return d1.union(d2)

        expected = [list(range(6)), list(range(6)), list(range(6))]
        self._test_func(input1, func, expected, input2=input2)

    def test_cogroup(self):
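        """Basic operation test for DStream.cogroup."""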
        input = [[(1, 1), (2, 1), (3, 1)],
                 [(1, 1), (1, 1), (1, 1), (2, 1)],
                 [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1)]]
        input2 = [[(1, 2)],
                  [(4, 1)],
                  [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 2)]]

        def func(d1, d2):
            return d1.cogroup(d2).mapValues(lambda vs: tuple(map(list, vs)))

        expected = [[(1, ([1], [2])), (2, ([1], [])), (3, ([1], []))],
                    [(1, ([1, 1, 1], [])), (2, ([1], [])), (4, ([], [1]))],
                    [("a", ([1, 1], [1, 1])), ("b", ([1], [1])), ("", ([1, 1], [1, 2]))]]
        self._test_func(input, func, expected, sort=True, input2=input2)

    def test_join(self):
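        """Basic operation test for DStream.join."""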
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.join(b)

        expected = [[('b', (2, 3))]]
        self._test_func(input, func, expected, True, input2)

    def test_left_outer_join(self):
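        """Basic operation test for DStream.leftOuterJoin."""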
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.leftOuterJoin(b)

        expected = [[('a', (1, None)), ('b', (2, 3))]]
        self._test_func(input, func, expected, True, input2)

    def test_right_outer_join(self):
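        """Basic operation test for DStream.rightOuterJoin."""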
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.rightOuterJoin(b)

        expected = [[('b', (2, 3)), ('c', (None, 4))]]
        self._test_func(input, func, expected, True, input2)

    def test_full_outer_join(self):
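        """Basic operation test for DStream.fullOuterJoin."""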
        input = [[('a', 1), ('b', 2)]]
        input2 = [[('b', 3), ('c', 4)]]

        def func(a, b):
            return a.fullOuterJoin(b)

        expected = [[('a', (1, None)), ('b', (2, 3)), ('c', (None, 4))]]
        self._test_func(input, func, expected, True, input2)

    def test_update_state_by_key(self):
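        """Basic operation test for DStream.updateStateByKey."""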

        def updater(vs, s):
            if not s:
                s = []
            s.extend(vs)
            return s

        input = [[('k', i)] for i in range(5)]

        def func(dstream):
            return dstream.updateStateByKey(updater)

        expected = [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
        expected = [[('k', v)] for v in expected]
        self._test_func(input, func, expected)

    def test_update_state_by_key_initial_rdd(self):
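        """Basic operation test for DStream.updateStateByKey with an initial RDD."""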

        def updater(vs, s):
            if not s:
                s = []
            s.extend(vs)
            return s

        initial = [('k', [0, 1])]
        initial = self.sc.parallelize(initial, 1)

        input = [[('k', i)] for i in range(2, 5)]

        def func(dstream):
            return dstream.updateStateByKey(updater, initialRDD=initial)

        expected = [[0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
        expected = [[('k', v)] for v in expected]
        self._test_func(input, func, expected)

    def test_failed_func(self):
        # Test failure in
        # TransformFunction.apply(rdd: Option[RDD[_]], time: Time)
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream = self.ssc.queueStream(input)

        def failed_func(i):
            raise ValueError("This is a special error")

        input_stream.map(failed_func).pprint()
        self.ssc.start()
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")

    def test_failed_func2(self):
        # Test failure in
        # TransformFunction.apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time)
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream1 = self.ssc.queueStream(input)
        input_stream2 = self.ssc.queueStream(input)

        def failed_func(rdd1, rdd2):
            raise ValueError("This is a special error")

        input_stream1.transformWith(failed_func, input_stream2, True).pprint()
        self.ssc.start()
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")

    def test_failed_func_with_resetting_failure(self):
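        """Test that later batches still run after one batch's function fails."""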
        input = [self.sc.parallelize([d], 1) for d in range(4)]
        input_stream = self.ssc.queueStream(input)

        def failed_func(i):
            if i == 1:
                # Make it fail in the second batch
                raise ValueError("This is a special error")
            else:
                return i

        # We should be able to see the results of the 3rd and 4th batches even if the second batch
        # fails
        expected = [[0], [2], [3]]
        self.assertEqual(expected, self._collect(input_stream.map(failed_func), 3))
        try:
            self.ssc.awaitTerminationOrTimeout(10)
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue("This is a special error" in failure)
            return

        self.fail("a failed func should throw an error")


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage is enabled.")
class WindowFunctionTests(PySparkStreamingTestCase):

    timeout = 15

    def test_window(self):
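        """Basic operation test for DStream.window."""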
        input = [range(1), range(2), range(3), range(4), range(5)]

        def func(dstream):
            return dstream.window(1.5, .5).count()

        expected = [[1], [3], [6], [9], [12], [9], [5]]
        self._test_func(input, func, expected)

    def test_count_by_window(self):
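        """Basic operation test for DStream.countByWindow."""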
        input = [range(1), range(2), range(3), range(4), range(5)]

        def func(dstream):
            return dstream.countByWindow(1.5, .5)

        expected = [[1], [3], [6], [9], [12], [9], [5]]
        self._test_func(input, func, expected)

    def test_count_by_window_large(self):
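        """Basic operation test for DStream.countByWindow with a larger window."""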
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.countByWindow(2.5, .5)

        expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]]
        self._test_func(input, func, expected)

    def test_count_by_value_and_window(self):
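        """Basic operation test for DStream.countByValueAndWindow."""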
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.countByValueAndWindow(2.5, .5)

        expected = [[(0, 1)],
                    [(0, 2), (1, 1)],
                    [(0, 3), (1, 2), (2, 1)],
                    [(0, 4), (1, 3), (2, 2), (3, 1)],
                    [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)],
                    [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)],
                    [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)],
                    [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)],
                    [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)],
                    [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]]
        self._test_func(input, func, expected)

    def test_group_by_key_and_window(self):
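        """Basic operation test for DStream.groupByKeyAndWindow."""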
        input = [[('a', i)] for i in range(5)]

        def func(dstream):
            return dstream.groupByKeyAndWindow(1.5, .5).mapValues(list)

        expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                    [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
        self._test_func(input, func, expected)

    def test_reduce_by_invalid_window(self):
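        """Test that DStream.reduceByKeyAndWindow rejects invalid window durations."""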
        input1 = [range(3), range(5), range(1), range(6)]
        d1 = self.ssc.queueStream(input1)
        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))

    def test_reduce_by_key_and_window_with_none_invFunc(self):
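        """Test DStream.reduceByKeyAndWindow with invFunc set to None."""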
        input = [range(1), range(2), range(3), range(4), range(5), range(6)]

        def func(dstream):
            return dstream.map(lambda x: (x, 1))\
                .reduceByKeyAndWindow(operator.add, None, 5, 1)\
                .filter(lambda kv: kv[1] > 0).count()

        expected = [[2], [4], [6], [6], [6], [6]]
        self._test_func(input, func, expected)


@unittest.skipIf(
    "pypy" in platform.python_implementation().lower() and "COVERAGE_PROCESS_START" in os.environ,
    "The PyPy implementation causes DStream tests to hang forever when coverage is enabled.")
class CheckpointTests(unittest.TestCase):

    setupCalled = False

    @staticmethod
    def tearDownClass():
        # Clean up in the JVM just in case there have been some issues in the Python API
        if SparkContext._jvm is not None:
            jStreamingContextOption = \
                SparkContext._jvm.org.apache.spark.streaming.StreamingContext.getActive()
            if jStreamingContextOption.nonEmpty():
                jStreamingContextOption.get().stop()

    def setUp(self):
        self.ssc = None
        self.sc = None
        self.cpd = None

    def tearDown(self):
        if self.ssc is not None:
            self.ssc.stop(True)
        if self.sc is not None:
            self.sc.stop()
        if self.cpd is not None:
            shutil.rmtree(self.cpd)

    def test_transform_function_serializer_failure(self):
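        """Test that a transform function referencing SparkContext fails to serialize."""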
        inputd = tempfile.mkdtemp()
        self.cpd = tempfile.mkdtemp("test_transform_function_serializer_failure")

        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 0.5)

            # A function that cannot be serialized
            def process(time, rdd):
                sc.parallelize(range(1, 10))

            ssc.textFileStream(inputd).foreachRDD(process)
            return ssc

        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        try:
            self.ssc.start()
        except:
            import traceback
            failure = traceback.format_exc()
            self.assertTrue(
                "It appears that you are attempting to reference SparkContext" in failure)
            return

        self.fail("using SparkContext in process should fail because it's not Serializable")

    def test_get_or_create_and_get_active_or_create(self):
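        """Test getOrCreate() and getActiveOrCreate() with and without checkpoint files."""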
        inputd = tempfile.mkdtemp()
        outputd = tempfile.mkdtemp() + "/"

        def updater(vs, s):
            return sum(vs, s or 0)

        def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, 2)
            dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
            wc = dstream.updateStateByKey(updater)
            wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
            wc.checkpoint(2)
            self.setupCalled = True
            return ssc

        # Verify that getOrCreate() calls setup() in absence of checkpoint files
        self.cpd = tempfile.mkdtemp("test_streaming_cps")
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertTrue(self.setupCalled)

        self.ssc.start()

        def check_output(n):
            while not os.listdir(outputd):
                if self.ssc.awaitTerminationOrTimeout(0.5):
                    raise Exception("ssc stopped")
            time.sleep(1)  # make sure mtime is larger than the previous one
            with open(os.path.join(inputd, str(n)), 'w') as f:
                f.writelines(["%d\n" % i for i in range(10)])

            while True:
                if self.ssc.awaitTerminationOrTimeout(0.5):
                    raise Exception("ssc stopped")
                p = os.path.join(outputd, max(os.listdir(outputd)))
                if '_SUCCESS' not in os.listdir(p):
                    # not finished
                    continue
                ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                d = ordd.values().map(int).collect()
                if not d:
                    continue
                self.assertEqual(10, len(d))
                s = set(d)
                self.assertEqual(1, len(s))
                m = s.pop()
                if n > m:
                    continue
                self.assertEqual(n, m)
                break

        check_output(1)
        check_output(2)

        # Verify that getOrCreate() recovers from checkpoint files
        self.ssc.stop(True, True)
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.ssc.start()
        check_output(3)

        # Verify that getOrCreate() uses existing SparkContext
        self.ssc.stop(True, True)
        time.sleep(1)
        self.sc = SparkContext(conf=SparkConf())
        self.setupCalled = False
        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.assertTrue(self.ssc.sparkContext == self.sc)

        # Verify that getActiveOrCreate() recovers from checkpoint files
        self.ssc.stop(True, True)
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.ssc.start()
        check_output(4)

        # Verify that getActiveOrCreate() returns active context
        self.setupCalled = False
        self.assertEqual(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc)
        self.assertFalse(self.setupCalled)

        # Verify that getActiveOrCreate() uses existing SparkContext
        self.ssc.stop(True, True)
        time.sleep(1)
        self.sc = SparkContext(conf=SparkConf())
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertFalse(self.setupCalled)
        self.assertTrue(self.ssc.sparkContext == self.sc)

        # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
        self.ssc.stop(True, True)
        shutil.rmtree(self.cpd)  # delete checkpoint directory
        time.sleep(1)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
        self.assertTrue(self.setupCalled)

        # Stop everything
        self.ssc.stop(True, True)


if __name__ == "__main__":
    from pyspark.streaming.tests.test_dstream import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)