0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.ml;
0019
0020
0021 import org.apache.spark.ml.Pipeline;
0022 import org.apache.spark.ml.PipelineModel;
0023 import org.apache.spark.ml.PipelineStage;
0024 import org.apache.spark.ml.evaluation.RegressionEvaluator;
0025 import org.apache.spark.ml.feature.MinMaxScaler;
0026 import org.apache.spark.ml.feature.MinMaxScalerModel;
0027 import org.apache.spark.ml.regression.FMRegressionModel;
0028 import org.apache.spark.ml.regression.FMRegressor;
0029 import org.apache.spark.sql.Dataset;
0030 import org.apache.spark.sql.Row;
0031 import org.apache.spark.sql.SparkSession;
0032
0033
0034 public class JavaFMRegressorExample {
0035 public static void main(String[] args) {
0036 SparkSession spark = SparkSession
0037 .builder()
0038 .appName("JavaFMRegressorExample")
0039 .getOrCreate();
0040
0041
0042
0043 Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
0044
0045
0046 MinMaxScalerModel featureScaler = new MinMaxScaler()
0047 .setInputCol("features")
0048 .setOutputCol("scaledFeatures")
0049 .fit(data);
0050
0051
0052 Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
0053 Dataset<Row> trainingData = splits[0];
0054 Dataset<Row> testData = splits[1];
0055
0056
0057 FMRegressor fm = new FMRegressor()
0058 .setLabelCol("label")
0059 .setFeaturesCol("scaledFeatures")
0060 .setStepSize(0.001);
0061
0062
0063 Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});
0064
0065
0066 PipelineModel model = pipeline.fit(trainingData);
0067
0068
0069 Dataset<Row> predictions = model.transform(testData);
0070
0071
0072 predictions.select("prediction", "label", "features").show(5);
0073
0074
0075 RegressionEvaluator evaluator = new RegressionEvaluator()
0076 .setLabelCol("label")
0077 .setPredictionCol("prediction")
0078 .setMetricName("rmse");
0079 double rmse = evaluator.evaluate(predictions);
0080 System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
0081
0082 FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
0083 System.out.println("Factors: " + fmModel.factors());
0084 System.out.println("Linear: " + fmModel.linear());
0085 System.out.println("Intercept: " + fmModel.intercept());
0086
0087
0088 spark.stop();
0089 }
0090 }