0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
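"""
Streaming Linear Regression Example.

Trains a StreamingLinearRegressionWithSGD model on labeled points that arrive
as text files in <trainingDir>, and applies the model to labeled points that
arrive in <testDir>. Each input line is expected to look like
"(label,[f1,f2,f3])", i.e. a numeric label followed by three comma-separated
numeric features in square brackets.

Usage: streaming_linear_regression_example.py <trainingDir> <testDir>
"""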
0021 from __future__ import print_function
0022
0023
0024 import sys
0025
0026
0027 from pyspark import SparkContext
0028 from pyspark.streaming import StreamingContext
0029
0030 from pyspark.mllib.linalg import Vectors
0031 from pyspark.mllib.regression import LabeledPoint
0032 from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
0033
0034
def split_labeled_point(line):
    """Split one labeled-point text line into its label and feature values.

    The expected shape is ``"(label,[f1,f2,...])"``, e.g.
    ``"(1.0,[2.0,3.0,4.0])"``. The label is the text between ``(`` and the
    first ``,``. The features are the comma-separated text between ``[``
    and ``]``.

    :param line: one line of text from the input stream.
    :return: ``(label, features)`` as a float and a list of floats.
    :raises ValueError: if the label or any feature is not a number.
    """
    label = float(line[line.find('(') + 1: line.find(',')])
    feature_text = line[line.find('[') + 1: line.find(']')]
    features = [float(value) for value in feature_text.split(',')]
    return label, features


def parse(lp):
    """Convert one labeled-point text line into a ``LabeledPoint``."""
    label, features = split_labeled_point(lp)
    return LabeledPoint(label, Vectors.dense(features))


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: streaming_linear_regression_example.py <trainingDir> <testDir>",
              file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingLinearRegressionExample")
    # Batch interval of 1 second.
    ssc = StreamingContext(sc, 1)

    trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
    testData = ssc.textFileStream(sys.argv[2]).map(parse)

    numFeatures = 3
    model = StreamingLinearRegressionWithSGD()
    # One initial weight per feature; derived from numFeatures so the two
    # cannot drift apart.
    model.setInitialWeights([0.0] * numFeatures)

    model.trainOn(trainingData)
    # predictOnValues yields a DStream of (label, prediction) pairs. pprint()
    # registers an output operation that prints them for every batch. Calling
    # print() on the DStream would only show the object's repr, once.
    model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

    ssc.start()
    ssc.awaitTermination()
0062