Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 from pyspark.streaming import StreamingListener
0018 from pyspark.testing.streamingutils import PySparkStreamingTestCase
0019 
0020 
class StreamingListenerTests(PySparkStreamingTestCase):
    """Checks the batch information delivered to a Python ``StreamingListener``."""

    duration = .5

    class BatchInfoCollector(StreamingListener):
        """Listener that records every event it receives for later inspection."""

        def __init__(self):
            # Zero-argument super() so StreamingListener.__init__ itself runs;
            # naming StreamingListener explicitly would skip it in the MRO.
            super().__init__()
            self.batchInfosCompleted = []
            self.batchInfosStarted = []
            self.batchInfosSubmitted = []
            self.streamingStartedTime = []

        def onStreamingStarted(self, streamingStarted):
            self.streamingStartedTime.append(streamingStarted.time)

        def onBatchSubmitted(self, batchSubmitted):
            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())

        def onBatchStarted(self, batchStarted):
            self.batchInfosStarted.append(batchStarted.batchInfo())

        def onBatchCompleted(self, batchCompleted):
            self.batchInfosCompleted.append(batchCompleted.batchInfo())

    def _check_delay(self, delay, known):
        """Assert ``delay`` is set (``>= 0``) when ``known``, else the ``-1`` placeholder."""
        if known:
            self.assertGreaterEqual(delay, 0)
        else:
            self.assertEqual(delay, -1)

    def _check_batch_infos(self, infos, *, min_output_time, scheduling_known,
                           processing_known):
        """Assert the invariants every collected batch info must satisfy.

        :param infos: batch infos gathered by ``BatchInfoCollector``.
        :param min_output_time: lower bound for each output operation's start
            and end time (``-1`` is accepted while an operation has not run).
        :param scheduling_known: whether ``schedulingDelay`` must be ``>= 0``
            rather than the ``-1`` placeholder.
        :param processing_known: same as ``scheduling_known`` but for
            ``processingDelay`` and ``totalDelay``.
        """
        for info in infos:
            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
            self.assertGreaterEqual(info.submissionTime(), 0)

            # Iterate the map's values directly so each Java map is fetched
            # once instead of once per key lookup.
            for streamInputInfo in info.streamIdToInputInfo().values():
                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
                # numRecords is an accessor like its siblings; it must be called.
                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
                for value in streamInputInfo.metadata().values():
                    self.assertIsNotNone(value)
                self.assertIsNotNone(streamInputInfo.metadataDescription())

            for outputInfo in info.outputOperationInfos().values():
                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
                self.assertGreaterEqual(outputInfo.id(), 0)
                self.assertIsNotNone(outputInfo.name())
                self.assertIsNotNone(outputInfo.description())
                self.assertGreaterEqual(outputInfo.startTime(), min_output_time)
                self.assertGreaterEqual(outputInfo.endTime(), min_output_time)
                self.assertIsNone(outputInfo.failureReason())

            self._check_delay(info.schedulingDelay(), scheduling_known)
            self._check_delay(info.processingDelay(), processing_known)
            self._check_delay(info.totalDelay(), processing_known)
            self.assertEqual(info.numRecords(), 0)

    def test_batch_info_reports(self):
        batch_collector = self.BatchInfoCollector()
        self.ssc.addStreamingListener(batch_collector)
        input_data = [[1], [2], [3], [4]]

        def func(dstream):
            return dstream.map(int)

        expected = [[1], [2], [3], [4]]
        self._test_func(input_data, func, expected)

        batchInfosSubmitted = batch_collector.batchInfosSubmitted
        batchInfosStarted = batch_collector.batchInfosStarted
        batchInfosCompleted = batch_collector.batchInfosCompleted
        streamingStartedTime = batch_collector.streamingStartedTime

        self.wait_for(batchInfosCompleted, 4)

        self.assertEqual(len(streamingStartedTime), 1)

        # Submitted: no delay is available yet.
        self.assertGreaterEqual(len(batchInfosSubmitted), 4)
        self._check_batch_infos(
            batchInfosSubmitted, min_output_time=-1,
            scheduling_known=False, processing_known=False)

        # Started: scheduling delay is known, processing delays are not.
        self.assertGreaterEqual(len(batchInfosStarted), 4)
        self._check_batch_infos(
            batchInfosStarted, min_output_time=-1,
            scheduling_known=True, processing_known=False)

        # Completed: all delays and output-operation times are recorded.
        self.assertGreaterEqual(len(batchInfosCompleted), 4)
        self._check_batch_infos(
            batchInfosCompleted, min_output_time=0,
            scheduling_known=True, processing_known=True)
0148 
0149 
if __name__ == "__main__":
    import unittest
    # Pull the test classes in from their package module so unittest.main()
    # (which scans __main__) picks them up; presumably this keeps them
    # identical to the classes other runners import — confirm if changed.
    from pyspark.streaming.tests.test_listener import *  # noqa: F401,F403

    # Write XML test reports under target/test-reports when xmlrunner is
    # installed; otherwise pass None so unittest uses its default runner.
    # Only the import is guarded, so a failure while building the runner is
    # not mistaken for "xmlrunner is missing".
    try:
        import xmlrunner
    except ImportError:
        testRunner = None
    else:
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    unittest.main(testRunner=testRunner, verbosity=2)