0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 import time
0018 import unittest
0019
0020 from pyspark import StorageLevel
0021 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
0022 from pyspark.testing.streamingutils import should_test_kinesis, kinesis_requirement_message, \
0023 PySparkStreamingTestCase
0024
0025
@unittest.skipIf(not should_test_kinesis, kinesis_requirement_message)
class KinesisStreamTests(PySparkStreamingTestCase):
    """Tests for ``KinesisUtils.createStream``.

    The whole class is skipped, with ``kinesis_requirement_message`` as the
    reason, when ``should_test_kinesis`` is falsy.
    """

    def test_kinesis_stream_api(self):
        """Build Kinesis streams with and without explicit AWS keys.

        Only stream construction is exercised; the streaming context is
        never started in this test and nothing is asserted beyond the calls
        completing without raising.
        """
        # Variant without explicit AWS credentials.
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
        # Variant with an explicit access key / secret key pair.
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
            "awsAccessKey", "awsSecretKey")

    def test_kinesis_stream(self):
        """Push records into a Kinesis stream and expect to read them back.

        A stream is created through the JVM-side ``KinesisTestUtils``, the
        integers 1..10 are pushed repeatedly, and the test passes once the
        set of received records equals the string forms of those integers.
        The loop gives up after roughly 120 seconds, at which point the final
        assertion fails.

        Any exception is printed with its traceback and re-raised. Cleanup
        (stopping the streaming context, deleting the stream, deleting the
        DynamoDB table named after the application) is always attempted.
        """
        import random

        # randint(0, N) never returns a negative value, so no abs() is needed.
        app_name = "KinesisStreamTests-%d" % random.randint(0, 10000000)
        test_utils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils(2)
        try:
            test_utils.createStream()
            aws_credentials = test_utils.getAWSCredentials()
            stream = KinesisUtils.createStream(
                self.ssc, app_name, test_utils.streamName(),
                test_utils.endpointUrl(), test_utils.regionName(),
                InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
                aws_credentials.getAWSAccessKeyId(), aws_credentials.getAWSSecretKey())

            received = []

            def get_output(_, rdd):
                # Accumulate every record of every batch on the driver.
                received.extend(rdd.collect())

            stream.foreachRDD(get_output)
            self.ssc.start()

            test_data = list(range(1, 11))
            expected_output = {str(i) for i in test_data}
            timeout_seconds = 120
            poll_interval_seconds = 10

            start_time = time.time()
            while time.time() - start_time < timeout_seconds:
                # Data is re-pushed on every iteration; comparing as sets makes
                # duplicate deliveries harmless.
                test_utils.pushData(test_data)
                if expected_output == set(received):
                    break
                time.sleep(poll_interval_seconds)
            self.assertEqual(expected_output, set(received))
        except BaseException:
            # Print the traceback right away, then let the failure propagate.
            import traceback
            traceback.print_exc()
            raise
        finally:
            # Nested try/finally so that a failure in one cleanup step cannot
            # skip the remaining ones and leave remote resources behind.
            try:
                self.ssc.stop(False)
            finally:
                try:
                    test_utils.deleteStream()
                finally:
                    test_utils.deleteDynamoDBTable(app_name)
0080
0081
if __name__ == "__main__":
    # Re-import the test classes under their package path so that
    # unittest.main() picks them up from this module's namespace.
    from pyspark.streaming.tests.test_kinesis import *

    # Default to unittest's own runner; switch to the XML-reporting runner
    # only when the optional xmlrunner package can be imported.
    runner = None
    try:
        import xmlrunner
        runner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=runner, verbosity=2)