0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
import os
import shutil
import struct
import tempfile
import time

from pyspark.streaming import StreamingContext
from pyspark.testing.streamingutils import PySparkStreamingTestCase
0024
0025
class StreamingContextTests(PySparkStreamingTestCase):
    """Tests for StreamingContext lifecycle, input-stream creation, and the
    active-context singleton (getActive / getActiveOrCreate)."""

    # Batch interval (seconds) used for every StreamingContext created here.
    duration = 0.1
    # Flipped by setupFunc inside the getActiveOrCreate test to record
    # whether the setup function was actually invoked.
    setupCalled = False

    def _add_input_stream(self):
        """Attach a queue-backed input stream so the context has an output
        operation registered and can be started."""
        inputs = [range(1, x) for x in range(101)]
        stream = self.ssc.queueStream(inputs)
        # block=False: only register the collecting output op, don't wait.
        self._collect(stream, 1, block=False)

    def _make_temp_input_dir(self):
        """Create a temp dir for file-based input streams and schedule its
        removal at test end (previously these directories were leaked).

        addCleanup callbacks run after tearDown — which stops the streaming
        context — so the directory is only deleted once the stream is done.
        """
        d = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, d, ignore_errors=True)
        return d

    def test_stop_only_streaming_context(self):
        """stop(False) must leave the underlying SparkContext usable."""
        self._add_input_stream()
        self.ssc.start()
        self.ssc.stop(False)
        self.assertEqual(len(self.sc.parallelize(range(5), 5).glom().collect()), 5)

    def test_stop_multiple_times(self):
        """Stopping an already-stopped context must be a no-op, not an error."""
        self._add_input_stream()
        self.ssc.start()
        self.ssc.stop(False)
        self.ssc.stop(False)

    def test_queue_stream(self):
        """queueStream should replay the queued RDDs one per batch, in order."""
        input = [list(range(i + 1)) for i in range(3)]
        dstream = self.ssc.queueStream(input)
        result = self._collect(dstream, 3)
        self.assertEqual(input, result)

    def test_text_file_stream(self):
        """textFileStream should pick up text files created after start()."""
        d = self._make_temp_input_dir()
        self.ssc = StreamingContext(self.sc, self.duration)
        dstream2 = self.ssc.textFileStream(d).map(int)
        result = self._collect(dstream2, 2, block=False)
        self.ssc.start()
        for name in ('a', 'b'):
            # Sleep so each file gets a fresh modification time and lands in
            # a later batch than the stream's start.
            time.sleep(1)
            with open(os.path.join(d, name), "w") as f:
                f.writelines(["%d\n" % i for i in range(10)])
        self.wait_for(result, 2)
        self.assertEqual([list(range(10)), list(range(10))], result)

    def test_binary_records_stream(self):
        """binaryRecordsStream should split new files into fixed-size records."""
        d = self._make_temp_input_dir()
        self.ssc = StreamingContext(self.sc, self.duration)
        dstream = self.ssc.binaryRecordsStream(d, 10).map(
            lambda v: struct.unpack("10b", bytes(v)))
        result = self._collect(dstream, 2, block=False)
        self.ssc.start()
        for name in ('a', 'b'):
            # See test_text_file_stream: ensure distinct file timestamps.
            time.sleep(1)
            with open(os.path.join(d, name), "wb") as f:
                f.write(bytearray(range(10)))
        self.wait_for(result, 2)
        self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result])

    def test_union(self):
        """ssc.union should concatenate the per-batch RDDs of its streams."""
        input = [list(range(i + 1)) for i in range(3)]
        dstream = self.ssc.queueStream(input)
        dstream2 = self.ssc.queueStream(input)
        dstream3 = self.ssc.union(dstream, dstream2)
        result = self._collect(dstream3, 3)
        expected = [i * 2 for i in input]
        self.assertEqual(expected, result)

    def test_transform(self):
        """ssc.transform should hand func the per-batch RDDs in stream order."""
        dstream1 = self.ssc.queueStream([[1]])
        dstream2 = self.ssc.queueStream([[2]])
        dstream3 = self.ssc.queueStream([[3]])

        def func(rdds):
            rdd1, rdd2, rdd3 = rdds
            return rdd2.union(rdd3).union(rdd1)

        dstream = self.ssc.transform([dstream1, dstream2, dstream3], func)

        self.assertEqual([2, 3, 1], self._take(dstream, 3))

    def test_transform_pairrdd(self):
        # transform() must also work when the function returns a pair RDD
        # (cartesian produces (key, value) tuples for each batch).
        dstream = self.ssc.queueStream(
            [[1], [2], [3]]).transform(lambda rdd: rdd.cartesian(rdd))
        self.assertEqual([(1, 1), (2, 2), (3, 3)], self._take(dstream, 3))

    def test_get_active(self):
        """getActive() should track start/stop of the single active context."""
        self.assertEqual(StreamingContext.getActive(), None)

        # A started context becomes the active one.
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)

        # Stopping it (keeping the SparkContext) clears the active context.
        self.ssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

        # A stop issued through the underlying Java context must also be
        # reflected by getActive().
        self.ssc = StreamingContext(self.sc, self.duration)
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

    def test_get_active_or_create(self):
        """getActiveOrCreate() must reuse an active context, otherwise call
        setupFunc to build a fresh one."""
        self.ssc = None
        self.assertEqual(StreamingContext.getActive(), None)

        def setupFunc():
            ssc = StreamingContext(self.sc, self.duration)
            ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
            self.setupCalled = True
            return ssc

        # No active context: setupFunc must be invoked.
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Active context: it is returned and setupFunc is skipped.
        self.ssc.start()
        self.setupCalled = False
        self.assertEqual(StreamingContext.getActiveOrCreate(None, setupFunc), self.ssc)
        self.assertFalse(self.setupCalled)

        # Stopped context: a new one must be created via setupFunc.
        self.ssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Context stopped from the JVM side: also treated as inactive.
        self.ssc = StreamingContext(self.sc, self.duration)
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

    def test_await_termination_or_timeout(self):
        """awaitTerminationOrTimeout: False while running, True once stopped."""
        self._add_input_stream()
        self.ssc.start()
        self.assertFalse(self.ssc.awaitTerminationOrTimeout(0.001))
        self.ssc.stop(False)
        self.assertTrue(self.ssc.awaitTerminationOrTimeout(0.001))
0174
0175
if __name__ == "__main__":
    import unittest
    from pyspark.streaming.tests.test_context import *

    # Prefer the XML-emitting runner when available (CI report output);
    # otherwise let unittest pick its default runner.
    try:
        import xmlrunner
    except ImportError:
        testRunner = None
    else:
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    unittest.main(testRunner=testRunner, verbosity=2)