OSCL-LXR source listing: Apache Spark example — streaming_k_means_example.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="StreamingKMeansExample")  # SparkContext
    ssc = StreamingContext(sc, 1)

    # $example on$
    # Build one input stream of plain vectors for training and a second
    # stream of labeled points for testing.
    def parse(lp):
        """Turn a text line "(label,[x1,x2,...])" into a LabeledPoint."""
        # The label is the text between the first '(' and the first ')'.
        label_str = lp[lp.find('(') + 1: lp.find(')')]
        # The features are a comma-separated list between '[' and ']'.
        features_str = lp[lp.find('[') + 1: lp.find(']')]
        return LabeledPoint(float(label_str), Vectors.dense(features_str.split(',')))

    training_lines = sc.textFile("data/mllib/kmeans_data.txt")
    training_vectors = training_lines.map(
        lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

    test_points = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)

    # Each RDD becomes one batch of the corresponding queue-backed DStream.
    training_queue = [training_vectors]
    test_queue = [test_points]

    training_stream = ssc.queueStream(training_queue)
    test_stream = ssc.queueStream(test_queue)

    # Start from random cluster centers: k=2 clusters over 3-dimensional
    # points (weight 1.0, seed 0).
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    # Register the streams for training and testing, start the job, and
    # print the predicted cluster assignment for each test point.
    model.trainOn(training_stream)

    predictions = model.predictOnValues(
        test_stream.map(lambda point: (point.label, point.features)))
    predictions.pprint()

    ssc.start()
    # Graceful stop: lets the queued batches finish before shutting down.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
    # $example off$

    print("Final centers: " + str(model.latestModel().centers))