#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

0018 """
0019   Consumes messages from a Amazon Kinesis streams and does wordcount.
0020 
0021   This example spins up 1 Kinesis Receiver per shard for the given stream.
0022   It then starts pulling from the last checkpointed sequence number of the given stream.
0023 
0024   Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
0025     <app-name> is the name of the consumer app, used to track the read data in DynamoDB
0026     <stream-name> name of the Kinesis stream (ie. mySparkStream)
0027     <endpoint-url> endpoint of the Kinesis service
0028       (e.g. https://kinesis.us-east-1.amazonaws.com)
0029     <region-name> region name of the Kinesis endpoint (e.g. us-east-1)
0030 
0031 
0032   Example:
0033       # export AWS keys if necessary
0034       $ export AWS_ACCESS_KEY_ID=<your-access-key>
0035       $ export AWS_SECRET_KEY=<your-secret-key>
0036 
0037       # run the example
0038       $ bin/spark-submit --jars \
0039         'external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
0040         external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
0041         myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
0042 
0043   There is a companion helper class called KinesisWordProducerASL which puts dummy data
0044   onto the Kinesis stream.
0045 
0046   This code uses the DefaultAWSCredentialsProviderChain to find credentials
0047   in the following order:
0048       Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
0049       Java System Properties - aws.accessKeyId and aws.secretKey
0050       Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
0051       Instance profile credentials - delivered through the Amazon EC2 metadata service
0052   For more information, see
0053       http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
0054 
0055   See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
0056   the Kinesis Spark Streaming integration.
0057 """
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

if __name__ == "__main__":
    if len(sys.argv) != 5:
        print(
            "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
            file=sys.stderr)
        sys.exit(-1)

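    # Set up the SparkContext and a StreamingContext with a 1-second batch interval,
    # then read the command-line arguments.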
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, 1)
    appName, streamName, endpointUrl, regionName = sys.argv[1:]
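    # Create a DStream that receives records from the Kinesis stream, starting from the
    # latest record, with a checkpoint interval of 2 seconds.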
    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
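    # Split each line into words, pair each word with a count of 1, sum the counts per
    # word within each batch, and print the result of every batch.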
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

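    # Start the streaming computation and block until it terminates.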
    ssc.start()
    ssc.awaitTermination()