0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from __future__ import print_function
0019
0020 from pyspark import SparkContext
0021 from pyspark.streaming import StreamingContext
0022
0023 from pyspark.mllib.linalg import Vectors
0024 from pyspark.mllib.regression import LabeledPoint
0025 from pyspark.mllib.clustering import StreamingKMeans
0026
0027
if __name__ == "__main__":
    # Driver for the streaming k-means example: trains on a queued batch of
    # unlabeled vectors, then predicts cluster ids for labeled test points.
    sc = SparkContext(appName="StreamingKMeansExample")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    def parse(lp):
        """Parse a test line like "(label, [x,y,z])" into a LabeledPoint."""
        # The label sits between the first '(' and ')'; the feature values
        # sit between '[' and ']' as a comma-separated list.
        label_text = lp[lp.find('(') + 1: lp.find(')')]
        feature_text = lp[lp.find('[') + 1: lp.find(']')]
        return LabeledPoint(float(label_text), Vectors.dense(feature_text.split(',')))

    def to_vector(line):
        """Turn a space-separated line of numbers into a dense feature vector."""
        return Vectors.dense([float(token) for token in line.strip().split(' ')])

    trainingData = sc.textFile("data/mllib/kmeans_data.txt").map(to_vector)
    testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)

    # Feed each RDD as a single queued batch into its DStream.
    trainingStream = ssc.queueStream([trainingData])
    testingStream = ssc.queueStream([testingData])

    # Two clusters in 3-dimensional space; centers initialized randomly
    # (scale 1.0, seed 0). decayFactor=1.0 weights all batches equally.
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    model.trainOn(trainingStream)

    # Key each test point by its label so predictions stay paired with it.
    result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
    result.pprint()

    ssc.start()
    # Graceful stop lets the already-queued batches finish processing first.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

    print("Final centers: " + str(model.latestModel().centers))