0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 A logistic regression implementation that uses NumPy (http://www.numpy.org)
0020 to act on batches of input data using efficient matrix operations.
0021
0022 In practice, one may prefer to use the LogisticRegression algorithm in
0023 ML, as shown in examples/src/main/python/ml/logistic_regression_with_elastic_net.py.
0024 """
0025 from __future__ import print_function
0026
0027 import sys
0028
0029 import numpy as np
0030 from pyspark.sql import SparkSession
0031
0032
D = 10  # number of feature dimensions; each input line holds D + 1 numbers (label first, then D features)
0034
0035
0036
0037
0038
0039
def readPointBatch(iterator):
    """Convert one partition of input lines into a single NumPy matrix.

    Each line is expected to hold D + 1 numbers separated by commas
    and/or whitespace: the label in column 0 and the D features after
    it.  Packing a whole partition into one (n, D + 1) matrix lets the
    gradient step run as vectorized matrix operations instead of a
    per-point Python loop.

    Returns a one-element list so that mapPartitions yields exactly one
    matrix per partition.
    """
    strs = list(iterator)
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):
        # np.fromstring's text mode is deprecated (NumPy >= 1.14);
        # tokenize the line and convert the tokens explicitly instead.
        # Parsing via float32 matches the original's precision before
        # the values are widened into the float64 matrix row.
        matrix[i] = np.asarray(s.replace(',', ' ').split(), dtype=np.float32)
    return [matrix]
0046
if __name__ == "__main__":

    # Exactly two arguments are required: the input file and the
    # number of gradient-descent iterations.
    if len(sys.argv) != 3:
        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("""WARN: This is a naive implementation of Logistic Regression and is
given as an example!
Please refer to examples/src/main/python/ml/logistic_regression_with_elastic_net.py
to see how ML's implementation is used.""", file=sys.stderr)

    # Build (or reuse) the SparkSession driving this example.
    spark = (
        SparkSession
        .builder
        .appName("PythonLR")
        .getOrCreate()
    )

    # Each partition of the input file is packed into one (n, D + 1)
    # matrix and cached, since every iteration re-reads all points.
    points = (
        spark.read.text(sys.argv[1]).rdd
        .map(lambda r: r[0])
        .mapPartitions(readPointBatch)
        .cache()
    )
    iterations = int(sys.argv[2])

    # Start from a weight vector drawn uniformly at random from [-1, 1).
    w = 2 * np.random.ranf(size=D) - 1
    print("Initial w: " + str(w))
0071
def gradient(matrix, w):
    """Gradient of the logistic loss over one batch of points.

    Column 0 of ``matrix`` holds the labels and the remaining columns
    the features (the layout produced by readPointBatch).  Returns a
    vector with one component per feature dimension.
    """
    labels = matrix[:, 0]
    features = matrix[:, 1:]
    # (sigmoid(label * margin) - 1) * label, one coefficient per point.
    coeffs = (1.0 / (1.0 + np.exp(-labels * features.dot(w))) - 1.0) * labels
    # Scale each feature vector by its coefficient and sum over points.
    return (coeffs * features.T).sum(1)
0077
def add(x, y):
    """Combine two partial gradients by accumulating ``y`` into ``x``.

    The addition is done in place (augmented assignment), so reduce()
    reuses the left accumulator array instead of allocating a fresh one
    per merge; the mutated ``x`` is returned as the running total.
    """
    x += y
    return x
0081
# Batch gradient descent: each pass aggregates the full gradient over
# every cached point batch, then applies a unit-step update to w.
for step in range(iterations):
    print("On iteration %i" % (step + 1))
    # The lambda closes over the current w; reduce(add) folds the
    # per-partition gradient contributions together in place.
    full_gradient = points.map(lambda m: gradient(m, w)).reduce(add)
    w -= full_gradient

print("Final w: " + str(w))

spark.stop()