0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.mllib;
0019
0020 import org.apache.spark.SparkConf;
0021 import org.apache.spark.api.java.JavaSparkContext;
0022
0023
0024 import org.apache.spark.api.java.JavaRDD;
0025 import org.apache.spark.mllib.clustering.KMeans;
0026 import org.apache.spark.mllib.clustering.KMeansModel;
0027 import org.apache.spark.mllib.linalg.Vector;
0028 import org.apache.spark.mllib.linalg.Vectors;
0029
0030
0031 public class JavaKMeansExample {
0032 public static void main(String[] args) {
0033
0034 SparkConf conf = new SparkConf().setAppName("JavaKMeansExample");
0035 JavaSparkContext jsc = new JavaSparkContext(conf);
0036
0037
0038
0039 String path = "data/mllib/kmeans_data.txt";
0040 JavaRDD<String> data = jsc.textFile(path);
0041 JavaRDD<Vector> parsedData = data.map(s -> {
0042 String[] sarray = s.split(" ");
0043 double[] values = new double[sarray.length];
0044 for (int i = 0; i < sarray.length; i++) {
0045 values[i] = Double.parseDouble(sarray[i]);
0046 }
0047 return Vectors.dense(values);
0048 });
0049 parsedData.cache();
0050
0051
0052 int numClusters = 2;
0053 int numIterations = 20;
0054 KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
0055
0056 System.out.println("Cluster centers:");
0057 for (Vector center: clusters.clusterCenters()) {
0058 System.out.println(" " + center);
0059 }
0060 double cost = clusters.computeCost(parsedData.rdd());
0061 System.out.println("Cost: " + cost);
0062
0063
0064 double WSSSE = clusters.computeCost(parsedData.rdd());
0065 System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
0066
0067
0068 clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
0069 KMeansModel sameModel = KMeansModel.load(jsc.sc(),
0070 "target/org/apache/spark/JavaKMeansExample/KMeansModel");
0071
0072
0073 jsc.stop();
0074 }
0075 }