0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.ml.feature;
0019
0020 import java.io.Serializable;
0021 import java.util.Arrays;
0022 import java.util.List;
0023
0024 import org.junit.Assert;
0025 import org.junit.Test;
0026
0027 import org.apache.spark.SharedSparkSession;
0028 import org.apache.spark.api.java.JavaRDD;
0029 import org.apache.spark.ml.linalg.Vector;
0030 import org.apache.spark.ml.linalg.Vectors;
0031 import org.apache.spark.mllib.linalg.Matrix;
0032 import org.apache.spark.mllib.linalg.distributed.RowMatrix;
0033 import org.apache.spark.sql.Dataset;
0034 import org.apache.spark.sql.Row;
0035
0036 public class JavaPCASuite extends SharedSparkSession {
0037
0038 public static class VectorPair implements Serializable {
0039 private Vector features = Vectors.dense(0.0);
0040 private Vector expected = Vectors.dense(0.0);
0041
0042 public void setFeatures(Vector features) {
0043 this.features = features;
0044 }
0045
0046 public Vector getFeatures() {
0047 return this.features;
0048 }
0049
0050 public void setExpected(Vector expected) {
0051 this.expected = expected;
0052 }
0053
0054 public Vector getExpected() {
0055 return this.expected;
0056 }
0057 }
0058
0059 @Test
0060 public void testPCA() {
0061 List<Vector> points = Arrays.asList(
0062 Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0}),
0063 Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
0064 Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
0065 );
0066 JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);
0067
0068 RowMatrix mat = new RowMatrix(dataRDD.map(
0069 (Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector)
0070 ).rdd());
0071
0072 Matrix pc = mat.computePrincipalComponents(3);
0073
0074 mat.multiply(pc).rows().toJavaRDD();
0075
0076 JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD()
0077 .map(org.apache.spark.mllib.linalg.Vector::asML);
0078
0079 JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map(pair -> {
0080 VectorPair featuresExpected1 = new VectorPair();
0081 featuresExpected1.setFeatures(pair._1());
0082 featuresExpected1.setExpected(pair._2());
0083 return featuresExpected1;
0084 });
0085
0086 Dataset<Row> df = spark.createDataFrame(featuresExpected, VectorPair.class);
0087 PCAModel pca = new PCA()
0088 .setInputCol("features")
0089 .setOutputCol("pca_features")
0090 .setK(3)
0091 .fit(df);
0092 List<Row> result = pca.transform(df).select("pca_features", "expected").toJavaRDD().collect();
0093 for (Row r : result) {
0094 Vector calculatedVector = (Vector) r.get(0);
0095 Vector expectedVector = (Vector) r.get(1);
0096 for (int i = 0; i < calculatedVector.size(); i++) {
0097 Assert.assertEquals(calculatedVector.apply(i), expectedVector.apply(i), 1.0e-8);
0098 }
0099 }
0100 }
0101 }