Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.streaming;
0018 
0019 import java.nio.charset.StandardCharsets;
0020 import java.util.ArrayList;
0021 import java.util.Arrays;
0022 import java.util.Iterator;
0023 import java.util.List;
0024 import java.util.regex.Pattern;
0025 
0026 import org.apache.spark.SparkConf;
0027 import org.apache.spark.api.java.function.FlatMapFunction;
0028 import org.apache.spark.api.java.function.Function2;
0029 import org.apache.spark.api.java.function.PairFunction;
0030 import org.apache.spark.storage.StorageLevel;
0031 import org.apache.spark.streaming.Duration;
0032 import org.apache.spark.streaming.api.java.JavaDStream;
0033 import org.apache.spark.streaming.api.java.JavaPairDStream;
0034 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0035 
0036 import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
0037 import org.apache.spark.streaming.kinesis.KinesisInputDStream;
0038 import scala.Tuple2;
0039 import scala.reflect.ClassTag$;
0040 
0041 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
0042 import com.amazonaws.services.kinesis.AmazonKinesisClient;
0043 
0044 /**
0045  * Consumes messages from a Amazon Kinesis streams and does wordcount.
0046  *
0047  * This example spins up 1 Kinesis Receiver per shard for the given stream.
0048  * It then starts pulling from the last checkpointed sequence number of the given stream.
0049  *
0050  * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name]
0051  *   [app-name] is the name of the consumer app, used to track the read data in DynamoDB
0052  *   [stream-name] name of the Kinesis stream (ie. mySparkStream)
0053  *   [endpoint-url] endpoint of the Kinesis service
0054  *     (e.g. https://kinesis.us-east-1.amazonaws.com)
0055  *
0056  *
0057  * Example:
0058  *      # export AWS keys if necessary
0059  *      $ export AWS_ACCESS_KEY_ID=[your-access-key]
0060  *      $ export AWS_SECRET_KEY=<your-secret-key>
0061  *
0062  *      # run the example
0063  *      $ SPARK_HOME/bin/run-example   streaming.JavaKinesisWordCountASL myAppName  mySparkStream \
0064  *             https://kinesis.us-east-1.amazonaws.com
0065  *
0066  * There is a companion helper class called KinesisWordProducerASL which puts dummy data
0067  * onto the Kinesis stream.
0068  *
0069  * This code uses the DefaultAWSCredentialsProviderChain to find credentials
0070  * in the following order:
0071  *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
0072  *    Java System Properties - aws.accessKeyId and aws.secretKey
0073  *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
0074  *    Instance profile credentials - delivered through the Amazon EC2 metadata service
0075  * For more information, see
0076  * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
0077  *
0078  * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
0079  * the Kinesis Spark Streaming integration.
0080  */
0081 public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
0082   private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
0083 
0084   public static void main(String[] args) throws Exception {
0085     // Check that all required args were passed in.
0086     if (args.length != 3) {
0087       System.err.println(
0088           "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
0089           "    <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
0090           "    <stream-name> is the name of the Kinesis stream\n" +
0091           "    <endpoint-url> is the endpoint of the Kinesis service\n" +
0092           "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
0093           "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
0094           "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
0095           "details.\n"
0096       );
0097       System.exit(1);
0098     }
0099 
0100     // Set default log4j logging level to WARN to hide Spark logs
0101     StreamingExamples.setStreamingLogLevels();
0102 
0103     // Populate the appropriate variables from the given args
0104     String kinesisAppName = args[0];
0105     String streamName = args[1];
0106     String endpointUrl = args[2];
0107 
0108     // Create a Kinesis client in order to determine the number of shards for the given stream
0109     AmazonKinesisClient kinesisClient =
0110         new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
0111     kinesisClient.setEndpoint(endpointUrl);
0112     int numShards =
0113         kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
0114 
0115 
0116     // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
0117     // This is not a necessity; if there are less receivers/DStreams than the number of shards,
0118     // then the shards will be automatically distributed among the receivers and each receiver
0119     // will receive data from multiple shards.
0120     int numStreams = numShards;
0121 
0122     // Spark Streaming batch interval
0123     Duration batchInterval = new Duration(2000);
0124 
0125     // Kinesis checkpoint interval.  Same as batchInterval for this example.
0126     Duration kinesisCheckpointInterval = batchInterval;
0127 
0128     // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
0129     // DynamoDB of the same region as the Kinesis stream
0130     String regionName = KinesisExampleUtils.getRegionNameByEndpoint(endpointUrl);
0131 
0132     // Setup the Spark config and StreamingContext
0133     SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
0134     JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
0135 
0136     // Create the Kinesis DStreams
0137     List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
0138     for (int i = 0; i < numStreams; i++) {
0139       streamsList.add(JavaDStream.fromDStream(
0140           KinesisInputDStream.builder()
0141               .streamingContext(jssc)
0142               .checkpointAppName(kinesisAppName)
0143               .streamName(streamName)
0144               .endpointUrl(endpointUrl)
0145               .regionName(regionName)
0146               .initialPosition(new KinesisInitialPositions.Latest())
0147               .checkpointInterval(kinesisCheckpointInterval)
0148               .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
0149               .build(),
0150           ClassTag$.MODULE$.apply(byte[].class)
0151       ));
0152     }
0153 
0154     // Union all the streams if there is more than 1 stream
0155     JavaDStream<byte[]> unionStreams;
0156     if (streamsList.size() > 1) {
0157       unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0]));
0158     } else {
0159       // Otherwise, just use the 1 stream
0160       unionStreams = streamsList.get(0);
0161     }
0162 
0163     // Convert each line of Array[Byte] to String, and split into words
0164     JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
0165       @Override
0166       public Iterator<String> call(byte[] line) {
0167         String s = new String(line, StandardCharsets.UTF_8);
0168         return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
0169       }
0170     });
0171 
0172     // Map each word to a (word, 1) tuple so we can reduce by key to count the words
0173     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
0174         new PairFunction<String, String, Integer>() {
0175           @Override
0176           public Tuple2<String, Integer> call(String s) {
0177             return new Tuple2<>(s, 1);
0178           }
0179         }
0180     ).reduceByKey(
0181         new Function2<Integer, Integer, Integer>() {
0182           @Override
0183           public Integer call(Integer i1, Integer i2) {
0184             return i1 + i2;
0185           }
0186         }
0187     );
0188 
0189     // Print the first 10 wordCounts
0190     wordCounts.print();
0191 
0192     // Start the streaming context and await termination
0193     jssc.start();
0194     jssc.awaitTermination();
0195   }
0196 }