/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ml;

import java.io.IOException;

import org.junit.Test;

import org.apache.spark.SharedSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.classification.LogisticRegression;
import static org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInputAsList;
import org.apache.spark.ml.feature.LabeledPoint;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Test Pipeline construction and fitting in Java.
 */
public class JavaPipelineSuite extends SharedSparkSession {

  private transient Dataset<Row> dataset;

  @Override
  public void setUp() throws IOException {
    super.setUp();
    // Generate 100 synthetic logistic-regression points (seed 42) across 2 partitions.
    JavaRDD<LabeledPoint> points =
      jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
    dataset = spark.createDataFrame(points, LabeledPoint.class);
  }

  @Test
  public void pipeline() {
    // Stage 1: standardize the raw features.
    StandardScaler scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures");
    // Stage 2: fit logistic regression on the scaled features.
    LogisticRegression lr = new LogisticRegression()
      .setFeaturesCol("scaledFeatures");
    Pipeline pipeline = new Pipeline()
      .setStages(new PipelineStage[]{scaler, lr});
    PipelineModel model = pipeline.fit(dataset);
    // Transform the training data and check that predictions can be queried via SQL.
    model.transform(dataset).createOrReplaceTempView("prediction");
    Dataset<Row> predictions = spark.sql("SELECT label, probability, prediction FROM prediction");
    predictions.collectAsList();
  }
}
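
For reference, the same two-stage pipeline can be run outside the JUnit harness. The sketch below is a minimal standalone variant, not part of the suite: the PipelineExample class name, the local[2] master, and the hand-written data points are assumptions made for illustration, standing in for SharedSparkSession and the test-only generateLogisticInputAsList helper.

// Illustrative sketch only; names and data below are assumptions, not part of the suite.
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.LabeledPoint;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PipelineExample {
  public static void main(String[] args) {
    // Local session for illustration; the suite above gets one from SharedSparkSession.
    SparkSession spark = SparkSession.builder()
      .master("local[2]")
      .appName("PipelineExample")
      .getOrCreate();

    // A few hand-written points stand in for the test-only data generator.
    List<LabeledPoint> points = Arrays.asList(
      new LabeledPoint(1.0, Vectors.dense(2.0, 1.5)),
      new LabeledPoint(0.0, Vectors.dense(-1.0, -0.5)),
      new LabeledPoint(1.0, Vectors.dense(1.2, 0.8)),
      new LabeledPoint(0.0, Vectors.dense(-0.7, -1.1)));
    Dataset<Row> dataset = spark.createDataFrame(points, LabeledPoint.class);

    // Same two stages as the test: standardize features, then fit logistic regression on them.
    StandardScaler scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures");
    LogisticRegression lr = new LogisticRegression()
      .setFeaturesCol("scaledFeatures");
    Pipeline pipeline = new Pipeline()
      .setStages(new PipelineStage[]{scaler, lr});

    PipelineModel model = pipeline.fit(dataset);
    model.transform(dataset).select("label", "probability", "prediction").show();

    spark.stop();
  }
}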