#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

0018 """
0019 Pipeline Example.
0020 """
0021 
# $example on$
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# $example off$
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PipelineExample")\
        .getOrCreate()
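    # getOrCreate() returns the active SparkSession if one already exists
    # (e.g. in a shell or notebook); otherwise it builds a new one with the
    # settings configured above.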

    # $example on$
    # Prepare training documents from a list of (id, text, label) tuples.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)
    ], ["id", "text", "label"])
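    # Note: the "label" column must be numeric (here DoubleType); documents
    # mentioning "spark" are labeled 1.0 and the rest 0.0.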

    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
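    # HashingTF maps each token to a feature index via the hashing trick;
    # the default vector size is 2^18 (262144) and can be changed with the
    # numFeatures parameter, e.g. HashingTF(..., numFeatures=1 << 20).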
    lr = LogisticRegression(maxIter=10, regParam=0.001)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
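    # A Pipeline is itself an Estimator: fit() runs the stages in order,
    # calling transform() on Transformer stages (tokenizer, hashingTF) and
    # fit() on Estimator stages (lr).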

    # Fit the pipeline to training documents.
    model = pipeline.fit(training)
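    # The result is a PipelineModel, a Transformer whose stages attribute
    # holds the fitted stages; e.g. model.stages[-1] is the fitted
    # LogisticRegressionModel, which exposes coefficients and intercept.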

    # Prepare test documents, which are unlabeled (id, text) tuples.
    test = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop")
    ], ["id", "text"])
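    # No "label" column is needed at prediction time; transform() appends
    # "rawPrediction", "probability", and "prediction" columns to the input.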

    # Make predictions on test documents and print columns of interest.
    predictions = model.transform(test)
    selected = predictions.select("id", "text", "probability", "prediction")
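    # "probability" is a vector of class probabilities ordered by label
    # index ([P(label=0.0), P(label=1.0)] here); "prediction" is the
    # predicted label as a double.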
    for row in selected.collect():
        rid, text, prob, prediction = row
        print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
    # $example off$

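    # The fitted pipeline can optionally be persisted and reloaded with the
    # standard ML persistence API (the path below is just an illustration):
    #
    #   from pyspark.ml import PipelineModel
    #   model.write().overwrite().save("/tmp/spark-lr-pipeline-model")
    #   sameModel = PipelineModel.load("/tmp/spark-lr-pipeline-model")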
    spark.stop()