#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import array as pyarray
import warnings

if sys.version > '3':
    xrange = range
    basestring = str

from math import exp, log

from numpy import array, random, tile

from collections import namedtuple

from pyspark import SparkContext, since
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream

__all__ = ['BisectingKMeansModel', 'BisectingKMeans', 'KMeansModel', 'KMeans',
           'GaussianMixtureModel', 'GaussianMixture', 'PowerIterationClusteringModel',
           'PowerIterationClustering', 'StreamingKMeans', 'StreamingKMeansModel',
           'LDA', 'LDAModel']


@inherit_doc
class BisectingKMeansModel(JavaModelWrapper):
    """
    A clustering model derived from the bisecting k-means method.

    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
    >>> bskm = BisectingKMeans()
    >>> model = bskm.train(sc.parallelize(data, 2), k=4)
    >>> p = array([0.0, 0.0])
    >>> model.predict(p)
    0
    >>> model.k
    4
    >>> model.computeCost(p)
    0.0

    .. versionadded:: 2.0.0
    """

    def __init__(self, java_model):
        super(BisectingKMeansModel, self).__init__(java_model)
        self.centers = [c.toArray() for c in self.call("clusterCenters")]

    @property
    @since('2.0.0')
    def clusterCenters(self):
        """Get the cluster centers, represented as a list of NumPy
        arrays."""
        return self.centers

    @property
    @since('2.0.0')
    def k(self):
        """Get the number of clusters."""
        return self.call("k")

    @since('2.0.0')
    def predict(self, x):
        """
        Find the cluster that each of the points belongs to in this
        model.

        :param x:
          A data point (or RDD of points) to determine cluster index.
        :return:
          Predicted cluster index or an RDD of predicted cluster indices
          if the input is an RDD.
        """
        if isinstance(x, RDD):
            vecs = x.map(_convert_to_vector)
            return self.call("predict", vecs)

        x = _convert_to_vector(x)
        return self.call("predict", x)

    @since('2.0.0')
    def computeCost(self, x):
        """
        Return the Bisecting K-means cost (sum of squared distances of
        points to their nearest center) for this model on the given
        data. If provided with an RDD of points, returns the sum over
        all points.

        :param x:
          A data point (or RDD of points) to compute the cost(s) on.
        """
        if isinstance(x, RDD):
            vecs = x.map(_convert_to_vector)
            return self.call("computeCost", vecs)

        return self.call("computeCost", _convert_to_vector(x))


class BisectingKMeans(object):
    """
    A bisecting k-means algorithm based on the paper "A comparison of
    document clustering techniques" by Steinbach, Karypis, and Kumar,
    with modification to fit Spark.
    The algorithm starts from a single cluster that contains all points.
    Iteratively it finds divisible clusters on the bottom level and
    bisects each of them using k-means, until there are `k` leaf
    clusters in total or no leaf clusters are divisible.
    The bisecting steps of clusters on the same level are grouped
    together to increase parallelism. If bisecting all divisible
    clusters on the bottom level would result in more than `k` leaf
    clusters, larger clusters get higher priority.

    Based on
    `Steinbach, Karypis, and Kumar, A comparison of document clustering
    techniques, KDD Workshop on Text Mining, 2000
    <http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf>`_.

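    A minimal training sketch (shown as a literal block rather than a
    doctest; it assumes an active SparkContext ``sc`` and reuses the toy
    data from :class:`BisectingKMeansModel`)::

        data = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 8.0], [8.0, 9.0]])
        model = BisectingKMeans.train(data, k=2, maxIterations=20,
                                      minDivisibleClusterSize=1.0)
        model.predict([9.0, 9.0])  # index of the cluster nearest (9, 9)
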
    .. versionadded:: 2.0.0
    """

    @classmethod
    @since('2.0.0')
    def train(cls, rdd, k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604):
        """
        Runs the bisecting k-means algorithm and returns the model.

        :param rdd:
          Training points as an `RDD` of `Vector` or convertible
          sequence types.
        :param k:
          The desired number of leaf clusters. The actual number could
          be smaller if there are no divisible leaf clusters.
          (default: 4)
        :param maxIterations:
          Maximum number of iterations allowed to split clusters.
          (default: 20)
        :param minDivisibleClusterSize:
          Minimum number of points (if >= 1.0) or the minimum proportion
          of points (if < 1.0) of a divisible cluster.
          (default: 1.0)
        :param seed:
          Random seed value for cluster initialization.
          (default: -1888008604 from classOf[BisectingKMeans].getName.##)
        """
        java_model = callMLlibFunc(
            "trainBisectingKMeans", rdd.map(_convert_to_vector),
            k, maxIterations, minDivisibleClusterSize, seed)
        return BisectingKMeansModel(java_model)


@inherit_doc
class KMeansModel(Saveable, Loader):

    """A clustering model derived from the k-means method.

    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
    >>> model = KMeans.train(
    ...     sc.parallelize(data), 2, maxIterations=10, initializationMode="random",
    ...     seed=50, initializationSteps=5, epsilon=1e-4)
    >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
    True
    >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
    True
    >>> model.k
    2
    >>> model.computeCost(sc.parallelize(data))
    2.0
    >>> model = KMeans.train(sc.parallelize(data), 2)
    >>> sparse_data = [
    ...     SparseVector(3, {1: 1.0}),
    ...     SparseVector(3, {1: 1.1}),
    ...     SparseVector(3, {2: 1.0}),
    ...     SparseVector(3, {2: 1.1})
    ... ]
    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||",
    ...                      seed=50, initializationSteps=5, epsilon=1e-4)
    >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
    True
    >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
    True
    >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
    True
    >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
    True
    >>> isinstance(model.clusterCenters, list)
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = KMeansModel.load(sc, path)
    >>> sameModel.predict(sparse_data[0]) == model.predict(sparse_data[0])
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    >>> data = array([-383.1,-382.9, 28.7,31.2, 366.2,367.3]).reshape(3, 2)
    >>> model = KMeans.train(sc.parallelize(data), 3, maxIterations=0,
    ...     initialModel = KMeansModel([(-1000.0,-1000.0),(5.0,5.0),(1000.0,1000.0)]))
    >>> model.clusterCenters
    [array([-1000., -1000.]), array([ 5.,  5.]), array([ 1000.,  1000.])]

    .. versionadded:: 0.9.0
    """

    def __init__(self, centers):
        self.centers = centers

    @property
    @since('1.0.0')
    def clusterCenters(self):
        """Get the cluster centers, represented as a list of NumPy arrays."""
        return self.centers

    @property
    @since('1.4.0')
    def k(self):
        """Total number of clusters."""
        return len(self.centers)

    @since('0.9.0')
    def predict(self, x):
        """
        Find the cluster that each of the points belongs to in this
        model.

        :param x:
          A data point (or RDD of points) to determine cluster index.
        :return:
          Predicted cluster index or an RDD of predicted cluster indices
          if the input is an RDD.
        """
        if isinstance(x, RDD):
            return x.map(self.predict)

        x = _convert_to_vector(x)
        best = 0
        best_distance = float("inf")
        for i in xrange(len(self.centers)):
            distance = x.squared_distance(self.centers[i])
            if distance < best_distance:
                best = i
                best_distance = distance
        return best

    @since('1.4.0')
    def computeCost(self, rdd):
        """
        Return the K-means cost (sum of squared distances of points to
        their nearest center) for this model on the given data.

        :param rdd:
          The RDD of points to compute the cost on.
        """
        cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
                             [_convert_to_vector(c) for c in self.centers])
        return cost

    @since('1.4.0')
    def save(self, sc, path):
        """
        Save this model to the given path.
        """
        java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
        java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since('1.4.0')
    def load(cls, sc, path):
        """
        Load a model from the given path.
        """
        java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path)
        return KMeansModel(_java2py(sc, java_model.clusterCenters()))


class KMeans(object):
    """
    Trains a k-means clustering model on an RDD of points.

    .. versionadded:: 0.9.0
    """

    @classmethod
    @since('0.9.0')
    def train(cls, rdd, k, maxIterations=100, initializationMode="k-means||",
              seed=None, initializationSteps=2, epsilon=1e-4, initialModel=None):
        """
        Train a k-means clustering model.

        :param rdd:
          Training points as an `RDD` of `Vector` or convertible
          sequence types.
        :param k:
          Number of clusters to create.
        :param maxIterations:
          Maximum number of iterations allowed.
          (default: 100)
        :param initializationMode:
          The initialization algorithm. This can be either "random" or
          "k-means||".
          (default: "k-means||")
        :param seed:
          Random seed value for cluster initialization. Set as None to
          generate seed based on system time.
          (default: None)
        :param initializationSteps:
          Number of steps for the k-means|| initialization mode.
          This is an advanced setting -- the default of 2 is almost
          always enough.
          (default: 2)
        :param epsilon:
          Distance threshold within which a center will be considered to
          have converged. If all centers move less than this Euclidean
          distance, iterations are stopped.
          (default: 1e-4)
        :param initialModel:
          Initial cluster centers can be provided as a KMeansModel object
          rather than using the random or k-means|| initializationMode.
          (default: None)
        """
        clusterInitialModel = []
        if initialModel is not None:
            if not isinstance(initialModel, KMeansModel):
                raise Exception("initialModel is of " + str(type(initialModel)) + ". It needs "
                                "to be of <type 'KMeansModel'>")
            clusterInitialModel = [_convert_to_vector(c) for c in initialModel.clusterCenters]
        model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
                              initializationMode, seed, initializationSteps, epsilon,
                              clusterInitialModel)
        centers = callJavaFunc(rdd.context, model.clusterCenters)
        return KMeansModel([c.toArray() for c in centers])


@inherit_doc
class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """
    A clustering model derived from the Gaussian Mixture Model method.

    >>> from pyspark.mllib.linalg import Vectors, DenseMatrix
    >>> from numpy.testing import assert_equal
    >>> from shutil import rmtree
    >>> import os, tempfile

    >>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
    ...                                       0.9,0.8,0.75,0.935,
    ...                                       -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2), 2)
    >>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001,
    ...                               maxIterations=50, seed=10)
    >>> labels = model.predict(clusterdata_1).collect()
    >>> labels[0]==labels[1]
    False
    >>> labels[1]==labels[2]
    False
    >>> labels[4]==labels[5]
    True
    >>> model.predict([-0.1,-0.05])
    0
    >>> softPredicted = model.predictSoft([-0.1,-0.05])
    >>> abs(softPredicted[0] - 1.0) < 0.03
    True
    >>> abs(softPredicted[1] - 0.0) < 0.03
    True
    >>> abs(softPredicted[2] - 0.0) < 0.03
    True

    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = GaussianMixtureModel.load(sc, path)
    >>> assert_equal(model.weights, sameModel.weights)
    >>> mus, sigmas = list(
    ...     zip(*[(g.mu, g.sigma) for g in model.gaussians]))
    >>> sameMus, sameSigmas = list(
    ...     zip(*[(g.mu, g.sigma) for g in sameModel.gaussians]))
    >>> mus == sameMus
    True
    >>> sigmas == sameSigmas
    True
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    >>> data = array([-5.1971, -2.5359, -3.8220,
    ...               -5.2211, -5.0602,  4.7118,
    ...                6.8989,  3.4592,  4.6322,
    ...                5.7048,  4.6567,  5.5026,
    ...                4.5605,  5.2043,  6.2734])
    >>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
    >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
    ...                               maxIterations=150, seed=4)
    >>> labels = model.predict(clusterdata_2).collect()
    >>> labels[0]==labels[1]
    True
    >>> labels[2]==labels[3]==labels[4]
    True

    .. versionadded:: 1.3.0
    """

    @property
    @since('1.4.0')
    def weights(self):
        """
        Weights for each Gaussian distribution in the mixture, where weights[i] is
        the weight for Gaussian i, and weights.sum == 1.
        """
        return array(self.call("weights"))

    @property
    @since('1.4.0')
    def gaussians(self):
        """
        Array of MultivariateGaussian where gaussians[i] represents
        the Multivariate Gaussian (Normal) Distribution for Gaussian i.
        """
        return [
            MultivariateGaussian(gaussian[0], gaussian[1])
            for gaussian in self.call("gaussians")]

    @property
    @since('1.4.0')
    def k(self):
        """Number of Gaussians in the mixture."""
        return len(self.weights)

    @since('1.3.0')
    def predict(self, x):
        """
        Find the cluster to which the point 'x' or each point in RDD 'x'
        has maximum membership in this model.

        :param x:
          A feature vector or an RDD of vectors representing data points.
        :return:
          Predicted cluster label or an RDD of predicted cluster labels
          if the input is an RDD.
        """
        if isinstance(x, RDD):
            cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
            return cluster_labels
        else:
            z = self.predictSoft(x)
            return z.argmax()

    @since('1.3.0')
    def predictSoft(self, x):
        """
        Find the membership of point 'x' or each point in RDD 'x' to all mixture components.

        :param x:
          A feature vector or an RDD of vectors representing data points.
        :return:
          The membership value to all mixture components for vector 'x'
          or each vector in RDD 'x'.
        """
        if isinstance(x, RDD):
            means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
            membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
                                              _convert_to_vector(self.weights), means, sigmas)
            return membership_matrix.map(lambda z: pyarray.array('d', z))
        else:
            return self.call("predictSoft", _convert_to_vector(x)).toArray()

    @classmethod
    @since('1.5.0')
    def load(cls, sc, path):
        """Load the GaussianMixtureModel from disk.

        :param sc:
          SparkContext.
        :param path:
          Path to where the model is stored.
        """
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.GaussianMixtureModelWrapper(model)
        return cls(wrapper)


class GaussianMixture(object):
    """
    Learning algorithm for Gaussian Mixtures using the expectation-maximization algorithm.

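    A warm-start sketch (a literal block rather than a doctest; it assumes
    an active SparkContext ``sc`` and an RDD ``rdd`` of feature vectors).
    It reuses a previously fitted model as the EM starting point via the
    ``initialModel`` parameter of :meth:`train`::

        model0 = GaussianMixture.train(rdd, k=2, maxIterations=10, seed=10)
        model = GaussianMixture.train(rdd, k=2, maxIterations=100, seed=10,
                                      initialModel=model0)
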
    .. versionadded:: 1.3.0
    """
    @classmethod
    @since('1.3.0')
    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
        """
        Train a Gaussian Mixture clustering model.

        :param rdd:
          Training points as an `RDD` of `Vector` or convertible
          sequence types.
        :param k:
          Number of independent Gaussians in the mixture model.
        :param convergenceTol:
          Maximum change in log-likelihood at which convergence is
          considered to have occurred.
          (default: 1e-3)
        :param maxIterations:
          Maximum number of iterations allowed.
          (default: 100)
        :param seed:
          Random seed for initial Gaussian distribution. Set as None to
          generate seed based on system time.
          (default: None)
        :param initialModel:
          Initial GMM starting point, bypassing the random
          initialization.
          (default: None)
        """
        initialModelWeights = None
        initialModelMu = None
        initialModelSigma = None
        if initialModel is not None:
            if initialModel.k != k:
                raise Exception("Mismatched cluster count, initialModel.k = %s, however k = %s"
                                % (initialModel.k, k))
            initialModelWeights = list(initialModel.weights)
            initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
            initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
        java_model = callMLlibFunc("trainGaussianMixtureModel", rdd.map(_convert_to_vector),
                                   k, convergenceTol, maxIterations, seed,
                                   initialModelWeights, initialModelMu, initialModelSigma)
        return GaussianMixtureModel(java_model)


class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """
    Model produced by :class:`PowerIterationClustering`.

    >>> import math
    >>> def genCircle(r, n):
    ...     points = []
    ...     for i in range(0, n):
    ...         theta = 2.0 * math.pi * i / n
    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
    ...     return points
    >>> def sim(x, y):
    ...     dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
    ...     return math.exp(-dist2 / 2.0)
    >>> r1 = 1.0
    >>> n1 = 10
    >>> r2 = 4.0
    >>> n2 = 40
    >>> n = n1 + n2
    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
    >>> similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
    >>> rdd = sc.parallelize(similarities, 2)
    >>> model = PowerIterationClustering.train(rdd, 2, 40)
    >>> model.k
    2
    >>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
    >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
    True
    >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
    >>> sameModel.k
    2
    >>> result = sorted(sameModel.assignments().collect(), key=lambda x: x.id)
    >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
    True
    >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 1.5.0
    """

    @property
    @since('1.5.0')
    def k(self):
        """
        Returns the number of clusters.
        """
        return self.call("k")

    @since('1.5.0')
    def assignments(self):
        """
        Returns the cluster assignments of this model.
        """
        return self.call("getAssignments").map(
            lambda x: (PowerIterationClustering.Assignment(*x)))

    @classmethod
    @since('1.5.0')
    def load(cls, sc, path):
        """
        Load a model from the given path.
        """
        model = cls._load_java(sc, path)
        wrapper = \
            sc._jvm.org.apache.spark.mllib.api.python.PowerIterationClusteringModelWrapper(model)
        return PowerIterationClusteringModel(wrapper)


class PowerIterationClustering(object):
    """
    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
    developed by `Lin and Cohen
    <http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf>`_.
    From the abstract: PIC finds a very low-dimensional embedding of a
    dataset using truncated power iteration on a normalized pair-wise
    similarity matrix of the data.

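    A minimal input sketch (a literal block, not a doctest; it assumes an
    active SparkContext ``sc``). Each tuple is (i, j, s_ij), one entry of
    the symmetric affinity matrix::

        rdd = sc.parallelize([(0, 1, 0.9), (0, 2, 0.1), (1, 2, 0.1)])
        model = PowerIterationClustering.train(rdd, k=2, maxIterations=10)
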
    .. versionadded:: 1.5.0
    """

    @classmethod
    @since('1.5.0')
    def train(cls, rdd, k, maxIterations=100, initMode="random"):
        r"""
        :param rdd:
          An RDD of (i, j, s\ :sub:`ij`\) tuples representing the
          affinity matrix, which is the matrix A in the PIC paper. The
          similarity s\ :sub:`ij`\ must be nonnegative. This is a symmetric
          matrix and hence s\ :sub:`ij`\ = s\ :sub:`ji`\. For any (i, j) with
          nonzero similarity, there should be either (i, j, s\ :sub:`ij`\) or
          (j, i, s\ :sub:`ji`\) in the input. Tuples with i = j are ignored,
          because it is assumed s\ :sub:`ij`\ = 0.0.
        :param k:
          Number of clusters.
        :param maxIterations:
          Maximum number of iterations of the PIC algorithm.
          (default: 100)
        :param initMode:
          Initialization mode. This can be either "random" to use
          a random vector as vertex properties, or "degree" to use
          normalized sum similarities.
          (default: "random")
        """
        model = callMLlibFunc("trainPowerIterationClusteringModel",
                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
        return PowerIterationClusteringModel(model)

    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
        """
        Represents an (id, cluster) tuple.

        .. versionadded:: 1.5.0
        """


class StreamingKMeansModel(KMeansModel):
    """
    Clustering model which can perform an online update of the centroids.

    The update formula for each centroid is given by

    * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / ((n_t * a) + m_t)
    * n_t+1 = n_t * a + m_t

    where

    * c_t: Centroid at the t-th iteration.
    * n_t: Number of samples (or) weights associated with the centroid
      at the t-th iteration.
    * x_t: Centroid of the new data closest to c_t.
    * m_t: Number of samples (or) weights of the new data closest to c_t.
    * c_t+1: New centroid.
    * n_t+1: New number of weights.
    * a: Decay factor, which gives the forgetfulness.

    .. note:: If a is set to 1, it is the weighted mean of the previous
        and new data. If it is set to zero, the old centroids are completely
        forgotten.

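    As a worked instance of the formula (using the doctest below): after
    the second update with a = 0, the first centroid becomes
    c_t+1 = ((0.0 * 3.0 * 0.0) + (0.2 * 1.0)) / ((3.0 * 0.0) + 1.0) = 0.2
    with weight n_t+1 = 3.0 * 0.0 + 1.0 = 1.0, matching the doctest output.
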
    :param clusterCenters:
      Initial cluster centers.
    :param clusterWeights:
      List of weights assigned to each cluster.

    >>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
    >>> initWeights = [1.0, 1.0]
    >>> stkm = StreamingKMeansModel(initCenters, initWeights)
    >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
    ...                        [0.9, 0.9], [1.1, 1.1]])
    >>> stkm = stkm.update(data, 1.0, u"batches")
    >>> stkm.centers
    array([[ 0.,  0.],
           [ 1.,  1.]])
    >>> stkm.predict([-0.1, -0.1])
    0
    >>> stkm.predict([0.9, 0.9])
    1
    >>> stkm.clusterWeights
    [3.0, 3.0]
    >>> decayFactor = 0.0
    >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
    >>> stkm = stkm.update(data, 0.0, u"batches")
    >>> stkm.centers
    array([[ 0.2,  0.2],
           [ 1.5,  1.5]])
    >>> stkm.clusterWeights
    [1.0, 1.0]
    >>> stkm.predict([0.2, 0.2])
    0
    >>> stkm.predict([1.5, 1.5])
    1

    .. versionadded:: 1.5.0
    """
    def __init__(self, clusterCenters, clusterWeights):
        super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
        self._clusterWeights = list(clusterWeights)

    @property
    @since('1.5.0')
    def clusterWeights(self):
        """Return the cluster weights."""
        return self._clusterWeights

    @ignore_unicode_prefix
    @since('1.5.0')
    def update(self, data, decayFactor, timeUnit):
        """Update the centroids, according to the data.

        :param data:
          RDD with new data for the model update.
        :param decayFactor:
          Forgetfulness of the previous centroids.
        :param timeUnit:
          Can be "batches" or "points". If points, then the decay factor
          is raised to the power of number of new points and if batches,
          then decay factor will be used as is.
        """
        if not isinstance(data, RDD):
            raise TypeError("Data should be an RDD, got %s." % type(data))
        data = data.map(_convert_to_vector)
        decayFactor = float(decayFactor)
        if timeUnit not in ["batches", "points"]:
            raise ValueError(
                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
        vectorCenters = [_convert_to_vector(center) for center in self.centers]
        updatedModel = callMLlibFunc(
            "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
            data, decayFactor, timeUnit)
        self.centers = array(updatedModel[0])
        self._clusterWeights = list(updatedModel[1])
        return self


class StreamingKMeans(object):
    """
    Provides methods to set k, decayFactor, timeUnit to configure the
    streaming k-means algorithm for fitting and predicting on incoming
    DStreams. More details on how the centroids are updated are provided
    in the docs of StreamingKMeansModel.

    :param k:
      Number of clusters.
      (default: 2)
    :param decayFactor:
      Forgetfulness of the previous centroids.
      (default: 1.0)
    :param timeUnit:
      Can be "batches" or "points". If points, then the decay factor is
      raised to the power of number of new points and if batches, then
      decay factor will be used as is.
      (default: "batches")

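    A minimal streaming sketch (a literal block, not a doctest; it assumes
    an active StreamingContext ``ssc`` fed by two queued RDD batches)::

        stream = ssc.queueStream([sc.parallelize([[0.0, 0.0]]),
                                  sc.parallelize([[1.0, 1.0]])])
        stkm = StreamingKMeans(k=2, decayFactor=1.0, timeUnit="batches")
        stkm.setRandomCenters(dim=2, weight=1.0, seed=42)
        stkm.trainOn(stream)                  # update centers per batch
        predictions = stkm.predictOn(stream)  # DStream of cluster indices
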
    .. versionadded:: 1.5.0
    """
    def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
        self._k = k
        self._decayFactor = decayFactor
        if timeUnit not in ["batches", "points"]:
            raise ValueError(
                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
        self._timeUnit = timeUnit
        self._model = None

    @since('1.5.0')
    def latestModel(self):
        """Return the latest model."""
        return self._model

    def _validate(self, dstream):
        if self._model is None:
            raise ValueError(
                "Initial centers should be set either by setInitialCenters "
                "or setRandomCenters.")
        if not isinstance(dstream, DStream):
            raise TypeError(
                "Expected dstream to be of type DStream, "
                "got type %s" % type(dstream))

    @since('1.5.0')
    def setK(self, k):
        """Set number of clusters."""
        self._k = k
        return self

    @since('1.5.0')
    def setDecayFactor(self, decayFactor):
        """Set decay factor."""
        self._decayFactor = decayFactor
        return self

    @since('1.5.0')
    def setHalfLife(self, halfLife, timeUnit):
        """
        Set the number of batches (or points) after which the centroids
        of that particular batch have half their weight; for example,
        halfLife=1 with timeUnit="batches" sets the decay factor to 0.5.
        """
        self._timeUnit = timeUnit
        self._decayFactor = exp(log(0.5) / halfLife)
        return self

    @since('1.5.0')
    def setInitialCenters(self, centers, weights):
        """
        Set initial centers. Should be set before calling trainOn.
        """
        self._model = StreamingKMeansModel(centers, weights)
        return self

    @since('1.5.0')
    def setRandomCenters(self, dim, weight, seed):
        """
        Set the initial centers to be random samples from
        a Gaussian population with constant weights.
        """
        rng = random.RandomState(seed)
        clusterCenters = rng.randn(self._k, dim)
        clusterWeights = tile(weight, self._k)
        self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
        return self

    @since('1.5.0')
    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            self._model.update(rdd, self._decayFactor, self._timeUnit)

        dstream.foreachRDD(update)

    @since('1.5.0')
    def predictOn(self, dstream):
        """
        Make predictions on a dstream.
        Returns a transformed dstream object.
        """
        self._validate(dstream)
        return dstream.map(lambda x: self._model.predict(x))

    @since('1.5.0')
    def predictOnValues(self, dstream):
        """
        Make predictions on a keyed dstream.
        Returns a transformed dstream object.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))


class LDAModel(JavaModelWrapper, JavaSaveable, Loader):

    """A clustering model derived from the LDA method.

    Latent Dirichlet Allocation (LDA), a topic model designed for text documents.

    Terminology

    - "word" = "term": an element of the vocabulary
    - "token": instance of a term appearing in a document
    - "topic": multinomial distribution over words representing some concept

    References:

    - Original LDA paper (journal version):
      Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003.

    >>> from pyspark.mllib.linalg import Vectors
    >>> from numpy.testing import assert_almost_equal, assert_equal
    >>> data = [
    ...     [1, Vectors.dense([0.0, 1.0])],
    ...     [2, SparseVector(2, {0: 1.0})],
    ... ]
    >>> rdd = sc.parallelize(data)
    >>> model = LDA.train(rdd, k=2, seed=1)
    >>> model.vocabSize()
    2
    >>> model.describeTopics()
    [([1, 0], [0.5..., 0.49...]), ([0, 1], [0.5..., 0.49...])]
    >>> model.describeTopics(1)
    [([1], [0.5...]), ([0], [0.5...])]

    >>> topics = model.topicsMatrix()
    >>> topics_expect = array([[0.5, 0.5], [0.5, 0.5]])
    >>> assert_almost_equal(topics, topics_expect, 1)

    >>> import os, tempfile
    >>> from shutil import rmtree
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = LDAModel.load(sc, path)
    >>> assert_equal(sameModel.topicsMatrix(), model.topicsMatrix())
    >>> sameModel.vocabSize() == model.vocabSize()
    True
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 1.5.0
    """

    @since('1.5.0')
    def topicsMatrix(self):
        """Inferred topics, where each topic is represented by a distribution over terms."""
        return self.call("topicsMatrix").toArray()

    @since('1.5.0')
    def vocabSize(self):
        """Vocabulary size (number of terms in the vocabulary)."""
        return self.call("vocabSize")

    @since('1.6.0')
    def describeTopics(self, maxTermsPerTopic=None):
        """Return the topics described by weighted terms.

        WARNING: If vocabSize and k are large, this can return a large object!

        :param maxTermsPerTopic:
          Maximum number of terms to collect for each topic.
          (default: vocabulary size)
        :return:
          Array over topics. Each topic is represented as a pair of
          matching arrays: (term indices, term weights in topic).
          Each topic's terms are sorted in order of decreasing weight.
        """
        if maxTermsPerTopic is None:
            topics = self.call("describeTopics")
        else:
            topics = self.call("describeTopics", maxTermsPerTopic)
        return topics

    @classmethod
    @since('1.5.0')
    def load(cls, sc, path):
        """Load the LDAModel from disk.

        :param sc:
          SparkContext.
        :param path:
          Path to where the model is stored.
        """
        if not isinstance(sc, SparkContext):
            raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
        if not isinstance(path, basestring):
            raise TypeError("path should be a basestring, got type %s" % type(path))
        model = callMLlibFunc("loadLDAModel", sc, path)
        return LDAModel(model)


class LDA(object):
    """
    Trains a Latent Dirichlet Allocation (LDA) topic model.

    .. versionadded:: 1.5.0
    """

    @classmethod
    @since('1.5.0')
    def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
              topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
        """Train an LDA model.

        :param rdd:
          RDD of documents, which are tuples of document IDs and term
          (word) count vectors. The term count vectors are "bags of
          words" with a fixed-size vocabulary (where the vocabulary size
          is the length of the vector). Document IDs must be unique
          and >= 0.
        :param k:
          Number of topics to infer, i.e., the number of soft cluster
          centers.
          (default: 10)
        :param maxIterations:
          Maximum number of iterations allowed.
          (default: 20)
        :param docConcentration:
          Concentration parameter (commonly named "alpha") for the prior
          placed on documents' distributions over topics ("theta").
          (default: -1.0)
        :param topicConcentration:
          Concentration parameter (commonly named "beta" or "eta") for
          the prior placed on topics' distributions over terms.
          (default: -1.0)
        :param seed:
          Random seed for cluster initialization. Set as None to generate
          seed based on system time.
          (default: None)
        :param checkpointInterval:
          Period (in iterations) between checkpoints.
          (default: 10)
        :param optimizer:
          LDAOptimizer used to perform the actual calculation. Currently
          "em", "online" are supported.
          (default: "em")
        """
        model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations,
                              docConcentration, topicConcentration, seed,
                              checkpointInterval, optimizer)
        return LDAModel(model)


def _test():
    import doctest
    import numpy
    import pyspark.mllib.clustering
    try:
        # NumPy >= 1.14 changed its default repr; force the pre-1.14
        # format so the doctest outputs above keep matching.
        numpy.set_printoptions(legacy='1.13')
    except TypeError:
        pass
    globs = pyspark.mllib.clustering.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()