0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.mllib.classification;
0019
0020 import java.util.Arrays;
0021 import java.util.List;
0022
0023 import scala.Tuple2;
0024
0025 import org.junit.After;
0026 import org.junit.Before;
0027 import org.junit.Test;
0028
0029 import org.apache.spark.SparkConf;
0030 import org.apache.spark.mllib.linalg.Vector;
0031 import org.apache.spark.mllib.linalg.Vectors;
0032 import org.apache.spark.mllib.regression.LabeledPoint;
0033 import org.apache.spark.streaming.Duration;
0034 import org.apache.spark.streaming.api.java.JavaDStream;
0035 import org.apache.spark.streaming.api.java.JavaPairDStream;
0036 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0037 import static org.apache.spark.streaming.JavaTestUtils.*;
0038
0039 public class JavaStreamingLogisticRegressionSuite {
0040
0041 protected transient JavaStreamingContext ssc;
0042
0043 @Before
0044 public void setUp() {
0045 SparkConf conf = new SparkConf()
0046 .setMaster("local[2]")
0047 .setAppName("test")
0048 .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
0049 ssc = new JavaStreamingContext(conf, new Duration(1000));
0050 ssc.checkpoint("checkpoint");
0051 }
0052
0053 @After
0054 public void tearDown() {
0055 ssc.stop();
0056 ssc = null;
0057 }
0058
0059 @Test
0060 @SuppressWarnings("unchecked")
0061 public void javaAPI() {
0062 List<LabeledPoint> trainingBatch = Arrays.asList(
0063 new LabeledPoint(1.0, Vectors.dense(1.0)),
0064 new LabeledPoint(0.0, Vectors.dense(0.0)));
0065 JavaDStream<LabeledPoint> training =
0066 attachTestInputStream(ssc, Arrays.asList(trainingBatch, trainingBatch), 2);
0067 List<Tuple2<Integer, Vector>> testBatch = Arrays.asList(
0068 new Tuple2<>(10, Vectors.dense(1.0)),
0069 new Tuple2<>(11, Vectors.dense(0.0)));
0070 JavaPairDStream<Integer, Vector> test = JavaPairDStream.fromJavaDStream(
0071 attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2));
0072 StreamingLogisticRegressionWithSGD slr = new StreamingLogisticRegressionWithSGD()
0073 .setNumIterations(2)
0074 .setInitialWeights(Vectors.dense(0.0));
0075 slr.trainOn(training);
0076 JavaPairDStream<Integer, Double> prediction = slr.predictOnValues(test);
0077 attachTestOutputStream(prediction.count());
0078 runStreams(ssc, 2, 2);
0079 }
0080 }