Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.examples.mllib;
0019 
0020 // $example on$
0021 import org.apache.spark.mllib.stat.test.BinarySample;
0022 import org.apache.spark.mllib.stat.test.StreamingTest;
0023 import org.apache.spark.mllib.stat.test.StreamingTestResult;
0024 // $example off$
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.streaming.Duration;
0027 import org.apache.spark.streaming.Seconds;
0028 import org.apache.spark.streaming.api.java.JavaDStream;
0029 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0030 import org.apache.spark.util.Utils;
0031 
0032 
0033 /**
0034  * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data
0035  * stream arrives as text files in a directory. Stops when the two groups are statistically
0036  * significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded.
0037  *
0038  * The rows of the text files must be in the form `Boolean, Double`. For example:
0039  *   false, -3.92
0040  *   true, 99.32
0041  *
0042  * Usage:
0043  *   JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>
0044  *
0045  * To run on your local machine using the directory `dataDir` with 5 seconds between each batch and
0046  * a timeout after 100 insignificant batches, call:
0047  *    $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100
0048  *
0049  * As you add text files to `dataDir` the significance test will continually update every
0050  * `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of
0051  * batches processed exceeds `numBatchesTimeout`.
0052  */
0053 public class JavaStreamingTestExample {
0054 
0055   private static int timeoutCounter = 0;
0056 
0057   public static void main(String[] args) throws Exception {
0058     if (args.length != 3) {
0059       System.err.println("Usage: JavaStreamingTestExample " +
0060         "<dataDir> <batchDuration> <numBatchesTimeout>");
0061         System.exit(1);
0062     }
0063 
0064     String dataDir = args[0];
0065     Duration batchDuration = Seconds.apply(Long.parseLong(args[1]));
0066     int numBatchesTimeout = Integer.parseInt(args[2]);
0067 
0068     SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
0069     JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
0070 
0071     ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
0072 
0073     // $example on$
0074     JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
0075       String[] ts = line.split(",");
0076       boolean label = Boolean.parseBoolean(ts[0]);
0077       double value = Double.parseDouble(ts[1]);
0078       return new BinarySample(label, value);
0079     });
0080 
0081     StreamingTest streamingTest = new StreamingTest()
0082       .setPeacePeriod(0)
0083       .setWindowSize(0)
0084       .setTestMethod("welch");
0085 
0086     JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
0087     out.print();
0088     // $example off$
0089 
0090     // Stop processing if test becomes significant or we time out
0091     timeoutCounter = numBatchesTimeout;
0092 
0093     out.foreachRDD(rdd -> {
0094       timeoutCounter -= 1;
0095       boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty();
0096       if (timeoutCounter <= 0 || anySignificant) {
0097         rdd.context().stop();
0098       }
0099     });
0100 
0101     ssc.start();
0102     ssc.awaitTermination();
0103   }
0104 }