0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.mllib;
0019
0020
0021 import org.apache.spark.mllib.stat.test.BinarySample;
0022 import org.apache.spark.mllib.stat.test.StreamingTest;
0023 import org.apache.spark.mllib.stat.test.StreamingTestResult;
0024
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.streaming.Duration;
0027 import org.apache.spark.streaming.Seconds;
0028 import org.apache.spark.streaming.api.java.JavaDStream;
0029 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0030 import org.apache.spark.util.Utils;
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053 public class JavaStreamingTestExample {
0054
0055 private static int timeoutCounter = 0;
0056
0057 public static void main(String[] args) throws Exception {
0058 if (args.length != 3) {
0059 System.err.println("Usage: JavaStreamingTestExample " +
0060 "<dataDir> <batchDuration> <numBatchesTimeout>");
0061 System.exit(1);
0062 }
0063
0064 String dataDir = args[0];
0065 Duration batchDuration = Seconds.apply(Long.parseLong(args[1]));
0066 int numBatchesTimeout = Integer.parseInt(args[2]);
0067
0068 SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
0069 JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
0070
0071 ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
0072
0073
0074 JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
0075 String[] ts = line.split(",");
0076 boolean label = Boolean.parseBoolean(ts[0]);
0077 double value = Double.parseDouble(ts[1]);
0078 return new BinarySample(label, value);
0079 });
0080
0081 StreamingTest streamingTest = new StreamingTest()
0082 .setPeacePeriod(0)
0083 .setWindowSize(0)
0084 .setTestMethod("welch");
0085
0086 JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
0087 out.print();
0088
0089
0090
0091 timeoutCounter = numBatchesTimeout;
0092
0093 out.foreachRDD(rdd -> {
0094 timeoutCounter -= 1;
0095 boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty();
0096 if (timeoutCounter <= 0 || anySignificant) {
0097 rdd.context().stop();
0098 }
0099 });
0100
0101 ssc.start();
0102 ssc.awaitTermination();
0103 }
0104 }