/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ml.feature;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

import org.apache.spark.SharedSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

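/**
 * Java test suite for the spark.ml PCA feature transformer.
 */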
public class JavaPCASuite extends SharedSparkSession {

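  /** Serializable bean pairing an input feature vector with its expected PCA projection. */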
  public static class VectorPair implements Serializable {
    private Vector features = Vectors.dense(0.0);
    private Vector expected = Vectors.dense(0.0);

    public void setFeatures(Vector features) {
      this.features = features;
    }

    public Vector getFeatures() {
      return this.features;
    }

    public void setExpected(Vector expected) {
      this.expected = expected;
    }

    public Vector getExpected() {
      return this.expected;
    }
  }

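  /**
   * PCA should recover the projection obtained by multiplying the data by the
   * principal components computed with the RDD-based RowMatrix API.
   */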
  @Test
  public void testPCA() {
    List<Vector> points = Arrays.asList(
      Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0}),
      Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
      Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
    );
    JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);

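    // Build the reference projection with the RDD-based spark.mllib API: convert the
    // ml vectors to mllib vectors, form a RowMatrix, and project the rows onto its
    // top 3 principal components.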
    RowMatrix mat = new RowMatrix(dataRDD.map(
        (Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector)
    ).rdd());

    Matrix pc = mat.computePrincipalComponents(3);

    JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD()
        .map(org.apache.spark.mllib.linalg.Vector::asML);

    JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map(pair -> {
      VectorPair featuresExpected1 = new VectorPair();
      featuresExpected1.setFeatures(pair._1());
      featuresExpected1.setExpected(pair._2());
      return featuresExpected1;
    });

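    // Fit the DataFrame-based spark.ml PCA with k = 3 and check each projected vector
    // against the mllib reference, element-wise within 1e-8.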
    Dataset<Row> df = spark.createDataFrame(featuresExpected, VectorPair.class);
    PCAModel pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pca_features")
      .setK(3)
      .fit(df);
    List<Row> result = pca.transform(df).select("pca_features", "expected").toJavaRDD().collect();
    for (Row r : result) {
      Vector calculatedVector = (Vector) r.get(0);
      Vector expectedVector = (Vector) r.get(1);
      for (int i = 0; i < calculatedVector.size(); i++) {
        Assert.assertEquals(calculatedVector.apply(i), expectedVector.apply(i), 1.0e-8);
      }
    }
  }
}