0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.ml;
0019
0020 import java.io.IOException;
0021
0022 import org.junit.Test;
0023
0024 import org.apache.spark.SharedSparkSession;
0025 import org.apache.spark.api.java.JavaRDD;
0026 import org.apache.spark.ml.classification.LogisticRegression;
0027 import static org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInputAsList;
0028 import org.apache.spark.ml.feature.LabeledPoint;
0029 import org.apache.spark.ml.feature.StandardScaler;
0030 import org.apache.spark.sql.Dataset;
0031 import org.apache.spark.sql.Row;
0032
0033
0034
0035
0036 public class JavaPipelineSuite extends SharedSparkSession {
0037
0038 private transient Dataset<Row> dataset;
0039
0040 @Override
0041 public void setUp() throws IOException {
0042 super.setUp();
0043 JavaRDD<LabeledPoint> points =
0044 jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
0045 dataset = spark.createDataFrame(points, LabeledPoint.class);
0046 }
0047
0048 @Test
0049 public void pipeline() {
0050 StandardScaler scaler = new StandardScaler()
0051 .setInputCol("features")
0052 .setOutputCol("scaledFeatures");
0053 LogisticRegression lr = new LogisticRegression()
0054 .setFeaturesCol("scaledFeatures");
0055 Pipeline pipeline = new Pipeline()
0056 .setStages(new PipelineStage[]{scaler, lr});
0057 PipelineModel model = pipeline.fit(dataset);
0058 model.transform(dataset).createOrReplaceTempView("prediction");
0059 Dataset<Row> predictions = spark.sql("SELECT label, probability, prediction FROM prediction");
0060 predictions.collectAsList();
0061 }
0062 }