0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.mllib;
0019
0020 import org.apache.spark.SparkConf;
0021 import org.apache.spark.api.java.JavaSparkContext;
0022
0023
0024 import scala.Tuple2;
0025
0026 import org.apache.spark.api.java.JavaPairRDD;
0027 import org.apache.spark.api.java.JavaRDD;
0028 import org.apache.spark.mllib.clustering.DistributedLDAModel;
0029 import org.apache.spark.mllib.clustering.LDA;
0030 import org.apache.spark.mllib.clustering.LDAModel;
0031 import org.apache.spark.mllib.linalg.Matrix;
0032 import org.apache.spark.mllib.linalg.Vector;
0033 import org.apache.spark.mllib.linalg.Vectors;
0034
0035
0036 public class JavaLatentDirichletAllocationExample {
0037 public static void main(String[] args) {
0038
0039 SparkConf conf = new SparkConf().setAppName("JavaKLatentDirichletAllocationExample");
0040 JavaSparkContext jsc = new JavaSparkContext(conf);
0041
0042
0043
0044 String path = "data/mllib/sample_lda_data.txt";
0045 JavaRDD<String> data = jsc.textFile(path);
0046 JavaRDD<Vector> parsedData = data.map(s -> {
0047 String[] sarray = s.trim().split(" ");
0048 double[] values = new double[sarray.length];
0049 for (int i = 0; i < sarray.length; i++) {
0050 values[i] = Double.parseDouble(sarray[i]);
0051 }
0052 return Vectors.dense(values);
0053 });
0054
0055 JavaPairRDD<Long, Vector> corpus =
0056 JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
0057 corpus.cache();
0058
0059
0060 LDAModel ldaModel = new LDA().setK(3).run(corpus);
0061
0062
0063 System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
0064 + " words):");
0065 Matrix topics = ldaModel.topicsMatrix();
0066 for (int topic = 0; topic < 3; topic++) {
0067 System.out.print("Topic " + topic + ":");
0068 for (int word = 0; word < ldaModel.vocabSize(); word++) {
0069 System.out.print(" " + topics.apply(word, topic));
0070 }
0071 System.out.println();
0072 }
0073
0074 ldaModel.save(jsc.sc(),
0075 "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
0076 DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
0077 "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
0078
0079
0080 jsc.stop();
0081 }
0082 }