Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 import array as pyarray
0020 import warnings
0021 
0022 if sys.version > '3':
0023     xrange = range
0024     basestring = str
0025 
0026 from math import exp, log
0027 
0028 from numpy import array, random, tile
0029 
0030 from collections import namedtuple
0031 
0032 from pyspark import SparkContext, since
0033 from pyspark.rdd import RDD, ignore_unicode_prefix
0034 from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
0035 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
0036 from pyspark.mllib.stat.distribution import MultivariateGaussian
0037 from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
0038 from pyspark.streaming import DStream
0039 
0040 __all__ = ['BisectingKMeansModel', 'BisectingKMeans', 'KMeansModel', 'KMeans',
0041            'GaussianMixtureModel', 'GaussianMixture', 'PowerIterationClusteringModel',
0042            'PowerIterationClustering', 'StreamingKMeans', 'StreamingKMeansModel',
0043            'LDA', 'LDAModel']
0044 
0045 
0046 @inherit_doc
0047 class BisectingKMeansModel(JavaModelWrapper):
0048     """
0049     A clustering model derived from the bisecting k-means method.
0050 
0051     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
0052     >>> bskm = BisectingKMeans()
0053     >>> model = bskm.train(sc.parallelize(data, 2), k=4)
0054     >>> p = array([0.0, 0.0])
0055     >>> model.predict(p)
0056     0
0057     >>> model.k
0058     4
0059     >>> model.computeCost(p)
0060     0.0
0061 
0062     .. versionadded:: 2.0.0
0063     """
0064 
0065     def __init__(self, java_model):
0066         super(BisectingKMeansModel, self).__init__(java_model)
0067         self.centers = [c.toArray() for c in self.call("clusterCenters")]
0068 
0069     @property
0070     @since('2.0.0')
0071     def clusterCenters(self):
0072         """Get the cluster centers, represented as a list of NumPy
0073         arrays."""
0074         return self.centers
0075 
0076     @property
0077     @since('2.0.0')
0078     def k(self):
0079         """Get the number of clusters"""
0080         return self.call("k")
0081 
0082     @since('2.0.0')
0083     def predict(self, x):
0084         """
0085         Find the cluster that each of the points belongs to in this
0086         model.
0087 
0088         :param x:
0089           A data point (or RDD of points) to determine cluster index.
0090         :return:
0091           Predicted cluster index or an RDD of predicted cluster indices
0092           if the input is an RDD.
0093         """
0094         if isinstance(x, RDD):
0095             vecs = x.map(_convert_to_vector)
0096             return self.call("predict", vecs)
0097 
0098         x = _convert_to_vector(x)
0099         return self.call("predict", x)
0100 
0101     @since('2.0.0')
0102     def computeCost(self, x):
0103         """
0104         Return the Bisecting K-means cost (sum of squared distances of
0105         points to their nearest center) for this model on the given
0106         data. If provided with an RDD of points returns the sum.
0107 
0108         :param point:
0109           A data point (or RDD of points) to compute the cost(s).
0110         """
0111         if isinstance(x, RDD):
0112             vecs = x.map(_convert_to_vector)
0113             return self.call("computeCost", vecs)
0114 
0115         return self.call("computeCost", _convert_to_vector(x))
0116 
0117 
0118 class BisectingKMeans(object):
0119     """
0120     A bisecting k-means algorithm based on the paper "A comparison of
0121     document clustering techniques" by Steinbach, Karypis, and Kumar,
0122     with modification to fit Spark.
0123     The algorithm starts from a single cluster that contains all points.
0124     Iteratively it finds divisible clusters on the bottom level and
0125     bisects each of them using k-means, until there are `k` leaf
0126     clusters in total or no leaf clusters are divisible.
0127     The bisecting steps of clusters on the same level are grouped
0128     together to increase parallelism. If bisecting all divisible
0129     clusters on the bottom level would result more than `k` leaf
0130     clusters, larger clusters get higher priority.
0131 
0132     Based on
0133     `Steinbach, Karypis, and Kumar, A comparison of document clustering
0134     techniques, KDD Workshop on Text Mining, 2000
0135     <http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf>`_.
0136 
0137     .. versionadded:: 2.0.0
0138     """
0139 
0140     @classmethod
0141     @since('2.0.0')
0142     def train(self, rdd, k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604):
0143         """
0144         Runs the bisecting k-means algorithm return the model.
0145 
0146         :param rdd:
0147           Training points as an `RDD` of `Vector` or convertible
0148           sequence types.
0149         :param k:
0150           The desired number of leaf clusters. The actual number could
0151           be smaller if there are no divisible leaf clusters.
0152           (default: 4)
0153         :param maxIterations:
0154           Maximum number of iterations allowed to split clusters.
0155           (default: 20)
0156         :param minDivisibleClusterSize:
0157           Minimum number of points (if >= 1.0) or the minimum proportion
0158           of points (if < 1.0) of a divisible cluster.
0159           (default: 1)
0160         :param seed:
0161           Random seed value for cluster initialization.
0162           (default: -1888008604 from classOf[BisectingKMeans].getName.##)
0163         """
0164         java_model = callMLlibFunc(
0165             "trainBisectingKMeans", rdd.map(_convert_to_vector),
0166             k, maxIterations, minDivisibleClusterSize, seed)
0167         return BisectingKMeansModel(java_model)
0168 
0169 
0170 @inherit_doc
0171 class KMeansModel(Saveable, Loader):
0172 
0173     """A clustering model derived from the k-means method.
0174 
0175     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
0176     >>> model = KMeans.train(
0177     ...     sc.parallelize(data), 2, maxIterations=10, initializationMode="random",
0178     ...                    seed=50, initializationSteps=5, epsilon=1e-4)
0179     >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
0180     True
0181     >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
0182     True
0183     >>> model.k
0184     2
0185     >>> model.computeCost(sc.parallelize(data))
0186     2.0
0187     >>> model = KMeans.train(sc.parallelize(data), 2)
0188     >>> sparse_data = [
0189     ...     SparseVector(3, {1: 1.0}),
0190     ...     SparseVector(3, {1: 1.1}),
0191     ...     SparseVector(3, {2: 1.0}),
0192     ...     SparseVector(3, {2: 1.1})
0193     ... ]
0194     >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||",
0195     ...                                     seed=50, initializationSteps=5, epsilon=1e-4)
0196     >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
0197     True
0198     >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
0199     True
0200     >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
0201     True
0202     >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
0203     True
0204     >>> isinstance(model.clusterCenters, list)
0205     True
0206     >>> import os, tempfile
0207     >>> path = tempfile.mkdtemp()
0208     >>> model.save(sc, path)
0209     >>> sameModel = KMeansModel.load(sc, path)
0210     >>> sameModel.predict(sparse_data[0]) == model.predict(sparse_data[0])
0211     True
0212     >>> from shutil import rmtree
0213     >>> try:
0214     ...     rmtree(path)
0215     ... except OSError:
0216     ...     pass
0217 
0218     >>> data = array([-383.1,-382.9, 28.7,31.2, 366.2,367.3]).reshape(3, 2)
0219     >>> model = KMeans.train(sc.parallelize(data), 3, maxIterations=0,
0220     ...     initialModel = KMeansModel([(-1000.0,-1000.0),(5.0,5.0),(1000.0,1000.0)]))
0221     >>> model.clusterCenters
0222     [array([-1000., -1000.]), array([ 5.,  5.]), array([ 1000.,  1000.])]
0223 
0224     .. versionadded:: 0.9.0
0225     """
0226 
0227     def __init__(self, centers):
0228         self.centers = centers
0229 
0230     @property
0231     @since('1.0.0')
0232     def clusterCenters(self):
0233         """Get the cluster centers, represented as a list of NumPy arrays."""
0234         return self.centers
0235 
0236     @property
0237     @since('1.4.0')
0238     def k(self):
0239         """Total number of clusters."""
0240         return len(self.centers)
0241 
0242     @since('0.9.0')
0243     def predict(self, x):
0244         """
0245         Find the cluster that each of the points belongs to in this
0246         model.
0247 
0248         :param x:
0249           A data point (or RDD of points) to determine cluster index.
0250         :return:
0251           Predicted cluster index or an RDD of predicted cluster indices
0252           if the input is an RDD.
0253         """
0254         best = 0
0255         best_distance = float("inf")
0256         if isinstance(x, RDD):
0257             return x.map(self.predict)
0258 
0259         x = _convert_to_vector(x)
0260         for i in xrange(len(self.centers)):
0261             distance = x.squared_distance(self.centers[i])
0262             if distance < best_distance:
0263                 best = i
0264                 best_distance = distance
0265         return best
0266 
0267     @since('1.4.0')
0268     def computeCost(self, rdd):
0269         """
0270         Return the K-means cost (sum of squared distances of points to
0271         their nearest center) for this model on the given
0272         data.
0273 
0274         :param rdd:
0275           The RDD of points to compute the cost on.
0276         """
0277         cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
0278                              [_convert_to_vector(c) for c in self.centers])
0279         return cost
0280 
0281     @since('1.4.0')
0282     def save(self, sc, path):
0283         """
0284         Save this model to the given path.
0285         """
0286         java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
0287         java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
0288         java_model.save(sc._jsc.sc(), path)
0289 
0290     @classmethod
0291     @since('1.4.0')
0292     def load(cls, sc, path):
0293         """
0294         Load a model from the given path.
0295         """
0296         java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path)
0297         return KMeansModel(_java2py(sc, java_model.clusterCenters()))
0298 
0299 
0300 class KMeans(object):
0301     """
0302     .. versionadded:: 0.9.0
0303     """
0304 
0305     @classmethod
0306     @since('0.9.0')
0307     def train(cls, rdd, k, maxIterations=100, initializationMode="k-means||",
0308               seed=None, initializationSteps=2, epsilon=1e-4, initialModel=None):
0309         """
0310         Train a k-means clustering model.
0311 
0312         :param rdd:
0313           Training points as an `RDD` of `Vector` or convertible
0314           sequence types.
0315         :param k:
0316           Number of clusters to create.
0317         :param maxIterations:
0318           Maximum number of iterations allowed.
0319           (default: 100)
0320         :param initializationMode:
0321           The initialization algorithm. This can be either "random" or
0322           "k-means||".
0323           (default: "k-means||")
0324         :param seed:
0325           Random seed value for cluster initialization. Set as None to
0326           generate seed based on system time.
0327           (default: None)
0328         :param initializationSteps:
0329           Number of steps for the k-means|| initialization mode.
0330           This is an advanced setting -- the default of 2 is almost
0331           always enough.
0332           (default: 2)
0333         :param epsilon:
0334           Distance threshold within which a center will be considered to
0335           have converged. If all centers move less than this Euclidean
0336           distance, iterations are stopped.
0337           (default: 1e-4)
0338         :param initialModel:
0339           Initial cluster centers can be provided as a KMeansModel object
0340           rather than using the random or k-means|| initializationModel.
0341           (default: None)
0342         """
0343         clusterInitialModel = []
0344         if initialModel is not None:
0345             if not isinstance(initialModel, KMeansModel):
0346                 raise Exception("initialModel is of "+str(type(initialModel))+". It needs "
0347                                 "to be of <type 'KMeansModel'>")
0348             clusterInitialModel = [_convert_to_vector(c) for c in initialModel.clusterCenters]
0349         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
0350                               initializationMode, seed, initializationSteps, epsilon,
0351                               clusterInitialModel)
0352         centers = callJavaFunc(rdd.context, model.clusterCenters)
0353         return KMeansModel([c.toArray() for c in centers])
0354 
0355 
0356 @inherit_doc
0357 class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
0358 
0359     """
0360     A clustering model derived from the Gaussian Mixture Model method.
0361 
0362     >>> from pyspark.mllib.linalg import Vectors, DenseMatrix
0363     >>> from numpy.testing import assert_equal
0364     >>> from shutil import rmtree
0365     >>> import os, tempfile
0366 
0367     >>> clusterdata_1 =  sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
0368     ...                                         0.9,0.8,0.75,0.935,
0369     ...                                        -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2), 2)
0370     >>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001,
0371     ...                                 maxIterations=50, seed=10)
0372     >>> labels = model.predict(clusterdata_1).collect()
0373     >>> labels[0]==labels[1]
0374     False
0375     >>> labels[1]==labels[2]
0376     False
0377     >>> labels[4]==labels[5]
0378     True
0379     >>> model.predict([-0.1,-0.05])
0380     0
0381     >>> softPredicted = model.predictSoft([-0.1,-0.05])
0382     >>> abs(softPredicted[0] - 1.0) < 0.03
0383     True
0384     >>> abs(softPredicted[1] - 0.0) < 0.03
0385     True
0386     >>> abs(softPredicted[2] - 0.0) < 0.03
0387     True
0388 
0389     >>> path = tempfile.mkdtemp()
0390     >>> model.save(sc, path)
0391     >>> sameModel = GaussianMixtureModel.load(sc, path)
0392     >>> assert_equal(model.weights, sameModel.weights)
0393     >>> mus, sigmas = list(
0394     ...     zip(*[(g.mu, g.sigma) for g in model.gaussians]))
0395     >>> sameMus, sameSigmas = list(
0396     ...     zip(*[(g.mu, g.sigma) for g in sameModel.gaussians]))
0397     >>> mus == sameMus
0398     True
0399     >>> sigmas == sameSigmas
0400     True
0401     >>> from shutil import rmtree
0402     >>> try:
0403     ...     rmtree(path)
0404     ... except OSError:
0405     ...     pass
0406 
0407     >>> data =  array([-5.1971, -2.5359, -3.8220,
0408     ...                -5.2211, -5.0602,  4.7118,
0409     ...                 6.8989, 3.4592,  4.6322,
0410     ...                 5.7048,  4.6567, 5.5026,
0411     ...                 4.5605,  5.2043,  6.2734])
0412     >>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
0413     >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
0414     ...                               maxIterations=150, seed=4)
0415     >>> labels = model.predict(clusterdata_2).collect()
0416     >>> labels[0]==labels[1]
0417     True
0418     >>> labels[2]==labels[3]==labels[4]
0419     True
0420 
0421     .. versionadded:: 1.3.0
0422     """
0423 
0424     @property
0425     @since('1.4.0')
0426     def weights(self):
0427         """
0428         Weights for each Gaussian distribution in the mixture, where weights[i] is
0429         the weight for Gaussian i, and weights.sum == 1.
0430         """
0431         return array(self.call("weights"))
0432 
0433     @property
0434     @since('1.4.0')
0435     def gaussians(self):
0436         """
0437         Array of MultivariateGaussian where gaussians[i] represents
0438         the Multivariate Gaussian (Normal) Distribution for Gaussian i.
0439         """
0440         return [
0441             MultivariateGaussian(gaussian[0], gaussian[1])
0442             for gaussian in self.call("gaussians")]
0443 
0444     @property
0445     @since('1.4.0')
0446     def k(self):
0447         """Number of gaussians in mixture."""
0448         return len(self.weights)
0449 
0450     @since('1.3.0')
0451     def predict(self, x):
0452         """
0453         Find the cluster to which the point 'x' or each point in RDD 'x'
0454         has maximum membership in this model.
0455 
0456         :param x:
0457           A feature vector or an RDD of vectors representing data points.
0458         :return:
0459           Predicted cluster label or an RDD of predicted cluster labels
0460           if the input is an RDD.
0461         """
0462         if isinstance(x, RDD):
0463             cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
0464             return cluster_labels
0465         else:
0466             z = self.predictSoft(x)
0467             return z.argmax()
0468 
0469     @since('1.3.0')
0470     def predictSoft(self, x):
0471         """
0472         Find the membership of point 'x' or each point in RDD 'x' to all mixture components.
0473 
0474         :param x:
0475           A feature vector or an RDD of vectors representing data points.
0476         :return:
0477           The membership value to all mixture components for vector 'x'
0478           or each vector in RDD 'x'.
0479         """
0480         if isinstance(x, RDD):
0481             means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
0482             membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
0483                                               _convert_to_vector(self.weights), means, sigmas)
0484             return membership_matrix.map(lambda x: pyarray.array('d', x))
0485         else:
0486             return self.call("predictSoft", _convert_to_vector(x)).toArray()
0487 
0488     @classmethod
0489     @since('1.5.0')
0490     def load(cls, sc, path):
0491         """Load the GaussianMixtureModel from disk.
0492 
0493         :param sc:
0494           SparkContext.
0495         :param path:
0496           Path to where the model is stored.
0497         """
0498         model = cls._load_java(sc, path)
0499         wrapper = sc._jvm.org.apache.spark.mllib.api.python.GaussianMixtureModelWrapper(model)
0500         return cls(wrapper)
0501 
0502 
0503 class GaussianMixture(object):
0504     """
0505     Learning algorithm for Gaussian Mixtures using the expectation-maximization algorithm.
0506 
0507     .. versionadded:: 1.3.0
0508     """
0509     @classmethod
0510     @since('1.3.0')
0511     def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
0512         """
0513         Train a Gaussian Mixture clustering model.
0514 
0515         :param rdd:
0516           Training points as an `RDD` of `Vector` or convertible
0517           sequence types.
0518         :param k:
0519           Number of independent Gaussians in the mixture model.
0520         :param convergenceTol:
0521           Maximum change in log-likelihood at which convergence is
0522           considered to have occurred.
0523           (default: 1e-3)
0524         :param maxIterations:
0525           Maximum number of iterations allowed.
0526           (default: 100)
0527         :param seed:
0528           Random seed for initial Gaussian distribution. Set as None to
0529           generate seed based on system time.
0530           (default: None)
0531         :param initialModel:
0532           Initial GMM starting point, bypassing the random
0533           initialization.
0534           (default: None)
0535         """
0536         initialModelWeights = None
0537         initialModelMu = None
0538         initialModelSigma = None
0539         if initialModel is not None:
0540             if initialModel.k != k:
0541                 raise Exception("Mismatched cluster count, initialModel.k = %s, however k = %s"
0542                                 % (initialModel.k, k))
0543             initialModelWeights = list(initialModel.weights)
0544             initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
0545             initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
0546         java_model = callMLlibFunc("trainGaussianMixtureModel", rdd.map(_convert_to_vector),
0547                                    k, convergenceTol, maxIterations, seed,
0548                                    initialModelWeights, initialModelMu, initialModelSigma)
0549         return GaussianMixtureModel(java_model)
0550 
0551 
0552 class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
0553 
0554     """
0555     Model produced by [[PowerIterationClustering]].
0556 
0557     >>> import math
0558     >>> def genCircle(r, n):
0559     ...     points = []
0560     ...     for i in range(0, n):
0561     ...         theta = 2.0 * math.pi * i / n
0562     ...         points.append((r * math.cos(theta), r * math.sin(theta)))
0563     ...     return points
0564     >>> def sim(x, y):
0565     ...     dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
0566     ...     return math.exp(-dist2 / 2.0)
0567     >>> r1 = 1.0
0568     >>> n1 = 10
0569     >>> r2 = 4.0
0570     >>> n2 = 40
0571     >>> n = n1 + n2
0572     >>> points = genCircle(r1, n1) + genCircle(r2, n2)
0573     >>> similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
0574     >>> rdd = sc.parallelize(similarities, 2)
0575     >>> model = PowerIterationClustering.train(rdd, 2, 40)
0576     >>> model.k
0577     2
0578     >>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
0579     >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
0580     True
0581     >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
0582     True
0583     >>> import os, tempfile
0584     >>> path = tempfile.mkdtemp()
0585     >>> model.save(sc, path)
0586     >>> sameModel = PowerIterationClusteringModel.load(sc, path)
0587     >>> sameModel.k
0588     2
0589     >>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
0590     >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
0591     True
0592     >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
0593     True
0594     >>> from shutil import rmtree
0595     >>> try:
0596     ...     rmtree(path)
0597     ... except OSError:
0598     ...     pass
0599 
0600     .. versionadded:: 1.5.0
0601     """
0602 
0603     @property
0604     @since('1.5.0')
0605     def k(self):
0606         """
0607         Returns the number of clusters.
0608         """
0609         return self.call("k")
0610 
0611     @since('1.5.0')
0612     def assignments(self):
0613         """
0614         Returns the cluster assignments of this model.
0615         """
0616         return self.call("getAssignments").map(
0617             lambda x: (PowerIterationClustering.Assignment(*x)))
0618 
0619     @classmethod
0620     @since('1.5.0')
0621     def load(cls, sc, path):
0622         """
0623         Load a model from the given path.
0624         """
0625         model = cls._load_java(sc, path)
0626         wrapper =\
0627             sc._jvm.org.apache.spark.mllib.api.python.PowerIterationClusteringModelWrapper(model)
0628         return PowerIterationClusteringModel(wrapper)
0629 
0630 
0631 class PowerIterationClustering(object):
0632     """
0633     Power Iteration Clustering (PIC), a scalable graph clustering algorithm
0634     developed by [[http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf Lin and Cohen]].
0635     From the abstract: PIC finds a very low-dimensional embedding of a
0636     dataset using truncated power iteration on a normalized pair-wise
0637     similarity matrix of the data.
0638 
0639     .. versionadded:: 1.5.0
0640     """
0641 
0642     @classmethod
0643     @since('1.5.0')
0644     def train(cls, rdd, k, maxIterations=100, initMode="random"):
0645         r"""
0646         :param rdd:
0647           An RDD of (i, j, s\ :sub:`ij`\) tuples representing the
0648           affinity matrix, which is the matrix A in the PIC paper.  The
0649           similarity s\ :sub:`ij`\ must be nonnegative.  This is a symmetric
0650           matrix and hence s\ :sub:`ij`\ = s\ :sub:`ji`\  For any (i, j) with
0651           nonzero similarity, there should be either (i, j, s\ :sub:`ij`\) or
0652           (j, i, s\ :sub:`ji`\) in the input.  Tuples with i = j are ignored,
0653           because it is assumed s\ :sub:`ij`\ = 0.0.
0654         :param k:
0655           Number of clusters.
0656         :param maxIterations:
0657           Maximum number of iterations of the PIC algorithm.
0658           (default: 100)
0659         :param initMode:
0660           Initialization mode. This can be either "random" to use
0661           a random vector as vertex properties, or "degree" to use
0662           normalized sum similarities.
0663           (default: "random")
0664         """
0665         model = callMLlibFunc("trainPowerIterationClusteringModel",
0666                               rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
0667         return PowerIterationClusteringModel(model)
0668 
0669     class Assignment(namedtuple("Assignment", ["id", "cluster"])):
0670         """
0671         Represents an (id, cluster) tuple.
0672 
0673         .. versionadded:: 1.5.0
0674         """
0675 
0676 
0677 class StreamingKMeansModel(KMeansModel):
0678     """
0679     Clustering model which can perform an online update of the centroids.
0680 
0681     The update formula for each centroid is given by
0682 
0683     * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t + m_t)
0684     * n_t+1 = n_t * a + m_t
0685 
0686     where
0687 
0688     * c_t: Centroid at the n_th iteration.
0689     * n_t: Number of samples (or) weights associated with the centroid
0690            at the n_th iteration.
0691     * x_t: Centroid of the new data closest to c_t.
0692     * m_t: Number of samples (or) weights of the new data closest to c_t
0693     * c_t+1: New centroid.
0694     * n_t+1: New number of weights.
0695     * a: Decay Factor, which gives the forgetfulness.
0696 
0697     .. note:: If a is set to 1, it is the weighted mean of the previous
0698         and new data. If it set to zero, the old centroids are completely
0699         forgotten.
0700 
0701     :param clusterCenters:
0702       Initial cluster centers.
0703     :param clusterWeights:
0704       List of weights assigned to each cluster.
0705 
0706     >>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
0707     >>> initWeights = [1.0, 1.0]
0708     >>> stkm = StreamingKMeansModel(initCenters, initWeights)
0709     >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
0710     ...                        [0.9, 0.9], [1.1, 1.1]])
0711     >>> stkm = stkm.update(data, 1.0, u"batches")
0712     >>> stkm.centers
0713     array([[ 0.,  0.],
0714            [ 1.,  1.]])
0715     >>> stkm.predict([-0.1, -0.1])
0716     0
0717     >>> stkm.predict([0.9, 0.9])
0718     1
0719     >>> stkm.clusterWeights
0720     [3.0, 3.0]
0721     >>> decayFactor = 0.0
0722     >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
0723     >>> stkm = stkm.update(data, 0.0, u"batches")
0724     >>> stkm.centers
0725     array([[ 0.2,  0.2],
0726            [ 1.5,  1.5]])
0727     >>> stkm.clusterWeights
0728     [1.0, 1.0]
0729     >>> stkm.predict([0.2, 0.2])
0730     0
0731     >>> stkm.predict([1.5, 1.5])
0732     1
0733 
0734     .. versionadded:: 1.5.0
0735     """
0736     def __init__(self, clusterCenters, clusterWeights):
0737         super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
0738         self._clusterWeights = list(clusterWeights)
0739 
0740     @property
0741     @since('1.5.0')
0742     def clusterWeights(self):
0743         """Return the cluster weights."""
0744         return self._clusterWeights
0745 
0746     @ignore_unicode_prefix
0747     @since('1.5.0')
0748     def update(self, data, decayFactor, timeUnit):
0749         """Update the centroids, according to data
0750 
0751         :param data:
0752           RDD with new data for the model update.
0753         :param decayFactor:
0754           Forgetfulness of the previous centroids.
0755         :param timeUnit:
0756           Can be "batches" or "points". If points, then the decay factor
0757           is raised to the power of number of new points and if batches,
0758           then decay factor will be used as is.
0759         """
0760         if not isinstance(data, RDD):
0761             raise TypeError("Data should be of an RDD, got %s." % type(data))
0762         data = data.map(_convert_to_vector)
0763         decayFactor = float(decayFactor)
0764         if timeUnit not in ["batches", "points"]:
0765             raise ValueError(
0766                 "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
0767         vectorCenters = [_convert_to_vector(center) for center in self.centers]
0768         updatedModel = callMLlibFunc(
0769             "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
0770             data, decayFactor, timeUnit)
0771         self.centers = array(updatedModel[0])
0772         self._clusterWeights = list(updatedModel[1])
0773         return self
0774 
0775 
0776 class StreamingKMeans(object):
0777     """
0778     Provides methods to set k, decayFactor, timeUnit to configure the
0779     KMeans algorithm for fitting and predicting on incoming dstreams.
0780     More details on how the centroids are updated are provided under the
0781     docs of StreamingKMeansModel.
0782 
0783     :param k:
0784       Number of clusters.
0785       (default: 2)
0786     :param decayFactor:
0787       Forgetfulness of the previous centroids.
0788       (default: 1.0)
0789     :param timeUnit:
0790       Can be "batches" or "points". If points, then the decay factor is
0791       raised to the power of number of new points and if batches, then
0792       decay factor will be used as is.
0793       (default: "batches")
0794 
0795     .. versionadded:: 1.5.0
0796     """
0797     def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
0798         self._k = k
0799         self._decayFactor = decayFactor
0800         if timeUnit not in ["batches", "points"]:
0801             raise ValueError(
0802                 "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
0803         self._timeUnit = timeUnit
0804         self._model = None
0805 
0806     @since('1.5.0')
0807     def latestModel(self):
0808         """Return the latest model"""
0809         return self._model
0810 
0811     def _validate(self, dstream):
0812         if self._model is None:
0813             raise ValueError(
0814                 "Initial centers should be set either by setInitialCenters "
0815                 "or setRandomCenters.")
0816         if not isinstance(dstream, DStream):
0817             raise TypeError(
0818                 "Expected dstream to be of type DStream, "
0819                 "got type %s" % type(dstream))
0820 
0821     @since('1.5.0')
0822     def setK(self, k):
0823         """Set number of clusters."""
0824         self._k = k
0825         return self
0826 
0827     @since('1.5.0')
0828     def setDecayFactor(self, decayFactor):
0829         """Set decay factor."""
0830         self._decayFactor = decayFactor
0831         return self
0832 
0833     @since('1.5.0')
0834     def setHalfLife(self, halfLife, timeUnit):
0835         """
0836         Set number of batches after which the centroids of that
0837         particular batch has half the weightage.
0838         """
0839         self._timeUnit = timeUnit
0840         self._decayFactor = exp(log(0.5) / halfLife)
0841         return self
0842 
0843     @since('1.5.0')
0844     def setInitialCenters(self, centers, weights):
0845         """
0846         Set initial centers. Should be set before calling trainOn.
0847         """
0848         self._model = StreamingKMeansModel(centers, weights)
0849         return self
0850 
0851     @since('1.5.0')
0852     def setRandomCenters(self, dim, weight, seed):
0853         """
0854         Set the initial centres to be random samples from
0855         a gaussian population with constant weights.
0856         """
0857         rng = random.RandomState(seed)
0858         clusterCenters = rng.randn(self._k, dim)
0859         clusterWeights = tile(weight, self._k)
0860         self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
0861         return self
0862 
0863     @since('1.5.0')
0864     def trainOn(self, dstream):
0865         """Train the model on the incoming dstream."""
0866         self._validate(dstream)
0867 
0868         def update(rdd):
0869             self._model.update(rdd, self._decayFactor, self._timeUnit)
0870 
0871         dstream.foreachRDD(update)
0872 
0873     @since('1.5.0')
0874     def predictOn(self, dstream):
0875         """
0876         Make predictions on a dstream.
0877         Returns a transformed dstream object
0878         """
0879         self._validate(dstream)
0880         return dstream.map(lambda x: self._model.predict(x))
0881 
0882     @since('1.5.0')
0883     def predictOnValues(self, dstream):
0884         """
0885         Make predictions on a keyed dstream.
0886         Returns a transformed dstream object.
0887         """
0888         self._validate(dstream)
0889         return dstream.mapValues(lambda x: self._model.predict(x))
0890 
0891 
0892 class LDAModel(JavaModelWrapper, JavaSaveable, Loader):
0893 
0894     """ A clustering model derived from the LDA method.
0895 
0896     Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
0897     Terminology
0898     - "word" = "term": an element of the vocabulary
0899     - "token": instance of a term appearing in a document
0900     - "topic": multinomial distribution over words representing some concept
0901     References:
0902     - Original LDA paper (journal version):
0903     Blei, Ng, and Jordan.  "Latent Dirichlet Allocation."  JMLR, 2003.
0904 
0905     >>> from pyspark.mllib.linalg import Vectors
0906     >>> from numpy.testing import assert_almost_equal, assert_equal
0907     >>> data = [
0908     ...     [1, Vectors.dense([0.0, 1.0])],
0909     ...     [2, SparseVector(2, {0: 1.0})],
0910     ... ]
0911     >>> rdd =  sc.parallelize(data)
0912     >>> model = LDA.train(rdd, k=2, seed=1)
0913     >>> model.vocabSize()
0914     2
0915     >>> model.describeTopics()
0916     [([1, 0], [0.5..., 0.49...]), ([0, 1], [0.5..., 0.49...])]
0917     >>> model.describeTopics(1)
0918     [([1], [0.5...]), ([0], [0.5...])]
0919 
0920     >>> topics = model.topicsMatrix()
0921     >>> topics_expect = array([[0.5,  0.5], [0.5, 0.5]])
0922     >>> assert_almost_equal(topics, topics_expect, 1)
0923 
0924     >>> import os, tempfile
0925     >>> from shutil import rmtree
0926     >>> path = tempfile.mkdtemp()
0927     >>> model.save(sc, path)
0928     >>> sameModel = LDAModel.load(sc, path)
0929     >>> assert_equal(sameModel.topicsMatrix(), model.topicsMatrix())
0930     >>> sameModel.vocabSize() == model.vocabSize()
0931     True
0932     >>> try:
0933     ...     rmtree(path)
0934     ... except OSError:
0935     ...     pass
0936 
0937     .. versionadded:: 1.5.0
0938     """
0939 
0940     @since('1.5.0')
0941     def topicsMatrix(self):
0942         """Inferred topics, where each topic is represented by a distribution over terms."""
0943         return self.call("topicsMatrix").toArray()
0944 
0945     @since('1.5.0')
0946     def vocabSize(self):
0947         """Vocabulary size (number of terms or terms in the vocabulary)"""
0948         return self.call("vocabSize")
0949 
0950     @since('1.6.0')
0951     def describeTopics(self, maxTermsPerTopic=None):
0952         """Return the topics described by weighted terms.
0953 
0954         WARNING: If vocabSize and k are large, this can return a large object!
0955 
0956         :param maxTermsPerTopic:
0957           Maximum number of terms to collect for each topic.
0958           (default: vocabulary size)
0959         :return:
0960           Array over topics. Each topic is represented as a pair of
0961           matching arrays: (term indices, term weights in topic).
0962           Each topic's terms are sorted in order of decreasing weight.
0963         """
0964         if maxTermsPerTopic is None:
0965             topics = self.call("describeTopics")
0966         else:
0967             topics = self.call("describeTopics", maxTermsPerTopic)
0968         return topics
0969 
0970     @classmethod
0971     @since('1.5.0')
0972     def load(cls, sc, path):
0973         """Load the LDAModel from disk.
0974 
0975         :param sc:
0976           SparkContext.
0977         :param path:
0978           Path to where the model is stored.
0979         """
0980         if not isinstance(sc, SparkContext):
0981             raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
0982         if not isinstance(path, basestring):
0983             raise TypeError("path should be a basestring, got type %s" % type(path))
0984         model = callMLlibFunc("loadLDAModel", sc, path)
0985         return LDAModel(model)
0986 
0987 
0988 class LDA(object):
0989     """
0990     .. versionadded:: 1.5.0
0991     """
0992 
0993     @classmethod
0994     @since('1.5.0')
0995     def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
0996               topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
0997         """Train a LDA model.
0998 
0999         :param rdd:
1000           RDD of documents, which are tuples of document IDs and term
1001           (word) count vectors. The term count vectors are "bags of
1002           words" with a fixed-size vocabulary (where the vocabulary size
1003           is the length of the vector). Document IDs must be unique
1004           and >= 0.
1005         :param k:
1006           Number of topics to infer, i.e., the number of soft cluster
1007           centers.
1008           (default: 10)
1009         :param maxIterations:
1010           Maximum number of iterations allowed.
1011           (default: 20)
1012         :param docConcentration:
1013           Concentration parameter (commonly named "alpha") for the prior
1014           placed on documents' distributions over topics ("theta").
1015           (default: -1.0)
1016         :param topicConcentration:
1017           Concentration parameter (commonly named "beta" or "eta") for
1018           the prior placed on topics' distributions over terms.
1019           (default: -1.0)
1020         :param seed:
1021           Random seed for cluster initialization. Set as None to generate
1022           seed based on system time.
1023           (default: None)
1024         :param checkpointInterval:
1025           Period (in iterations) between checkpoints.
1026           (default: 10)
1027         :param optimizer:
1028           LDAOptimizer used to perform the actual calculation. Currently
1029           "em", "online" are supported.
1030           (default: "em")
1031         """
1032         model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations,
1033                               docConcentration, topicConcentration, seed,
1034                               checkpointInterval, optimizer)
1035         return LDAModel(model)
1036 
1037 
1038 def _test():
1039     import doctest
1040     import numpy
1041     import pyspark.mllib.clustering
1042     try:
1043         # Numpy 1.14+ changed it's string format.
1044         numpy.set_printoptions(legacy='1.13')
1045     except TypeError:
1046         pass
1047     globs = pyspark.mllib.clustering.__dict__.copy()
1048     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
1049     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
1050     globs['sc'].stop()
1051     if failure_count:
1052         sys.exit(-1)
1053 
1054 
1055 if __name__ == "__main__":
1056     _test()