0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 from pyspark.streaming import StreamingListener
0018 from pyspark.testing.streamingutils import PySparkStreamingTestCase
0019
0020
class StreamingListenerTests(PySparkStreamingTestCase):
    """Checks that a Python ``StreamingListener`` receives streaming and batch events."""

    duration = 0.5

    class BatchInfoCollector(StreamingListener):
        """Listener that records every streaming-started and batch event it receives."""

        def __init__(self):
            # Name *this* class in super(): passing StreamingListener would start
            # the MRO lookup after it and silently skip StreamingListener.__init__.
            super(StreamingListenerTests.BatchInfoCollector, self).__init__()
            self.batchInfosCompleted = []
            self.batchInfosStarted = []
            self.batchInfosSubmitted = []
            self.streamingStartedTime = []

        def onStreamingStarted(self, streamingStarted):
            """Record the time attached to the streaming-started event."""
            self.streamingStartedTime.append(streamingStarted.time)

        def onBatchSubmitted(self, batchSubmitted):
            """Record the batch info of a submitted batch."""
            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())

        def onBatchStarted(self, batchStarted):
            """Record the batch info of a started batch."""
            self.batchInfosStarted.append(batchStarted.batchInfo())

        def onBatchCompleted(self, batchCompleted):
            """Record the batch info of a completed batch."""
            self.batchInfosCompleted.append(batchCompleted.batchInfo())

    def _assert_stream_input_infos(self, info):
        """Validate every per-input-stream record attached to batch ``info``."""
        input_infos = info.streamIdToInputInfo()
        for stream_id in input_infos:
            stream_input_info = input_infos[stream_id]
            self.assertGreaterEqual(stream_input_info.inputStreamId(), 0)
            # numRecords is an accessor like inputStreamId(); without the call the
            # assertion would compare the method object itself against 0.
            self.assertGreaterEqual(stream_input_info.numRecords(), 0)
            metadata = stream_input_info.metadata()
            for key in metadata:
                self.assertIsNotNone(metadata[key])
            self.assertIsNotNone(stream_input_info.metadataDescription())

    def _assert_output_operation_infos(self, info, min_time):
        """Validate every output-operation record attached to batch ``info``.

        :param info: batch info reported to the listener.
        :param min_time: lower bound accepted for each output operation's
            ``startTime()`` and ``endTime()``.
        """
        output_infos = info.outputOperationInfos()
        for output_op_id in output_infos:
            output_info = output_infos[output_op_id]
            self.assertGreaterEqual(output_info.batchTime().milliseconds(), 0)
            self.assertGreaterEqual(output_info.id(), 0)
            self.assertIsNotNone(output_info.name())
            self.assertIsNotNone(output_info.description())
            self.assertGreaterEqual(output_info.startTime(), min_time)
            self.assertGreaterEqual(output_info.endTime(), min_time)
            self.assertIsNone(output_info.failureReason())

    def _assert_common_batch_info(self, info, min_output_time):
        """Validate the fields every reported batch info must satisfy.

        This covers the checks shared by the submitted, started and completed
        stages. Delay expectations differ per stage and are asserted by the caller.

        :param info: batch info reported to the listener.
        :param min_output_time: lower bound for output-operation start/end times.
        """
        self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
        self.assertGreaterEqual(info.submissionTime(), 0)
        self._assert_stream_input_infos(info)
        self._assert_output_operation_infos(info, min_output_time)
        self.assertEqual(info.numRecords(), 0)

    def test_batch_info_reports(self):
        """Run four batches and check what the listener saw at each stage."""
        batch_collector = self.BatchInfoCollector()
        self.ssc.addStreamingListener(batch_collector)
        input_data = [[1], [2], [3], [4]]

        def func(dstream):
            return dstream.map(int)

        expected = [[1], [2], [3], [4]]
        self._test_func(input_data, func, expected)

        batchInfosSubmitted = batch_collector.batchInfosSubmitted
        batchInfosStarted = batch_collector.batchInfosStarted
        batchInfosCompleted = batch_collector.batchInfosCompleted
        streamingStartedTime = batch_collector.streamingStartedTime

        self.wait_for(batchInfosCompleted, 4)

        self.assertEqual(len(streamingStartedTime), 1)

        # Submitted: output-operation times may still be -1 and every delay is -1.
        self.assertGreaterEqual(len(batchInfosSubmitted), 4)
        for info in batchInfosSubmitted:
            self._assert_common_batch_info(info, min_output_time=-1)
            self.assertEqual(info.schedulingDelay(), -1)
            self.assertEqual(info.processingDelay(), -1)
            self.assertEqual(info.totalDelay(), -1)

        # Started: the scheduling delay is now known; processing/total are still -1.
        self.assertGreaterEqual(len(batchInfosStarted), 4)
        for info in batchInfosStarted:
            self._assert_common_batch_info(info, min_output_time=-1)
            self.assertGreaterEqual(info.schedulingDelay(), 0)
            self.assertEqual(info.processingDelay(), -1)
            self.assertEqual(info.totalDelay(), -1)

        # Completed: output-operation times and all delays are non-negative.
        self.assertGreaterEqual(len(batchInfosCompleted), 4)
        for info in batchInfosCompleted:
            self._assert_common_batch_info(info, min_output_time=0)
            self.assertGreaterEqual(info.schedulingDelay(), 0)
            self.assertGreaterEqual(info.processingDelay(), 0)
            self.assertGreaterEqual(info.totalDelay(), 0)
0148
0149
if __name__ == "__main__":
    import unittest

    # Pull the test classes in through their package path. unittest.main()
    # below loads tests from this (__main__) module's namespace.
    from pyspark.streaming.tests.test_listener import *  # noqa: F401,F403

    # Write XML reports when xmlrunner is installed; otherwise pass None so
    # unittest uses its default text runner.
    try:
        import xmlrunner
        test_runner = xmlrunner.XMLTestRunner(
            output='target/test-reports',
            verbosity=2,
        )
    except ImportError:
        test_runner = None

    unittest.main(testRunner=test_runner, verbosity=2)