Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 A logistic regression implementation that uses NumPy (http://www.numpy.org)
0020 to act on batches of input data using efficient matrix operations.
0021 
0022 In practice, one may prefer to use the LogisticRegression algorithm in
0023 ML, as shown in examples/src/main/python/ml/logistic_regression_with_elastic_net.py.
0024 """
0025 from __future__ import print_function
0026 
0027 import sys
0028 
0029 import numpy as np
0030 from pyspark.sql import SparkSession
0031 
0032 
# Dimensionality of each feature vector; every input line holds a label
# followed by D feature values.
D = 10  # Number of dimensions


# Read a batch of points from the input file into a NumPy matrix object. We
# operate on batches to make further computations faster. Each matrix row is
# one input line: column 0 is the label, columns 1..D are the features, and
# gradient() pulls column 0 apart from the rest.
def readPointBatch(iterator):
    """Parse a partition of text lines into a single (numLines, D + 1) matrix.

    Each line contains D + 1 numbers separated by spaces and/or commas:
    the label first, then the D feature values. Returning the whole
    partition as one matrix lets gradient() use vectorized NumPy ops.
    """
    strs = list(iterator)
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):
        # Parse at float64: the previous float32 parse silently discarded
        # precision when the row was stored into this float64 matrix.
        # Splitting + np.array also avoids the deprecated text mode of
        # np.fromstring.
        matrix[i] = np.array(s.replace(',', ' ').split(), dtype=np.float64)
    return [matrix]
0046 
if __name__ == "__main__":

    # Exactly two arguments are required: the input path and iteration count.
    if len(sys.argv) != 3:
        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("""WARN: This is a naive implementation of Logistic Regression and is
      given as an example!
      Please refer to examples/src/main/python/ml/logistic_regression_with_elastic_net.py
      to see how ML's implementation is used.""", file=sys.stderr)

    spark = (SparkSession
             .builder
             .appName("PythonLR")
             .getOrCreate())

    iterations = int(sys.argv[2])

    # Each partition of the input file becomes one NumPy matrix; cache the
    # batched RDD because it is re-scanned on every iteration.
    points = (spark.read.text(sys.argv[1]).rdd
              .map(lambda r: r[0])
              .mapPartitions(readPointBatch)
              .cache())

    # Start from a uniform random weight vector with entries in [-1, 1).
    w = 2 * np.random.ranf(size=D) - 1
    print("Initial w: " + str(w))
def gradient(matrix, w):
    """Return the summed logistic-loss gradient over one batch of points.

    Column 0 of `matrix` holds the point labels; the remaining columns
    hold the feature vectors. `w` is the current weight vector, with one
    entry per feature column.
    """
    labels = matrix[:, 0]     # point labels (first column of input file)
    features = matrix[:, 1:]  # point coordinates
    # Per-point scalar multiplier: (sigmoid(y * x.w) - 1) * y
    multipliers = (1.0 / (1.0 + np.exp(-labels * features.dot(w))) - 1.0) * labels
    # Scale each point's feature vector by its multiplier, then sum over points.
    return (multipliers * features.T).sum(1)
0077 
def add(x, y):
    """Reduce operator: accumulate gradient array `y` into `x` in place.

    Mutating `x` avoids allocating a fresh array at every reduction step.
    """
    np.add(x, y, out=x)
    return x
0081 
for iteration in range(iterations):
    print("On iteration %i" % (iteration + 1))
    # Sum the per-partition gradients across the cluster, then take a
    # full (unit learning rate) gradient step.
    total_gradient = points.map(lambda m: gradient(m, w)).reduce(add)
    w -= total_gradient

print("Final w: " + str(w))

spark.stop()