Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 import os
0018 import struct
0019 import tempfile
0020 import time
0021 
0022 from pyspark.streaming import StreamingContext
0023 from pyspark.testing.streamingutils import PySparkStreamingTestCase
0024 
0025 
class StreamingContextTests(PySparkStreamingTestCase):
    """Tests for StreamingContext lifecycle, input sources and combinators.

    ``self.sc``, ``self.ssc`` and the ``_collect`` / ``_take`` / ``wait_for``
    helpers are provided by ``PySparkStreamingTestCase``.
    """

    # Batch interval (seconds) used for StreamingContexts created in this class.
    duration = 0.1
    # Set by the setup function in test_get_active_or_create to record whether
    # StreamingContext.getActiveOrCreate() actually invoked it.
    setupCalled = False

    def _add_input_stream(self):
        """Attach a queue stream with a non-blocking collector to ``self.ssc``."""
        inputs = [range(1, x) for x in range(101)]
        stream = self.ssc.queueStream(inputs)
        self._collect(stream, 1, block=False)

    def _make_temp_dir(self):
        """Create a temporary directory that is removed automatically after the test.

        unittest runs ``addCleanup`` callbacks after ``tearDown``, so the
        directory outlives the test body (and, presumably, the base class's
        shutdown of ``self.ssc`` in ``tearDown`` -- confirm in the base class).
        """
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        return tmp.name

    def _publish_file(self, staging_dir, target_dir, name, mode, data):
        """Write ``data`` to ``staging_dir/name``, then move it into ``target_dir``.

        File-based input streams expect files to be *moved* into the monitored
        directory rather than written there in place; writing in place lets a
        batch observe the file before its contents are flushed.

        :param staging_dir: directory the file is written to first
        :param target_dir: monitored directory the file is moved into
        :param name: file name used in both directories
        :param mode: ``open()`` mode, e.g. ``"w"`` or ``"wb"``
        :param data: ``str`` or bytes-like payload matching ``mode``
        """
        staged = os.path.join(staging_dir, name)
        with open(staged, mode) as f:
            f.write(data)
        # os.replace is atomic on a single filesystem; both directories are
        # created in tempfile's default location.
        os.replace(staged, os.path.join(target_dir, name))

    def test_stop_only_streaming_context(self):
        """stop(False) stops streaming but leaves the SparkContext usable."""
        self._add_input_stream()
        self.ssc.start()
        self.ssc.stop(False)
        self.assertEqual(len(self.sc.parallelize(range(5), 5).glom().collect()), 5)

    def test_stop_multiple_times(self):
        """Calling stop(False) twice does not raise."""
        self._add_input_stream()
        self.ssc.start()
        self.ssc.stop(False)
        self.ssc.stop(False)

    def test_queue_stream(self):
        """Each queued list is delivered as one batch, in order."""
        batches = [list(range(i + 1)) for i in range(3)]
        dstream = self.ssc.queueStream(batches)
        result = self._collect(dstream, 3)
        self.assertEqual(batches, result)

    def test_text_file_stream(self):
        """textFileStream reads files moved into the monitored directory."""
        d = self._make_temp_dir()
        staging = self._make_temp_dir()
        self.ssc = StreamingContext(self.sc, self.duration)
        dstream2 = self.ssc.textFileStream(d).map(int)
        result = self._collect(dstream2, 2, block=False)
        self.ssc.start()
        payload = "".join("%d\n" % i for i in range(10))
        for name in ('a', 'b'):
            # Space the files out; the expected result has one batch per file.
            time.sleep(1)
            self._publish_file(staging, d, name, "w", payload)
        self.wait_for(result, 2)
        self.assertEqual([list(range(10)), list(range(10))], result)

    def test_binary_records_stream(self):
        """binaryRecordsStream splits moved-in files into 10-byte records."""
        d = self._make_temp_dir()
        staging = self._make_temp_dir()
        self.ssc = StreamingContext(self.sc, self.duration)
        dstream = self.ssc.binaryRecordsStream(d, 10).map(
            lambda v: struct.unpack("10b", bytes(v)))
        result = self._collect(dstream, 2, block=False)
        self.ssc.start()
        for name in ('a', 'b'):
            # Space the files out; the expected result has one batch per file.
            time.sleep(1)
            self._publish_file(staging, d, name, "wb", bytearray(range(10)))
        self.wait_for(result, 2)
        self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result])

    def test_union(self):
        """union() of two identical queue streams concatenates each batch."""
        batches = [list(range(i + 1)) for i in range(3)]
        dstream = self.ssc.queueStream(batches)
        dstream2 = self.ssc.queueStream(batches)
        dstream3 = self.ssc.union(dstream, dstream2)
        result = self._collect(dstream3, 3)
        expected = [batch * 2 for batch in batches]
        self.assertEqual(expected, result)

    def test_transform(self):
        """StreamingContext.transform hands func the streams' RDDs in order."""
        dstream1 = self.ssc.queueStream([[1]])
        dstream2 = self.ssc.queueStream([[2]])
        dstream3 = self.ssc.queueStream([[3]])

        def func(rdds):
            rdd1, rdd2, rdd3 = rdds
            return rdd2.union(rdd3).union(rdd1)

        dstream = self.ssc.transform([dstream1, dstream2, dstream3], func)

        self.assertEqual([2, 3, 1], self._take(dstream, 3))

    def test_transform_pairrdd(self):
        """DStream.transform may return a pair RDD (from cartesian())."""
        # This regression test case is for SPARK-17756.
        dstream = self.ssc.queueStream(
            [[1], [2], [3]]).transform(lambda rdd: rdd.cartesian(rdd))
        self.assertEqual([(1, 1), (2, 2), (3, 3)], self._take(dstream, 3))

    def test_get_active(self):
        """getActive() returns the started context and None once it stops."""
        self.assertEqual(StreamingContext.getActive(), None)

        # Verify that getActive() returns the active context
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)

        # Verify that getActive() returns None
        self.ssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

        # Verify that if the Java context is stopped, then getActive() returns None
        self.ssc = StreamingContext(self.sc, self.duration)
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.assertEqual(StreamingContext.getActive(), None)

    def test_get_active_or_create(self):
        """getActiveOrCreate() calls setupFunc only when no context is active."""
        # Test StreamingContext.getActiveOrCreate() without checkpoint data
        # See CheckpointTests for tests with checkpoint data
        self.ssc = None
        self.assertEqual(StreamingContext.getActive(), None)

        def setupFunc():
            ssc = StreamingContext(self.sc, self.duration)
            ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
            self.setupCalled = True
            return ssc

        # Verify that getActiveOrCreate() (w/o checkpoint) calls setupFunc when no context is active
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Verify that getActiveOrCreate() returns active context and does not call the setupFunc
        self.ssc.start()
        self.setupCalled = False
        self.assertEqual(StreamingContext.getActiveOrCreate(None, setupFunc), self.ssc)
        self.assertFalse(self.setupCalled)

        # Verify that getActiveOrCreate() calls setupFunc after active context is stopped
        self.ssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

        # Verify that if the Java context is stopped, then getActive() returns None
        self.ssc = StreamingContext(self.sc, self.duration)
        self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
        self.ssc.start()
        self.assertEqual(StreamingContext.getActive(), self.ssc)
        self.ssc._jssc.stop(False)
        self.setupCalled = False
        self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
        self.assertTrue(self.setupCalled)

    def test_await_termination_or_timeout(self):
        """awaitTerminationOrTimeout is False while running, True after stop."""
        self._add_input_stream()
        self.ssc.start()
        self.assertFalse(self.ssc.awaitTerminationOrTimeout(0.001))
        self.ssc.stop(False)
        self.assertTrue(self.ssc.awaitTerminationOrTimeout(0.001))
0175 
if __name__ == "__main__":
    import unittest

    # Pull the test classes in through their package path; the names land in
    # __main__, which is the module unittest.main() loads tests from by default.
    from pyspark.streaming.tests.test_context import *  # noqa: F401,F403

    # Use the JUnit-XML runner when the optional xmlrunner package is
    # available; a None runner makes unittest.main() use its default runner.
    runner = None
    try:
        import xmlrunner
        runner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=runner, verbosity=2)