0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 Consumes messages from an Amazon Kinesis stream and does a word count.
0020
0021 This example spins up 1 Kinesis Receiver per shard for the given stream.
0022 It then starts pulling from the last checkpointed sequence number of the given stream.
0023
0024 Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
0025 <app-name> is the name of the consumer app, used to track the read data in DynamoDB
0026 <stream-name> name of the Kinesis stream (e.g. mySparkStream)
0027 <endpoint-url> endpoint of the Kinesis service
0028 (e.g. https://kinesis.us-east-1.amazonaws.com)
0029 <region-name> region name of the Kinesis endpoint (e.g. us-east-1)
0030
0031
0032 Example:
0033 # export AWS keys if necessary
0034 $ export AWS_ACCESS_KEY_ID=<your-access-key>
0035 $ export AWS_SECRET_KEY=<your-secret-key>
0036
0037 # run the example
0038 $ bin/spark-submit --jars \
0039 'external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
0040 external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
0041 myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
0042
0043 There is a companion helper class called KinesisWordProducerASL which puts dummy data
0044 onto the Kinesis stream.
0045
0046 This code uses the DefaultAWSCredentialsProviderChain to find credentials
0047 in the following order:
0048 Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
0049 Java System Properties - aws.accessKeyId and aws.secretKey
0050 Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
0051 Instance profile credentials - delivered through the Amazon EC2 metadata service
0052 For more information, see
0053 http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
0054
0055 See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
0056 the Kinesis Spark Streaming integration.
0057 """
0058 from __future__ import print_function
0059
0060 import sys
0061
0062 from pyspark import SparkContext
0063 from pyspark.streaming import StreamingContext
0064 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
0065
if __name__ == "__main__":
    # Validate the command line up front, before paying for any Spark setup.
    if len(sys.argv) != 5:
        print(
            "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
            file=sys.stderr)
        sys.exit(-1)

    # Positional arguments (already validated to be exactly four).
    appName, streamName, endpointUrl, regionName = sys.argv[1:]

    # One StreamingContext with a 1-second batch interval on top of a
    # local/cluster SparkContext named for this example.
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, 1)

    # Attach a Kinesis receiver starting at the LATEST position with a
    # 2-second checkpoint interval, then split each record into words.
    words = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, 2).flatMap(lambda rec: rec.split(" "))

    # Classic word count: pair each word with 1 and sum per key per batch.
    counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()