#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
0018 """
0019 Pipeline Example.
0020 """
0021
0022
0023 from pyspark.ml import Pipeline
0024 from pyspark.ml.classification import LogisticRegression
0025 from pyspark.ml.feature import HashingTF, Tokenizer
0026
0027 from pyspark.sql import SparkSession
0028
if __name__ == "__main__":
    # Build (or reuse) a SparkSession as the entry point to DataFrame and ML APIs.
    spark = SparkSession\
        .builder\
        .appName("PipelineExample")\
        .getOrCreate()

    # Prepare training documents from a list of (id, text, label) tuples.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)
    ], ["id", "text", "label"])

    # Configure an ML pipeline of three stages: tokenizer -> hashingTF -> lr.
    # HashingTF consumes the tokenizer's output column so the stages chain cleanly.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.001)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # Fit the pipeline to the training documents; returns a PipelineModel.
    model = pipeline.fit(training)

    # Prepare test documents, which are unlabeled (id, text) tuples.
    test = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop")
    ], ["id", "text"])

    # Make predictions on the test documents and print the columns of interest.
    prediction = model.transform(test)
    selected = prediction.select("id", "text", "probability", "prediction")
    for row in selected.collect():
        # Fix: the loop previously unpacked into a variable named `prediction`,
        # shadowing the `prediction` DataFrame above; renamed to `pred`.
        rid, text, prob, pred = row
        print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

    spark.stop()