Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 import numpy as np
0020 
# Python 3 removed ``xrange`` and ``basestring``; alias them so the rest of
# this module can keep using the Python 2 names on either major version.
# Compare the numeric major version rather than the ``sys.version`` string,
# whose lexicographic ordering is not a reliable version comparison.
if sys.version_info[0] >= 3:
    xrange = range
    basestring = str
0024 
0025 from pyspark import SparkContext, since
0026 from pyspark.mllib.common import callMLlibFunc, inherit_doc
0027 from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
0028 from pyspark.sql import DataFrame
0029 
0030 
class MLUtils(object):

    """
    Helper methods to load, save and pre-process data used in MLlib.

    .. versionadded:: 1.0.0
    """

    @staticmethod
    def _parse_libsvm_line(line):
        """
        Parses a line in LIBSVM format into (label, indices, values).

        :param line: one record of the form
                     ``label index1:value1 index2:value2 ...`` with
                     one-based indices; it must contain at least the
                     label
        :return: a tuple ``(label, indices, values)`` where ``label`` is
                 a float, ``indices`` is an int32 NumPy array of
                 zero-based feature indices and ``values`` is a NumPy
                 array with the matching feature values
        """
        items = line.split(None)
        label = float(items[0])
        features = items[1:]
        indices = np.zeros(len(features), dtype=np.int32)
        values = np.zeros(len(features))
        for i, feature in enumerate(features):
            index, value = feature.split(":")
            # LIBSVM indices are one-based; shift them to zero-based.
            indices[i] = int(index) - 1
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """
        Converts a LabeledPoint to a string in LIBSVM format.

        Sparse feature vectors emit only their stored entries, dense
        ones emit every entry. Indices are written one-based.

        :param p: the LabeledPoint to convert
        :return: a string ``label index1:value1 index2:value2 ...``
        :raises TypeError: if `p` is not a LabeledPoint
        """
        from pyspark.mllib.regression import LabeledPoint
        # Raise explicitly instead of using `assert`, which is stripped
        # when Python runs with -O.
        if not isinstance(p, LabeledPoint):
            raise TypeError("Expected a LabeledPoint but got {}.".format(type(p)))
        items = [str(p.label)]
        v = _convert_to_vector(p.features)
        if isinstance(v, SparseVector):
            entries = zip(v.indices, v.values)
        else:
            entries = enumerate(v.toArray())
        items.extend(str(index + 1) + ":" + str(value) for index, value in entries)
        return " ".join(items)

    @staticmethod
    def _check_is_dataframe(dataset):
        """
        Raises a TypeError unless `dataset` is a DataFrame.

        Shared input check of the `convert*Columns*ML` methods.
        """
        if not isinstance(dataset, DataFrame):
            raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset)))

    @staticmethod
    @since("1.0.0")
    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None):
        """
        Loads labeled data in the LIBSVM format into an RDD of
        LabeledPoint. The LIBSVM format is a text-based format used by
        LIBSVM and LIBLINEAR. Each line represents a labeled sparse
        feature vector using the following format:

        label index1:value1 index2:value2 ...

        where the indices are one-based and in ascending order. This
        method parses each line into a LabeledPoint, where the feature
        indices are converted to zero-based. Leading and trailing
        whitespace is ignored, and blank lines as well as lines
        starting with ``#`` are skipped.

        :param sc: Spark context
        :param path: file or directory path in any Hadoop-supported file
                     system URI
        :param numFeatures: number of features, which will be determined
                            from the input data if a nonpositive value
                            is given. This is useful when the dataset is
                            already split into multiple files and you
                            want to load them separately, because some
                            features may not present in certain files,
                            which leads to inconsistent feature
                            dimensions.
        :param minPartitions: min number of partitions
        :return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> _ = tempFile.write(b"+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
        >>> tempFile.flush()
        >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
        >>> tempFile.close()
        >>> examples[0]
        LabeledPoint(1.0, (6,[0,2,4],[1.0,2.0,3.0]))
        >>> examples[1]
        LabeledPoint(-1.0, (6,[],[]))
        >>> examples[2]
        LabeledPoint(-1.0, (6,[1,3,5],[4.0,5.0,6.0]))
        """
        from pyspark.mllib.regression import LabeledPoint

        lines = sc.textFile(path, minPartitions)
        # Blank lines and '#' comment lines carry no record; passing them
        # to the parser would fail on the missing or non-numeric label.
        records = lines.map(lambda line: line.strip()).filter(
            lambda line: len(line) > 0 and not line.startswith("#"))
        parsed = records.map(lambda line: MLUtils._parse_libsvm_line(line))
        if numFeatures <= 0:
            # The parsed RDD is traversed twice (once here, once by the
            # caller), so cache it before scanning for the feature count.
            parsed.cache()
            # Indices are expected in ascending order, so the last index
            # of a record is its largest; records without features yield
            # -1 so that they never win the max.
            numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    @since("1.0.0")
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        :param data: an RDD of LabeledPoint to be saved
        :param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
        ...             LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)

    @staticmethod
    @since("1.1.0")
    def loadLabeledPoints(sc, path, minPartitions=None):
        """
        Load labeled points saved using RDD.saveAsTextFile.

        :param sc: Spark context
        :param path: file or directory path in any Hadoop-supported file
                     system URI
        :param minPartitions: min number of partitions; when not given,
                              ``min(sc.defaultParallelism, 2)`` is used
        :return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> from pyspark.mllib.regression import LabeledPoint
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])),
        ...             LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
        >>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
        [LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
        """
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)

    @staticmethod
    @since("1.5.0")
    def appendBias(data):
        """
        Returns a new vector with `1.0` (bias) appended to
        the end of the input vector.

        :param data: the input vector, or anything accepted by
                     `_convert_to_vector`
        :return: a SparseVector if the converted input is sparse,
                 otherwise the result of converting the extended array
        """
        vec = _convert_to_vector(data)
        if isinstance(vec, SparseVector):
            # The bias occupies the new last position, whose index is the
            # old length.
            newIndices = np.append(vec.indices, len(vec))
            newValues = np.append(vec.values, 1.0)
            return SparseVector(len(vec) + 1, newIndices, newValues)
        else:
            return _convert_to_vector(np.append(vec.toArray(), 1.0))

    @staticmethod
    @since("1.5.0")
    def loadVectors(sc, path):
        """
        Loads vectors saved using `RDD[Vector].saveAsTextFile`
        with the default number of partitions.

        :param sc: Spark context
        :param path: path the vectors were saved to
        """
        return callMLlibFunc("loadVectors", sc, path)

    @staticmethod
    @since("2.0.0")
    def convertVectorColumnsToML(dataset, *cols):
        """
        Converts vector columns in an input DataFrame from the
        :py:class:`pyspark.mllib.linalg.Vector` type to the new
        :py:class:`pyspark.ml.linalg.Vector` type under the `spark.ml`
        package.

        :param dataset:
          input dataset
        :param cols:
          a list of vector columns to be converted.
          New vector columns will be ignored. If unspecified, all old
          vector columns will be converted except nested ones.
        :return:
          the input dataset with old vector columns converted to the
          new vector type
        :raises TypeError: if `dataset` is not a DataFrame

        >>> import pyspark
        >>> from pyspark.mllib.linalg import Vectors
        >>> from pyspark.mllib.util import MLUtils
        >>> df = spark.createDataFrame(
        ...     [(0, Vectors.sparse(2, [1], [1.0]), Vectors.dense(2.0, 3.0))],
        ...     ["id", "x", "y"])
        >>> r1 = MLUtils.convertVectorColumnsToML(df).first()
        >>> isinstance(r1.x, pyspark.ml.linalg.SparseVector)
        True
        >>> isinstance(r1.y, pyspark.ml.linalg.DenseVector)
        True
        >>> r2 = MLUtils.convertVectorColumnsToML(df, "x").first()
        >>> isinstance(r2.x, pyspark.ml.linalg.SparseVector)
        True
        >>> isinstance(r2.y, pyspark.mllib.linalg.DenseVector)
        True
        """
        MLUtils._check_is_dataframe(dataset)
        return callMLlibFunc("convertVectorColumnsToML", dataset, list(cols))

    @staticmethod
    @since("2.0.0")
    def convertVectorColumnsFromML(dataset, *cols):
        """
        Converts vector columns in an input DataFrame to the
        :py:class:`pyspark.mllib.linalg.Vector` type from the new
        :py:class:`pyspark.ml.linalg.Vector` type under the `spark.ml`
        package.

        :param dataset:
          input dataset
        :param cols:
          a list of vector columns to be converted.
          Old vector columns will be ignored. If unspecified, all new
          vector columns will be converted except nested ones.
        :return:
          the input dataset with new vector columns converted to the
          old vector type
        :raises TypeError: if `dataset` is not a DataFrame

        >>> import pyspark
        >>> from pyspark.ml.linalg import Vectors
        >>> from pyspark.mllib.util import MLUtils
        >>> df = spark.createDataFrame(
        ...     [(0, Vectors.sparse(2, [1], [1.0]), Vectors.dense(2.0, 3.0))],
        ...     ["id", "x", "y"])
        >>> r1 = MLUtils.convertVectorColumnsFromML(df).first()
        >>> isinstance(r1.x, pyspark.mllib.linalg.SparseVector)
        True
        >>> isinstance(r1.y, pyspark.mllib.linalg.DenseVector)
        True
        >>> r2 = MLUtils.convertVectorColumnsFromML(df, "x").first()
        >>> isinstance(r2.x, pyspark.mllib.linalg.SparseVector)
        True
        >>> isinstance(r2.y, pyspark.ml.linalg.DenseVector)
        True
        """
        MLUtils._check_is_dataframe(dataset)
        return callMLlibFunc("convertVectorColumnsFromML", dataset, list(cols))

    @staticmethod
    @since("2.0.0")
    def convertMatrixColumnsToML(dataset, *cols):
        """
        Converts matrix columns in an input DataFrame from the
        :py:class:`pyspark.mllib.linalg.Matrix` type to the new
        :py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml`
        package.

        :param dataset:
          input dataset
        :param cols:
          a list of matrix columns to be converted.
          New matrix columns will be ignored. If unspecified, all old
          matrix columns will be converted except nested ones.
        :return:
          the input dataset with old matrix columns converted to the
          new matrix type
        :raises TypeError: if `dataset` is not a DataFrame

        >>> import pyspark
        >>> from pyspark.mllib.linalg import Matrices
        >>> from pyspark.mllib.util import MLUtils
        >>> df = spark.createDataFrame(
        ...     [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]),
        ...     Matrices.dense(2, 2, range(4)))], ["id", "x", "y"])
        >>> r1 = MLUtils.convertMatrixColumnsToML(df).first()
        >>> isinstance(r1.x, pyspark.ml.linalg.SparseMatrix)
        True
        >>> isinstance(r1.y, pyspark.ml.linalg.DenseMatrix)
        True
        >>> r2 = MLUtils.convertMatrixColumnsToML(df, "x").first()
        >>> isinstance(r2.x, pyspark.ml.linalg.SparseMatrix)
        True
        >>> isinstance(r2.y, pyspark.mllib.linalg.DenseMatrix)
        True
        """
        MLUtils._check_is_dataframe(dataset)
        return callMLlibFunc("convertMatrixColumnsToML", dataset, list(cols))

    @staticmethod
    @since("2.0.0")
    def convertMatrixColumnsFromML(dataset, *cols):
        """
        Converts matrix columns in an input DataFrame to the
        :py:class:`pyspark.mllib.linalg.Matrix` type from the new
        :py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml`
        package.

        :param dataset:
          input dataset
        :param cols:
          a list of matrix columns to be converted.
          Old matrix columns will be ignored. If unspecified, all new
          matrix columns will be converted except nested ones.
        :return:
          the input dataset with new matrix columns converted to the
          old matrix type
        :raises TypeError: if `dataset` is not a DataFrame

        >>> import pyspark
        >>> from pyspark.ml.linalg import Matrices
        >>> from pyspark.mllib.util import MLUtils
        >>> df = spark.createDataFrame(
        ...     [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]),
        ...     Matrices.dense(2, 2, range(4)))], ["id", "x", "y"])
        >>> r1 = MLUtils.convertMatrixColumnsFromML(df).first()
        >>> isinstance(r1.x, pyspark.mllib.linalg.SparseMatrix)
        True
        >>> isinstance(r1.y, pyspark.mllib.linalg.DenseMatrix)
        True
        >>> r2 = MLUtils.convertMatrixColumnsFromML(df, "x").first()
        >>> isinstance(r2.x, pyspark.mllib.linalg.SparseMatrix)
        True
        >>> isinstance(r2.y, pyspark.ml.linalg.DenseMatrix)
        True
        """
        MLUtils._check_is_dataframe(dataset)
        return callMLlibFunc("convertMatrixColumnsFromML", dataset, list(cols))
0358 
0359 
class Saveable(object):
    """
    Mixin for models and transformers that can be persisted as files.

    .. versionadded:: 1.3.0
    """

    def save(self, sc, path):
        """
        Persist this model under the given path.

        Two things are written:
         * the model metadata as human-readable JSON, in path/metadata/
         * the model data in Parquet format, in path/data/

        A saved model can be restored with :py:meth:`Loader.load`.

        This base implementation only raises NotImplementedError.

        :param sc: Spark context used to save model data.
        :param path: Directory in which this model is saved. An
                     exception is thrown if the directory already
                     exists.
        """
        raise NotImplementedError()
0383 
0384 
@inherit_doc
class JavaSaveable(Saveable):
    """
    Mixin for models whose save() is delegated to their Scala
    implementation.

    .. versionadded:: 1.3.0
    """

    @since("1.3.0")
    def save(self, sc, path):
        """
        Save this model to the given path.

        The arguments are type-checked, then the call is forwarded to
        the `save` method of `self._java_model`.

        :param sc: the SparkContext to save with
        :param path: the target path, given as a string
        :raises TypeError: if `sc` is not a SparkContext or `path` is
                           not a string
        """
        if not isinstance(sc, SparkContext):
            message = "sc should be a SparkContext, got type %s" % type(sc)
            raise TypeError(message)
        if not isinstance(path, basestring):
            message = "path should be a basestring, got type %s" % type(path)
            raise TypeError(message)
        java_model = self._java_model
        java_model.save(sc._jsc.sc(), path)
0402 
0403 
class Loader(object):
    """
    Mixin for classes that can restore saved models from files.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def load(cls, sc, path):
        """
        Restore a model from the given path. The model is expected to
        have been written with :py:meth:`Saveable.save`.

        This base implementation only raises NotImplementedError.

        :param sc: Spark context used for loading model files.
        :param path: Directory to which the model was saved.
        :return: model instance
        """
        raise NotImplementedError()
0423 
0424 
@inherit_doc
class JavaLoader(Loader):
    """
    Mixin for classes that restore saved models through their Scala
    implementation.

    .. versionadded:: 1.3.0
    """

    @classmethod
    def _java_loader_class(cls):
        """
        Returns the fully qualified name of the Java loader class.

        By default this is the Python module path with "pyspark"
        replaced by "org.apache.spark", followed by the Python class
        name.
        """
        python_module = cls.__module__
        java_package = python_module.replace("pyspark", "org.apache.spark")
        return java_package + "." + cls.__name__

    @classmethod
    def _load_java(cls, sc, path):
        """
        Loads the Java model stored at the given path.

        The loader is found by walking the dotted class name one
        attribute at a time, starting from `sc._jvm`.
        """
        name_parts = cls._java_loader_class().split(".")
        java_loader = sc._jvm
        for part in name_parts:
            java_loader = getattr(java_loader, part)
        return java_loader.load(sc._jsc.sc(), path)

    @classmethod
    @since("1.3.0")
    def load(cls, sc, path):
        """Load a model from the given path."""
        return cls(cls._load_java(sc, path))
0461 
0462 
class LinearDataGenerator(object):
    """Utilities for generating linear data.

    .. versionadded:: 1.5.0
    """

    @staticmethod
    @since("1.5.0")
    def generateLinearInput(intercept, weights, xMean, xVariance,
                            nPoints, seed, eps):
        """
        Generates labeled points for a linear model X'w + c.

        :param intercept: bias factor, the term c in X'w + c
        :param weights: feature vector, the term w in X'w + c
        :param xMean: point around which the data X is centered
        :param xVariance: variance of the given data
        :param nPoints: number of points to be generated
        :param seed: random seed
        :param eps: used to scale the noise; the higher eps is set,
                    the more gaussian noise is added
        :return: a list of LabeledPoints of length nPoints
        """
        # Coerce every argument to a plain Python number before it is
        # handed to the MLlib wrapper.
        weight_list = list(map(float, weights))
        mean_list = list(map(float, xMean))
        variance_list = list(map(float, xVariance))
        points = callMLlibFunc(
            "generateLinearInputWrapper", float(intercept), weight_list,
            mean_list, variance_list, int(nPoints), int(seed), float(eps))
        return list(points)

    @staticmethod
    @since("1.5.0")
    def generateLinearRDD(sc, nexamples, nfeatures, eps,
                          nParts=2, intercept=0.0):
        """
        Generate an RDD of LabeledPoints.

        `nexamples`, `nfeatures` and `nParts` are converted to int,
        `eps` and `intercept` to float, before being passed on together
        with `sc`.
        """
        example_count = int(nexamples)
        feature_count = int(nfeatures)
        return callMLlibFunc(
            "generateLinearRDDWrapper", sc, example_count, feature_count,
            float(eps), int(nParts), float(intercept))
0502 
0503 
def _test():
    """
    Runs this module's doctests against a local SparkSession.

    The session is exposed to the doctests as `spark` and its
    SparkContext as `sc`. The interpreter exits with status -1 when at
    least one doctest fails.
    """
    import doctest
    from pyspark.sql import SparkSession
    globs = globals().copy()
    spark = SparkSession.builder\
        .master("local[2]")\
        .appName("mllib.util tests")\
        .getOrCreate()
    try:
        globs['spark'] = spark
        globs['sc'] = spark.sparkContext
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Stop the session even when the doctest run itself raises.
        spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()