0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.examples.ml;
0019
0020
0021 import org.apache.spark.ml.Pipeline;
0022 import org.apache.spark.ml.PipelineModel;
0023 import org.apache.spark.ml.PipelineStage;
0024 import org.apache.spark.ml.classification.FMClassificationModel;
0025 import org.apache.spark.ml.classification.FMClassifier;
0026 import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
0027 import org.apache.spark.ml.feature.*;
0028 import org.apache.spark.sql.Dataset;
0029 import org.apache.spark.sql.Row;
0030 import org.apache.spark.sql.SparkSession;
0031
0032
0033 public class JavaFMClassifierExample {
0034 public static void main(String[] args) {
0035 SparkSession spark = SparkSession
0036 .builder()
0037 .appName("JavaFMClassifierExample")
0038 .getOrCreate();
0039
0040
0041
0042 Dataset<Row> data = spark
0043 .read()
0044 .format("libsvm")
0045 .load("data/mllib/sample_libsvm_data.txt");
0046
0047
0048
0049 StringIndexerModel labelIndexer = new StringIndexer()
0050 .setInputCol("label")
0051 .setOutputCol("indexedLabel")
0052 .fit(data);
0053
0054 MinMaxScalerModel featureScaler = new MinMaxScaler()
0055 .setInputCol("features")
0056 .setOutputCol("scaledFeatures")
0057 .fit(data);
0058
0059
0060 Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
0061 Dataset<Row> trainingData = splits[0];
0062 Dataset<Row> testData = splits[1];
0063
0064
0065 FMClassifier fm = new FMClassifier()
0066 .setLabelCol("indexedLabel")
0067 .setFeaturesCol("scaledFeatures")
0068 .setStepSize(0.001);
0069
0070
0071 IndexToString labelConverter = new IndexToString()
0072 .setInputCol("prediction")
0073 .setOutputCol("predictedLabel")
0074 .setLabels(labelIndexer.labelsArray()[0]);
0075
0076
0077 Pipeline pipeline = new Pipeline()
0078 .setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});
0079
0080
0081 PipelineModel model = pipeline.fit(trainingData);
0082
0083
0084 Dataset<Row> predictions = model.transform(testData);
0085
0086
0087 predictions.select("predictedLabel", "label", "features").show(5);
0088
0089
0090 MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
0091 .setLabelCol("indexedLabel")
0092 .setPredictionCol("prediction")
0093 .setMetricName("accuracy");
0094 double accuracy = evaluator.evaluate(predictions);
0095 System.out.println("Test Accuracy = " + accuracy);
0096
0097 FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
0098 System.out.println("Factors: " + fmModel.factors());
0099 System.out.println("Linear: " + fmModel.linear());
0100 System.out.println("Intercept: " + fmModel.intercept());
0101
0102
0103 spark.stop();
0104 }
0105 }