Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.streaming;
0019 
0020 import java.io.File;
0021 import java.nio.charset.Charset;
0022 import java.util.Arrays;
0023 import java.util.List;
0024 import java.util.regex.Pattern;
0025 
0026 import scala.Tuple2;
0027 
0028 import com.google.common.io.Files;
0029 
0030 import org.apache.spark.SparkConf;
0031 import org.apache.spark.api.java.JavaSparkContext;
0032 import org.apache.spark.api.java.function.*;
0033 import org.apache.spark.broadcast.Broadcast;
0034 import org.apache.spark.streaming.Durations;
0035 import org.apache.spark.streaming.api.java.JavaDStream;
0036 import org.apache.spark.streaming.api.java.JavaPairDStream;
0037 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0038 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0039 import org.apache.spark.util.LongAccumulator;
0040 
0041 /**
0042  * Use this singleton to get or register a Broadcast variable.
0043  */
0044 class JavaWordBlacklist {
0045 
0046   private static volatile Broadcast<List<String>> instance = null;
0047 
0048   public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
0049     if (instance == null) {
0050       synchronized (JavaWordBlacklist.class) {
0051         if (instance == null) {
0052           List<String> wordBlacklist = Arrays.asList("a", "b", "c");
0053           instance = jsc.broadcast(wordBlacklist);
0054         }
0055       }
0056     }
0057     return instance;
0058   }
0059 }
0060 
0061 /**
0062  * Use this singleton to get or register an Accumulator.
0063  */
0064 class JavaDroppedWordsCounter {
0065 
0066   private static volatile LongAccumulator instance = null;
0067 
0068   public static LongAccumulator getInstance(JavaSparkContext jsc) {
0069     if (instance == null) {
0070       synchronized (JavaDroppedWordsCounter.class) {
0071         if (instance == null) {
0072           instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
0073         }
0074       }
0075     }
0076     return instance;
0077   }
0078 }
0079 
0080 /**
0081  * Counts words in text encoded with UTF8 received from the network every second. This example also
0082  * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
0083  * they can be registered on driver failures.
0084  *
0085  * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
0086  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
0087  *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
0088  *   <output-file> file to which the word counts will be appended
0089  *
0090  * <checkpoint-directory> and <output-file> must be absolute paths
0091  *
0092  * To run this on your local machine, you need to first run a Netcat server
0093  *
0094  *      `$ nc -lk 9999`
0095  *
0096  * and run the example as
0097  *
0098  *      `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
0099  *              localhost 9999 ~/checkpoint/ ~/out`
0100  *
0101  * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
0102  * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
0103  * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
0104  * the checkpoint data.
0105  *
0106  * Refer to the online documentation for more details.
0107  */
0108 public final class JavaRecoverableNetworkWordCount {
0109   private static final Pattern SPACE = Pattern.compile(" ");
0110 
0111   private static JavaStreamingContext createContext(String ip,
0112                                                     int port,
0113                                                     String checkpointDirectory,
0114                                                     String outputPath) {
0115 
0116     // If you do not see this printed, that means the StreamingContext has been loaded
0117     // from the new checkpoint
0118     System.out.println("Creating new context");
0119     File outputFile = new File(outputPath);
0120     if (outputFile.exists()) {
0121       outputFile.delete();
0122     }
0123     SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
0124     // Create the context with a 1 second batch size
0125     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
0126     ssc.checkpoint(checkpointDirectory);
0127 
0128     // Create a socket stream on target ip:port and count the
0129     // words in input stream of \n delimited text (eg. generated by 'nc')
0130     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
0131     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
0132     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
0133         .reduceByKey((i1, i2) -> i1 + i2);
0134 
0135     wordCounts.foreachRDD((rdd, time) -> {
0136       // Get or register the blacklist Broadcast
0137       Broadcast<List<String>> blacklist =
0138           JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
0139       // Get or register the droppedWordsCounter Accumulator
0140       LongAccumulator droppedWordsCounter =
0141           JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
0142       // Use blacklist to drop words and use droppedWordsCounter to count them
0143       String counts = rdd.filter(wordCount -> {
0144         if (blacklist.value().contains(wordCount._1())) {
0145           droppedWordsCounter.add(wordCount._2());
0146           return false;
0147         } else {
0148           return true;
0149         }
0150       }).collect().toString();
0151       String output = "Counts at time " + time + " " + counts;
0152       System.out.println(output);
0153       System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
0154       System.out.println("Appending to " + outputFile.getAbsolutePath());
0155       Files.append(output + "\n", outputFile, Charset.defaultCharset());
0156     });
0157 
0158     return ssc;
0159   }
0160 
0161   public static void main(String[] args) throws Exception {
0162     if (args.length != 4) {
0163       System.err.println("You arguments were " + Arrays.asList(args));
0164       System.err.println(
0165           "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
0166           "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
0167           "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
0168           "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
0169           "     the word counts will be appended\n" +
0170           "\n" +
0171           "In local mode, <master> should be 'local[n]' with n > 1\n" +
0172           "Both <checkpoint-directory> and <output-file> must be absolute paths");
0173       System.exit(1);
0174     }
0175 
0176     String ip = args[0];
0177     int port = Integer.parseInt(args[1]);
0178     String checkpointDirectory = args[2];
0179     String outputPath = args[3];
0180 
0181     // Function to create JavaStreamingContext without any output operations
0182     // (used to detect the new context)
0183     Function0<JavaStreamingContext> createContextFunc =
0184         () -> createContext(ip, port, checkpointDirectory, outputPath);
0185 
0186     JavaStreamingContext ssc =
0187       JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
0188     ssc.start();
0189     ssc.awaitTermination();
0190   }
0191 }