0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.examples.streaming;
0018
0019 import java.nio.charset.StandardCharsets;
0020 import java.util.ArrayList;
0021 import java.util.Arrays;
0022 import java.util.Iterator;
0023 import java.util.List;
0024 import java.util.regex.Pattern;
0025
0026 import org.apache.spark.SparkConf;
0027 import org.apache.spark.api.java.function.FlatMapFunction;
0028 import org.apache.spark.api.java.function.Function2;
0029 import org.apache.spark.api.java.function.PairFunction;
0030 import org.apache.spark.storage.StorageLevel;
0031 import org.apache.spark.streaming.Duration;
0032 import org.apache.spark.streaming.api.java.JavaDStream;
0033 import org.apache.spark.streaming.api.java.JavaPairDStream;
0034 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0035
0036 import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
0037 import org.apache.spark.streaming.kinesis.KinesisInputDStream;
0038 import scala.Tuple2;
0039 import scala.reflect.ClassTag$;
0040
0041 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
0042 import com.amazonaws.services.kinesis.AmazonKinesisClient;
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081 public final class JavaKinesisWordCountASL {
0082 private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
0083
0084 public static void main(String[] args) throws Exception {
0085
0086 if (args.length != 3) {
0087 System.err.println(
0088 "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
0089 " <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
0090 " <stream-name> is the name of the Kinesis stream\n" +
0091 " <endpoint-url> is the endpoint of the Kinesis service\n" +
0092 " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
0093 "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
0094 "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
0095 "details.\n"
0096 );
0097 System.exit(1);
0098 }
0099
0100
0101 StreamingExamples.setStreamingLogLevels();
0102
0103
0104 String kinesisAppName = args[0];
0105 String streamName = args[1];
0106 String endpointUrl = args[2];
0107
0108
0109 AmazonKinesisClient kinesisClient =
0110 new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
0111 kinesisClient.setEndpoint(endpointUrl);
0112 int numShards =
0113 kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
0114
0115
0116
0117
0118
0119
0120 int numStreams = numShards;
0121
0122
0123 Duration batchInterval = new Duration(2000);
0124
0125
0126 Duration kinesisCheckpointInterval = batchInterval;
0127
0128
0129
0130 String regionName = KinesisExampleUtils.getRegionNameByEndpoint(endpointUrl);
0131
0132
0133 SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
0134 JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
0135
0136
0137 List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
0138 for (int i = 0; i < numStreams; i++) {
0139 streamsList.add(JavaDStream.fromDStream(
0140 KinesisInputDStream.builder()
0141 .streamingContext(jssc)
0142 .checkpointAppName(kinesisAppName)
0143 .streamName(streamName)
0144 .endpointUrl(endpointUrl)
0145 .regionName(regionName)
0146 .initialPosition(new KinesisInitialPositions.Latest())
0147 .checkpointInterval(kinesisCheckpointInterval)
0148 .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
0149 .build(),
0150 ClassTag$.MODULE$.apply(byte[].class)
0151 ));
0152 }
0153
0154
0155 JavaDStream<byte[]> unionStreams;
0156 if (streamsList.size() > 1) {
0157 unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0]));
0158 } else {
0159
0160 unionStreams = streamsList.get(0);
0161 }
0162
0163
0164 JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
0165 @Override
0166 public Iterator<String> call(byte[] line) {
0167 String s = new String(line, StandardCharsets.UTF_8);
0168 return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
0169 }
0170 });
0171
0172
0173 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
0174 new PairFunction<String, String, Integer>() {
0175 @Override
0176 public Tuple2<String, Integer> call(String s) {
0177 return new Tuple2<>(s, 1);
0178 }
0179 }
0180 ).reduceByKey(
0181 new Function2<Integer, Integer, Integer>() {
0182 @Override
0183 public Integer call(Integer i1, Integer i2) {
0184 return i1 + i2;
0185 }
0186 }
0187 );
0188
0189
0190 wordCounts.print();
0191
0192
0193 jssc.start();
0194 jssc.awaitTermination();
0195 }
0196 }