0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.streaming;
0019
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.io.Files;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
0040
0041
0042
0043
0044 class JavaWordBlacklist {
0045
0046 private static volatile Broadcast<List<String>> instance = null;
0047
0048 public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
0049 if (instance == null) {
0050 synchronized (JavaWordBlacklist.class) {
0051 if (instance == null) {
0052 List<String> wordBlacklist = Arrays.asList("a", "b", "c");
0053 instance = jsc.broadcast(wordBlacklist);
0054 }
0055 }
0056 }
0057 return instance;
0058 }
0059 }
0060
0061
0062
0063
0064 class JavaDroppedWordsCounter {
0065
0066 private static volatile LongAccumulator instance = null;
0067
0068 public static LongAccumulator getInstance(JavaSparkContext jsc) {
0069 if (instance == null) {
0070 synchronized (JavaDroppedWordsCounter.class) {
0071 if (instance == null) {
0072 instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
0073 }
0074 }
0075 }
0076 return instance;
0077 }
0078 }
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108 public final class JavaRecoverableNetworkWordCount {
0109 private static final Pattern SPACE = Pattern.compile(" ");
0110
0111 private static JavaStreamingContext createContext(String ip,
0112 int port,
0113 String checkpointDirectory,
0114 String outputPath) {
0115
0116
0117
0118 System.out.println("Creating new context");
0119 File outputFile = new File(outputPath);
0120 if (outputFile.exists()) {
0121 outputFile.delete();
0122 }
0123 SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
0124
0125 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0126 ssc.checkpoint(checkpointDirectory);
0127
0128
0129
0130 JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
0131 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0132 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0133 .reduceByKey((i1, i2) -> i1 + i2);
0134
0135 wordCounts.foreachRDD((rdd, time) -> {
0136
0137 Broadcast<List<String>> blacklist =
0138 JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
0139
0140 LongAccumulator droppedWordsCounter =
0141 JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
0142
0143 String counts = rdd.filter(wordCount -> {
0144 if (blacklist.value().contains(wordCount._1())) {
0145 droppedWordsCounter.add(wordCount._2());
0146 return false;
0147 } else {
0148 return true;
0149 }
0150 }).collect().toString();
0151 String output = "Counts at time " + time + " " + counts;
0152 System.out.println(output);
0153 System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
0154 System.out.println("Appending to " + outputFile.getAbsolutePath());
0155 Files.append(output + "\n", outputFile, Charset.defaultCharset());
0156 });
0157
0158 return ssc;
0159 }
0160
0161 public static void main(String[] args) throws Exception {
0162 if (args.length != 4) {
0163 System.err.println("You arguments were " + Arrays.asList(args));
0164 System.err.println(
0165 "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
0166 " <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
0167 " Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
0168 " HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
0169 " the word counts will be appended\n" +
0170 "\n" +
0171 "In local mode, <master> should be 'local[n]' with n > 1\n" +
0172 "Both <checkpoint-directory> and <output-file> must be absolute paths");
0173 System.exit(1);
0174 }
0175
0176 String ip = args[0];
0177 int port = Integer.parseInt(args[1]);
0178 String checkpointDirectory = args[2];
0179 String outputPath = args[3];
0180
0181
0182
0183 Function0<JavaStreamingContext> createContextFunc =
0184 () -> createContext(ip, port, checkpointDirectory, outputPath);
0185
0186 JavaStreamingContext ssc =
0187 JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
0188 ssc.start();
0189 ssc.awaitTermination();
0190 }
0191 }