0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.mllib.clustering;
0019
0020 import java.util.Arrays;
0021 import java.util.List;
0022
0023 import scala.Tuple2;
0024
0025 import org.junit.After;
0026 import org.junit.Before;
0027 import org.junit.Test;
0028
0029 import org.apache.spark.SparkConf;
0030 import org.apache.spark.mllib.linalg.Vector;
0031 import org.apache.spark.mllib.linalg.Vectors;
0032 import org.apache.spark.streaming.Duration;
0033 import org.apache.spark.streaming.api.java.JavaDStream;
0034 import org.apache.spark.streaming.api.java.JavaPairDStream;
0035 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0036 import static org.apache.spark.streaming.JavaTestUtils.*;
0037
0038 public class JavaStreamingKMeansSuite {
0039
0040 protected transient JavaStreamingContext ssc;
0041
0042 @Before
0043 public void setUp() {
0044 SparkConf conf = new SparkConf()
0045 .setMaster("local[2]")
0046 .setAppName("test")
0047 .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
0048 ssc = new JavaStreamingContext(conf, new Duration(1000));
0049 ssc.checkpoint("checkpoint");
0050 }
0051
0052 @After
0053 public void tearDown() {
0054 ssc.stop();
0055 ssc = null;
0056 }
0057
0058 @Test
0059 @SuppressWarnings("unchecked")
0060 public void javaAPI() {
0061 List<Vector> trainingBatch = Arrays.asList(
0062 Vectors.dense(1.0),
0063 Vectors.dense(0.0));
0064 JavaDStream<Vector> training =
0065 attachTestInputStream(ssc, Arrays.asList(trainingBatch, trainingBatch), 2);
0066 List<Tuple2<Integer, Vector>> testBatch = Arrays.asList(
0067 new Tuple2<>(10, Vectors.dense(1.0)),
0068 new Tuple2<>(11, Vectors.dense(0.0)));
0069 JavaPairDStream<Integer, Vector> test = JavaPairDStream.fromJavaDStream(
0070 attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2));
0071 StreamingKMeans skmeans = new StreamingKMeans()
0072 .setK(1)
0073 .setDecayFactor(1.0)
0074 .setInitialCenters(new Vector[]{Vectors.dense(1.0)}, new double[]{0.0});
0075 skmeans.trainOn(training);
0076 JavaPairDStream<Integer, Integer> prediction = skmeans.predictOnValues(test);
0077 attachTestOutputStream(prediction.count());
0078 runStreams(ssc, 2, 2);
0079 }
0080 }