Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 import time
0018 import unittest
0019 
0020 from pyspark import StorageLevel
0021 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
0022 from pyspark.testing.streamingutils import should_test_kinesis, kinesis_requirement_message, \
0023     PySparkStreamingTestCase
0024 
0025 
@unittest.skipIf(not should_test_kinesis, kinesis_requirement_message)
class KinesisStreamTests(PySparkStreamingTestCase):
    """Tests for the Python Kinesis DStream API (``KinesisUtils.createStream``).

    The whole class is skipped unless ``should_test_kinesis`` is true; the
    skip reason reported is ``kinesis_requirement_message``.
    """

    def test_kinesis_stream_api(self):
        """Check that both ``createStream`` call forms are accepted.

        Exercises the signature without explicit credentials and the one that
        also takes an access key / secret key pair. Only stream construction
        is tested; no data flows.
        """
        # Don't start the StreamingContext because we cannot test it in Jenkins
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
            "awsAccessKey", "awsSecretKey")

    def test_kinesis_stream(self):
        """Round-trip records through a real, temporary Kinesis stream.

        A stream is created through the JVM-side ``KinesisTestUtils`` helper
        and consumed with ``KinesisUtils.createStream``. The integers 1..10 are
        pushed repeatedly until the string form of every one of them has been
        collected, or until the timeout expires.

        Afterwards the streaming context is stopped (the SparkContext is kept),
        and the Kinesis stream and the DynamoDB table named after the app are
        deleted. Each cleanup step runs even if an earlier one fails, so the
        AWS resources are not leaked.
        """
        import random
        import traceback

        timeout_seconds = 120
        poll_interval_seconds = 10

        # randint(0, N) is already non-negative; a random suffix keeps
        # concurrent runs from sharing the same app name / DynamoDB table.
        kinesisAppName = "KinesisStreamTests-%d" % random.randint(0, 10000000)
        kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils(2)
        try:
            kinesisTestUtils.createStream()
            aWSCredentials = kinesisTestUtils.getAWSCredentials()
            stream = KinesisUtils.createStream(
                self.ssc, kinesisAppName, kinesisTestUtils.streamName(),
                kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(),
                InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
                aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey())

            outputBuffer = []

            def get_output(_, rdd):
                # Gather every record of each batch on the driver.
                outputBuffer.extend(rdd.collect())

            stream.foreachRDD(get_output)
            self.ssc.start()

            testData = list(range(1, 11))
            # Records are compared in their string form.
            expectedOutput = {str(i) for i in testData}
            # monotonic() is immune to wall-clock adjustments during the wait.
            deadline = time.monotonic() + timeout_seconds
            while time.monotonic() < deadline:
                # Data is pushed on every iteration; the stream was opened at
                # LATEST, so records written before the receiver is ready are
                # not seen.
                kinesisTestUtils.pushData(testData)
                if expectedOutput == set(outputBuffer):
                    break
                time.sleep(poll_interval_seconds)
            self.assertEqual(expectedOutput, set(outputBuffer))
        except BaseException:
            # Print the traceback eagerly, then let the failure propagate.
            traceback.print_exc()
            raise
        finally:
            # Nested so that a failure in one teardown step cannot skip the
            # remaining ones (which would leak real AWS resources).
            try:
                self.ssc.stop(False)
            finally:
                try:
                    kinesisTestUtils.deleteStream()
                finally:
                    kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
0080 
0081 
if __name__ == "__main__":
    # Re-import the tests through their package path; presumably this is so
    # the test classes are reported under the fully-qualified module name
    # rather than "__main__" — confirm.
    from pyspark.streaming.tests.test_kinesis import *  # noqa: F401

    # Emit JUnit-style XML reports when xmlrunner is installed; otherwise
    # fall back to unittest's default text runner.
    runner = None
    try:
        import xmlrunner
        runner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=runner, verbosity=2)