0001 ---
0002 layout: global
0003 title: Clustering - RDD-based API
0004 displayTitle: Clustering - RDD-based API
0005 license: |
0006   Licensed to the Apache Software Foundation (ASF) under one or more
0007   contributor license agreements.  See the NOTICE file distributed with
0008   this work for additional information regarding copyright ownership.
0009   The ASF licenses this file to You under the Apache License, Version 2.0
0010   (the "License"); you may not use this file except in compliance with
0011   the License.  You may obtain a copy of the License at
0012  
0013      http://www.apache.org/licenses/LICENSE-2.0
0014  
0015   Unless required by applicable law or agreed to in writing, software
0016   distributed under the License is distributed on an "AS IS" BASIS,
0017   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018   See the License for the specific language governing permissions and
0019   limitations under the License.
0020 ---
0021 
0022 [Clustering](https://en.wikipedia.org/wiki/Cluster_analysis) is an unsupervised learning problem whereby we aim to group subsets
0023 of entities with one another based on some notion of similarity.  Clustering is
0024 often used for exploratory analysis and/or as a component of a hierarchical
0025 [supervised learning](https://en.wikipedia.org/wiki/Supervised_learning) pipeline (in which distinct classifiers or regression
0026 models are trained for each cluster).
0027 
0028 The `spark.mllib` package supports the following models:
0029 
0030 * Table of contents
0031 {:toc}
0032 
0033 ## K-means
0034 
0035 [K-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the
0036 most commonly used clustering algorithms. It partitions the data points into a
0037 predefined number of clusters. The `spark.mllib` implementation includes a parallelized
0038 variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
0039 called [k-means\|\|](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
0040 The implementation in `spark.mllib` has the following parameters:
0041 
0042 * *k* is the number of desired clusters. Note that it is possible for fewer than k clusters to be returned, for example, if there are fewer than k distinct points to cluster.
0043 * *maxIterations* is the maximum number of iterations to run.
0044 * *initializationMode* specifies either random initialization or
0045 initialization via k-means\|\|.
0046 * *runs* has no effect since Spark 2.0.0.
0047 * *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
0048 * *epsilon* determines the distance threshold within which we consider k-means to have converged.
0049 * *initialModel* is an optional set of cluster centers used for initialization. If this parameter is supplied, only one run is performed.
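
How `maxIterations` and `epsilon` interact can be illustrated with a minimal single-machine sketch of Lloyd's iteration. This is plain Python rather than the `spark.mllib` implementation. The function name `lloyd_kmeans`, the toy data, and the choice to stop once no center moves farther than `epsilon` are assumptions made for illustration.

```python
import math


def lloyd_kmeans(points, initial_centers, max_iterations=20, epsilon=1e-4):
    """Run Lloyd's iteration until no center moves farther than epsilon."""
    centers = [tuple(c) for c in initial_centers]
    iteration = 0
    for iteration in range(1, max_iterations + 1):
        # Assignment step: group every point with its nearest center.
        groups = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda j: math.dist(p, centers[j]))
            groups[nearest].append(p)
        # Update step: move each center to the mean of its group.
        new_centers = []
        for center, group in zip(centers, groups):
            if group:
                new_centers.append(tuple(sum(xs) / len(group) for xs in zip(*group)))
            else:
                new_centers.append(center)  # an empty cluster keeps its center
        # Convergence test: largest distance any center moved in this iteration.
        shift = max(math.dist(a, b) for a, b in zip(centers, new_centers))
        centers = new_centers
        if shift <= epsilon:
            break
    return centers, iteration


points = [(0.0, 0.0), (0.1, 0.1), (0.2, 0.2), (9.0, 9.0), (9.1, 9.1), (9.2, 9.2)]
centers, iterations_used = lloyd_kmeans(points, initial_centers=[(0.0, 0.0), (1.0, 1.0)])
```

On this toy data the loop stops well before `max_iterations`, because the centers no longer move after the second pass.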
0050 
0051 **Examples**
0052 
0053 <div class="codetabs">
0054 <div data-lang="scala" markdown="1">
0055 The following code snippets can be executed in `spark-shell`.
0056 
0057 In the following example, after loading and parsing the data, we use the
0058 [`KMeans`](api/scala/org/apache/spark/mllib/clustering/KMeans.html) object to cluster the data
0059 into two clusters. The number of desired clusters is passed to the algorithm. We then compute the Within
0060 Set Sum of Squared Errors (WSSSE). You can reduce this error measure by increasing *k*. In fact, the
0061 optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
0062 
0063 Refer to the [`KMeans` Scala docs](api/scala/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Scala docs](api/scala/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API.
0064 
0065 {% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}
0066 </div>
0067 
0068 <div data-lang="java" markdown="1">
0069 All of MLlib's methods use Java-friendly types, so you can import and call them from Java the same
0070 way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the
0071 Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by
0072 calling `.rdd()` on your `JavaRDD` object. A self-contained application example
0073 that is equivalent to the provided example in Scala is given below:
0074 
0075 Refer to the [`KMeans` Java docs](api/java/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API.
0076 
0077 {% include_example java/org/apache/spark/examples/mllib/JavaKMeansExample.java %}
0078 </div>
0079 
0080 <div data-lang="python" markdown="1">
0081 The following examples can be tested in the PySpark shell.
0082 
0083 In the following example, after loading and parsing the data, we use the `KMeans` object to cluster the
0084 data into two clusters. The number of desired clusters is passed to the algorithm. We then compute the
0085 Within Set Sum of Squared Errors (WSSSE). You can reduce this error measure by increasing *k*. In
0086 fact, the optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
0087 
0088 Refer to the [`KMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [`KMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel) for more details on the API.
0089 
0090 {% include_example python/mllib/k_means_example.py %}
0091 </div>
0092 
0093 </div>
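
The WSSSE measure mentioned in the examples above is the sum, over all points, of the squared distance to the nearest cluster center. The following plain-Python sketch shows why increasing *k* lowers it. The helper name `wssse` and the toy points are assumptions for illustration, not part of the Spark API.

```python
def wssse(points, centers):
    """Sum of squared distances from each point to its nearest center."""
    total = 0.0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total


points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]
cost_k1 = wssse(points, [(5.0, 1.0)])               # one center at the overall mean
cost_k2 = wssse(points, [(0.0, 1.0), (10.0, 1.0)])  # one center per visible group
```

Here the cost drops sharply from one center to two. Adding further centers could only shave off the small remaining within-group error, which is the kind of "elbow" the text refers to.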
0094 
0095 ## Gaussian mixture
0096 
0097 A [Gaussian Mixture Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model)
0098 represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions,
0099 each with its own probability.  The `spark.mllib` implementation uses the
0100 [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
0101  algorithm to induce the maximum-likelihood model given a set of samples.  The implementation
0102 has the following parameters:
0103 
0104 * *k* is the number of desired clusters.
0105 * *convergenceTol* is the maximum change in log-likelihood at which we consider convergence achieved.
0106 * *maxIterations* is the maximum number of iterations to perform without reaching convergence.
0107 * *initialModel* is an optional starting point from which to start the EM algorithm. If this parameter is omitted, a random starting point will be constructed from the data.
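
To make the roles of `convergenceTol` and `maxIterations` concrete, here is a minimal expectation-maximization sketch for a one-dimensional mixture in plain Python. It is not the `spark.mllib` implementation: the function names, the variance floor, and the toy data are assumptions chosen for illustration, and the starting parameters play the part of `initialModel`.

```python
import math


def gaussian_pdf(x, mean, variance):
    """Density of a one-dimensional Gaussian at x."""
    return math.exp(-((x - mean) ** 2) / (2.0 * variance)) / math.sqrt(2.0 * math.pi * variance)


def fit_gmm_1d(data, weights, means, variances, convergence_tol=0.01, max_iterations=100):
    """EM for a 1-D Gaussian mixture, starting from the supplied parameters."""
    weights, means, variances = list(weights), list(means), list(variances)
    k = len(weights)
    log_likelihoods = []
    previous = -math.inf
    for _ in range(max_iterations):
        # E-step: responsibility of each component for each point.
        responsibilities = []
        log_likelihood = 0.0
        for x in data:
            joint = [weights[j] * gaussian_pdf(x, means[j], variances[j]) for j in range(k)]
            total = sum(joint)
            log_likelihood += math.log(total)
            responsibilities.append([value / total for value in joint])
        log_likelihoods.append(log_likelihood)
        # Stop once the log-likelihood changes by less than the tolerance.
        if abs(log_likelihood - previous) < convergence_tol:
            break
        previous = log_likelihood
        # M-step: re-estimate weight, mean and variance of each component.
        for j in range(k):
            mass = sum(r[j] for r in responsibilities)
            weights[j] = mass / len(data)
            means[j] = sum(r[j] * x for r, x in zip(responsibilities, data)) / mass
            spread = sum(r[j] * (x - means[j]) ** 2 for r, x in zip(responsibilities, data)) / mass
            variances[j] = max(spread, 1e-6)  # floor keeps the density well defined
    return weights, means, variances, log_likelihoods


data = [0.8, 1.0, 1.2, 4.8, 5.0, 5.2]
weights, means, variances, history = fit_gmm_1d(
    data, weights=[0.5, 0.5], means=[0.0, 6.0], variances=[1.0, 1.0])
```

The returned `history` holds the log-likelihood after each E-step. It should never decrease, and the loop ends as soon as two successive values differ by less than `convergence_tol`.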
0108 
0109 **Examples**
0110 
0111 <div class="codetabs">
0112 <div data-lang="scala" markdown="1">
0113 In the following example, after loading and parsing the data, we use a
0114 [GaussianMixture](api/scala/org/apache/spark/mllib/clustering/GaussianMixture.html)
0115 object to cluster the data into two clusters. The number of desired clusters is passed
0116 to the algorithm. We then output the parameters of the mixture model.
0117 
0118 Refer to the [`GaussianMixture` Scala docs](api/scala/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Scala docs](api/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API.
0119 
0120 {% include_example scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala %}
0121 </div>
0122 
0123 <div data-lang="java" markdown="1">
0124 All of MLlib's methods use Java-friendly types, so you can import and call them from Java the same
0125 way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the
0126 Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by
0127 calling `.rdd()` on your `JavaRDD` object. A self-contained application example
0128 that is equivalent to the provided example in Scala is given below:
0129 
0130 Refer to the [`GaussianMixture` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API.
0131 
0132 {% include_example java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java %}
0133 </div>
0134 
0135 <div data-lang="python" markdown="1">
0136 In the following example, after loading and parsing the data, we use a
0137 [GaussianMixture](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture)
0138 object to cluster the data into two clusters. The number of desired clusters is passed
0139 to the algorithm. We then output the parameters of the mixture model.
0140 
0141 Refer to the [`GaussianMixture` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixtureModel) for more details on the API.
0142 
0143 {% include_example python/mllib/gaussian_mixture_example.py %}
0144 </div>
0145 
0146 </div>
0147 
0148 ## Power iteration clustering (PIC)
0149 
0150 Power iteration clustering (PIC) is a scalable and efficient algorithm for clustering vertices of a
0151 graph given pairwise similarities as edge properties,
0152 described in [Lin and Cohen, Power Iteration Clustering](http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf).
0153 It computes a pseudo-eigenvector of the normalized affinity matrix of the graph via
0154 [power iteration](http://en.wikipedia.org/wiki/Power_iteration)  and uses it to cluster vertices.
0155 `spark.mllib` includes an implementation of PIC using GraphX as its backend.
0156 It takes an `RDD` of `(srcId, dstId, similarity)` tuples and outputs a model with the clustering assignments.
0157 The similarities must be nonnegative.
0158 PIC assumes that the similarity measure is symmetric.
0159 A pair `(srcId, dstId)`, regardless of the ordering, should appear at most once in the input data.
0160 If a pair is missing from the input, its similarity is treated as zero.
0161 `spark.mllib`'s PIC implementation takes the following (hyper-)parameters:
0162 
0163 * `k`: number of clusters
0164 * `maxIterations`: maximum number of power iterations
0165 * `initializationMode`: initialization mode. This can be either "random", which is the default,
0166   to use a random vector as vertex properties, or "degree" to use the normalized sum of similarities.
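
The power-iteration step itself can be sketched on a single machine. The plain-Python function below builds the row-normalized affinity matrix from `(srcId, dstId, similarity)` tuples, starts from the "degree" initialization, and runs a fixed number of iterations. It is an illustrative sketch, not the GraphX-backed implementation: the name `pic_embedding` is hypothetical, every vertex is assumed to have at least one positive similarity, and the final step of clustering the resulting values into `k` groups is left out.

```python
def pic_embedding(similarities, num_vertices, max_iterations=10):
    """Return the power-iteration vector for a graph given as (src, dst, sim) tuples."""
    # Symmetric affinity matrix; pairs missing from the input stay at zero.
    affinity = [[0.0] * num_vertices for _ in range(num_vertices)]
    for src, dst, sim in similarities:
        if sim < 0:
            raise ValueError("similarities must be nonnegative")
        affinity[src][dst] = sim
        affinity[dst][src] = sim
    degrees = [sum(row) for row in affinity]  # assumed nonzero for every vertex
    total = sum(degrees)
    # "degree" initialization: normalized sum of similarities per vertex.
    v = [d / total for d in degrees]
    for _ in range(max_iterations):
        # Multiply by the row-normalized affinity matrix, then rescale to unit L1 norm.
        nxt = [sum(a * x for a, x in zip(row, v)) / deg for row, deg in zip(affinity, degrees)]
        norm = sum(abs(x) for x in nxt)
        v = [x / norm for x in nxt]
    return v


# A triangle {0, 1, 2} and a 4-clique {3, 4, 5, 6} joined by one weak edge.
similarities = [
    (0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0),
    (3, 4, 1.0), (3, 5, 1.0), (3, 6, 1.0), (4, 5, 1.0), (4, 6, 1.0), (5, 6, 1.0),
    (2, 3, 0.01),
]
embedding = pic_embedding(similarities, num_vertices=7)
```

After a few iterations the entries for vertices in the same tightly connected group are nearly equal, while the two groups remain clearly separated, so a one-dimensional clustering of `embedding` recovers them.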
0167 
0168 **Examples**
0169 
0170 In the following, we show code snippets to demonstrate how to use PIC in `spark.mllib`.
0171 
0172 <div class="codetabs">
0173 <div data-lang="scala" markdown="1">
0174 
0175 [`PowerIterationClustering`](api/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.html) 
0176 implements the PIC algorithm.
0177 It takes an `RDD` of `(srcId: Long, dstId: Long, similarity: Double)` tuples representing the
0178 affinity matrix.
0179 Calling `PowerIterationClustering.run` returns a
0180 [`PowerIterationClusteringModel`](api/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html),
0181 which contains the computed clustering assignments.
0182 
0183 Refer to the [`PowerIterationClustering` Scala docs](api/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Scala docs](api/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API.
0184 
0185 {% include_example scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala %}
0186 </div>
0187 
0188 <div data-lang="java" markdown="1">
0189 
0190 [`PowerIterationClustering`](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html)
0191 implements the PIC algorithm.
0192 It takes a `JavaRDD` of `(srcId: Long, dstId: Long, similarity: Double)` tuples representing the
0193 affinity matrix.
0194 Calling `PowerIterationClustering.run` returns a
0195 [`PowerIterationClusteringModel`](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html)
0196 which contains the computed clustering assignments.
0197 
0198 Refer to the [`PowerIterationClustering` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API.
0199 
0200 {% include_example java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java %}
0201 </div>
0202 
0203 <div data-lang="python" markdown="1">
0204 
0205 [`PowerIterationClustering`](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering)
0206 implements the PIC algorithm.
0207 It takes an `RDD` of `(srcId: Long, dstId: Long, similarity: Double)` tuples representing the
0208 affinity matrix.
0209 Calling `PowerIterationClustering.run` returns a
0210 [`PowerIterationClusteringModel`](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel),
0211 which contains the computed clustering assignments.
0212 
0213 Refer to the [`PowerIterationClustering` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel) for more details on the API.
0214 
0215 {% include_example python/mllib/power_iteration_clustering_example.py %}
0216 </div>
0217 
0218 </div>
0219 
0220 ## Latent Dirichlet allocation (LDA)
0221 
0222 [Latent Dirichlet allocation (LDA)](http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation)
0223 is a topic model which infers topics from a collection of text documents.
0224 LDA can be thought of as a clustering algorithm as follows:
0225 
0226 * Topics correspond to cluster centers, and documents correspond to
0227 examples (rows) in a dataset.
0228 * Topics and documents both exist in a feature space, where feature
0229 vectors are vectors of word counts (bag of words).
0230 * Rather than estimating a clustering using a traditional distance, LDA
0231 uses a function based on a statistical model of how text documents are
0232 generated.
0233 
0234 LDA supports different inference algorithms via `setOptimizer` function.
0235 `EMLDAOptimizer` learns clustering using
0236 [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
0237 on the likelihood function and yields comprehensive results, while
0238 `OnlineLDAOptimizer` uses iterative mini-batch sampling for [online
0239 variational
0240 inference](https://mimno.infosci.cornell.edu/info6150/readings/HoffmanBleiBach2010b.pdf)
0241 and is generally memory friendly.
0242 
0243 LDA takes in a collection of documents as vectors of word counts and the
0244 following parameters (set using the builder pattern):
0245 
0246 * `k`: Number of topics (i.e., cluster centers)
0247 * `optimizer`: Optimizer to use for learning the LDA model, either
0248 `EMLDAOptimizer` or `OnlineLDAOptimizer`
0249 * `docConcentration`: Dirichlet parameter for prior over documents'
0250 distributions over topics. Larger values encourage smoother inferred
0251 distributions.
0252 * `topicConcentration`: Dirichlet parameter for prior over topics'
0253 distributions over terms (words). Larger values encourage smoother
0254 inferred distributions.
0255 * `maxIterations`: Limit on the number of iterations.
0256 * `checkpointInterval`: If using checkpointing (set in the Spark
0257 configuration), this parameter specifies the frequency with which
0258 checkpoints will be created.  If `maxIterations` is large, using
0259 checkpointing can help reduce shuffle file sizes on disk and help with
0260 failure recovery.
0261 
0262 
0263 All of `spark.mllib`'s LDA models support:
0264 
0265 * `describeTopics`: Returns topics as arrays of most important terms and
0266 term weights
0267 * `topicsMatrix`: Returns a `vocabSize` by `k` matrix where each column
0268 is a topic
0269 
0270 *Note*: LDA is still an experimental feature under active development.
0271 As a result, certain features are only available in one of the two
0272 optimizers / models generated by the optimizer. Currently, a distributed
0273 model can be converted into a local model, but not vice-versa.
0274 
0275 The following discussion will describe each optimizer/model pair
0276 separately.
0277 
0278 **Expectation Maximization**
0279 
0280 Implemented in
0281 [`EMLDAOptimizer`](api/scala/org/apache/spark/mllib/clustering/EMLDAOptimizer.html)
0282 and
0283 [`DistributedLDAModel`](api/scala/org/apache/spark/mllib/clustering/DistributedLDAModel.html).
0284 
0285 For the parameters provided to `LDA`:
0286 
0287 * `docConcentration`: Only symmetric priors are supported, so all values
0288 in the provided `k`-dimensional vector must be identical. All values
0289 must also be $> 1.0$. Providing `Vector(-1)` results in default behavior
0290 (uniform `k`-dimensional vector with value $(50 / k) + 1$).
0291 * `topicConcentration`: Only symmetric priors supported. Values must be
0292 $> 1.0$. Providing `-1` results in defaulting to a value of $0.1 + 1$.
0293 * `maxIterations`: The maximum number of EM iterations.
0294 
0295 *Note*: It is important to do enough iterations.  In early iterations, EM often has useless topics,
0296 but those topics improve dramatically after more iterations.  Using at least 20 and possibly
0297 50-100 iterations is often reasonable, depending on your dataset.
0298 
0299 `EMLDAOptimizer` produces a `DistributedLDAModel`, which stores not only
0300 the inferred topics but also the full training corpus and topic
0301 distributions for each document in the training corpus. A
0302 `DistributedLDAModel` supports:
0303 
0304  * `topTopicsPerDocument`: The top topics and their weights for
0305  each document in the training corpus
0306  * `topDocumentsPerTopic`: The top documents for each topic and
0307  the corresponding weight of the topic in the documents.
0308  * `logPrior`: log probability of the estimated topics and
0309  document-topic distributions given the hyperparameters
0310  `docConcentration` and `topicConcentration`
0311  * `logLikelihood`: log likelihood of the training corpus, given the
0312  inferred topics and document-topic distributions
0313 
0314 **Online Variational Bayes**
0315 
0316 Implemented in
0317 [`OnlineLDAOptimizer`](api/scala/org/apache/spark/mllib/clustering/OnlineLDAOptimizer.html)
0318 and
0319 [`LocalLDAModel`](api/scala/org/apache/spark/mllib/clustering/LocalLDAModel.html).
0320 
0321 For the parameters provided to `LDA`:
0322 
0323 * `docConcentration`: Asymmetric priors can be used by passing in a
0324 vector with values equal to the Dirichlet parameter in each of the `k`
0325 dimensions. Values should be $>= 0$. Providing `Vector(-1)` results in
0326 default behavior (uniform `k` dimensional vector with value $(1.0 / k)$)
0327 * `topicConcentration`: Only symmetric priors supported. Values must be
0328 $>= 0$. Providing `-1` results in defaulting to a value of $(1.0 / k)$.
0329 * `maxIterations`: Maximum number of minibatches to submit.
0330 
0331 In addition, `OnlineLDAOptimizer` accepts the following parameters:
0332 
0333 * `miniBatchFraction`: Fraction of corpus sampled and used at each
0334 iteration
0335 * `optimizeDocConcentration`: If set to true, performs maximum-likelihood
0336 estimation of the hyperparameter `docConcentration` (aka `alpha`)
0337 after each minibatch and sets the optimized `docConcentration` in the
0338 returned `LocalLDAModel`
0339 * `tau0` and `kappa`: Used for learning-rate decay, which is computed by
0340 $(\tau_0 + iter)^{-\kappa}$ where $iter$ is the current number of iterations.
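
The decay formula above is easy to evaluate directly. The following plain-Python sketch computes the learning rate for a few iterations. The function name is hypothetical, and the default values used for `tau0` and `kappa` are assumptions made for the example, so check the API docs for the actual defaults.

```python
def learning_rate(iteration, tau0=1024.0, kappa=0.51):
    """Learning rate (tau0 + iteration) ** (-kappa) for online variational Bayes."""
    return (tau0 + iteration) ** (-kappa)


# The rate shrinks as more mini-batches are processed.
rates = [learning_rate(i) for i in (0, 10, 100, 1000)]
```

A larger `tau0` damps the influence of early mini-batches, and a larger `kappa` makes the rate fall off faster.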
0341 
0342 `OnlineLDAOptimizer` produces a `LocalLDAModel`, which only stores the
0343 inferred topics. A `LocalLDAModel` supports:
0344 
0345 * `logLikelihood(documents)`: Calculates a lower bound on the log likelihood of the provided
0346 `documents` given the inferred topics.
0347 * `logPerplexity(documents)`: Calculates an upper bound on the
0348 perplexity of the provided `documents` given the inferred topics.
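
The two quantities are closely related. Assuming log perplexity is defined as the negated log-likelihood bound divided by the total number of tokens (an assumption of this sketch, stated here rather than taken from the text above), a lower bound on the likelihood turns into an upper bound on the perplexity. The helper below is hypothetical and works on plain lists of word counts.

```python
def log_perplexity(log_likelihood_bound, documents):
    """Per-token log perplexity from a lower bound on the corpus log likelihood."""
    # Total number of tokens is the sum of all word counts in all documents.
    token_count = sum(sum(word_counts) for word_counts in documents)
    return -log_likelihood_bound / token_count


documents = [[1, 2, 0], [0, 3, 4]]  # two documents over a three-word vocabulary
value = log_perplexity(-25.0, documents)
```

Because the bound is divided by the token count, values computed on held-out sets of different sizes remain comparable.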
0349 
0350 **Examples**
0351 
0352 In the following example, we load word count vectors representing a corpus of documents.
0353 We then use [LDA](api/scala/org/apache/spark/mllib/clustering/LDA.html)
0354 to infer three topics from the documents. The number of desired clusters is passed
0355 to the algorithm. We then output the topics, represented as probability distributions over words.
0356 
0357 <div class="codetabs">
0358 <div data-lang="scala" markdown="1">
0359 Refer to the [`LDA` Scala docs](api/scala/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Scala docs](api/scala/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API.
0360 
0361 {% include_example scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala %}
0362 </div>
0363 
0364 <div data-lang="java" markdown="1">
0365 Refer to the [`LDA` Java docs](api/java/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Java docs](api/java/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API.
0366 
0367 {% include_example java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java %}
0368 </div>
0369 
0370 <div data-lang="python" markdown="1">
0371 Refer to the [`LDA` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA) and [`LDAModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDAModel) for more details on the API.
0372 
0373 {% include_example python/mllib/latent_dirichlet_allocation_example.py %}
0374 </div>
0375 
0376 </div>
0377 
0378 ## Bisecting k-means
0379 
0380 Bisecting k-means can often be much faster than regular k-means, but it will generally produce a different clustering.
0381 
0382 Bisecting k-means is a kind of [hierarchical clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering).
0383 Hierarchical clustering is one of the most commonly used methods of cluster analysis; it seeks to build a hierarchy of clusters.
0384 Strategies for hierarchical clustering generally fall into two types:
0385 
0386 - Agglomerative: This is a "bottom up" approach: each observation starts in its own cluster, and pairs of clusters are merged as one moves up the hierarchy.
0387 - Divisive: This is a "top down" approach: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.
0388 
0389 The bisecting k-means algorithm is a divisive algorithm.
0390 The implementation in MLlib has the following parameters:
0391 
0392 * *k*: the desired number of leaf clusters (default: 4). The actual number could be smaller if there are no divisible leaf clusters.
0393 * *maxIterations*: the max number of k-means iterations to split clusters (default: 20)
0394 * *minDivisibleClusterSize*: the minimum number of points (if >= 1.0) or the minimum proportion of points (if < 1.0) of a divisible cluster (default: 1)
0395 * *seed*: a random seed (default: hash value of the class name)
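
The divisive strategy and the meaning of `k` and `minDivisibleClusterSize` can be sketched in plain Python. This is a simplified illustration and not the MLlib algorithm: it splits one leaf at a time (largest first), seeds each 2-means split deterministically instead of using `seed`, and all function names are hypothetical.

```python
import math


def mean(points):
    return tuple(sum(xs) / len(points) for xs in zip(*points))


def two_means(points, max_iterations=20):
    """Split points into two groups with 2-means, or return None if not divisible."""
    c0, c1 = min(points), max(points)  # deterministic seeds, for the sketch only
    if c0 == c1:
        return None  # all points identical
    left, right = [], []
    for _ in range(max_iterations):
        left = [p for p in points if math.dist(p, c0) <= math.dist(p, c1)]
        right = [p for p in points if math.dist(p, c0) > math.dist(p, c1)]
        if not left or not right:
            return None
        new_c0, new_c1 = mean(left), mean(right)
        if (new_c0, new_c1) == (c0, c1):
            break
        c0, c1 = new_c0, new_c1
    return left, right


def bisecting_kmeans(points, k=4, max_iterations=20, min_divisible_cluster_size=1.0):
    """Return up to k leaf clusters obtained by repeatedly bisecting leaves."""
    # Values >= 1.0 are a point count; values < 1.0 are a proportion of all points.
    if min_divisible_cluster_size >= 1.0:
        min_size = math.ceil(min_divisible_cluster_size)
    else:
        min_size = math.ceil(min_divisible_cluster_size * len(points))
    leaves = [list(points)]
    while len(leaves) < k:
        candidates = sorted((c for c in leaves if len(c) >= max(min_size, 2)),
                            key=len, reverse=True)
        for cluster in candidates:
            split = two_means(cluster, max_iterations)
            if split is not None:
                leaves.remove(cluster)
                leaves.extend(split)
                break
        else:
            break  # no divisible leaf clusters remain
    return leaves


points = [(0.0,), (0.2,), (1.0,), (1.2,), (10.0,), (10.2,)]
leaves = bisecting_kmeans(points, k=3)
undivided = bisecting_kmeans([(1.0,), (1.0,)], k=2)
```

The second call shows why fewer than `k` leaves can come back: two identical points cannot be split any further.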
0396 
0397 **Examples**
0398 
0399 <div class="codetabs">
0400 <div data-lang="scala" markdown="1">
0401 Refer to the [`BisectingKMeans` Scala docs](api/scala/org/apache/spark/mllib/clustering/BisectingKMeans.html) and [`BisectingKMeansModel` Scala docs](api/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.html) for details on the API.
0402 
0403 {% include_example scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala %}
0404 </div>
0405 
0406 <div data-lang="java" markdown="1">
0407 Refer to the [`BisectingKMeans` Java docs](api/java/org/apache/spark/mllib/clustering/BisectingKMeans.html) and [`BisectingKMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/BisectingKMeansModel.html) for details on the API.
0408 
0409 {% include_example java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java %}
0410 </div>
0411 
0412 <div data-lang="python" markdown="1">
0413 Refer to the [`BisectingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.BisectingKMeans) and [`BisectingKMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.BisectingKMeansModel) for more details on the API.
0414 
0415 {% include_example python/mllib/bisecting_k_means_example.py %}
0416 </div>
0417 </div>
0418 
0419 ## Streaming k-means
0420 
0421 When data arrive in a stream, we may want to estimate clusters dynamically,
0422 updating them as new data arrive. `spark.mllib` provides support for streaming k-means clustering,
0423 with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm
0424 uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign
0425 all points to their nearest cluster, compute new cluster centers, then update each cluster using:
0426 
0427 `\begin{equation}
0428     c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
0429 \end{equation}`
0430 `\begin{equation}
0431     n_{t+1} = n_t + m_t
0432 \end{equation}`
0433 
0434 Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned
0435 to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$`
0436 is the number of points added to the cluster in the current batch. The decay factor `$\alpha$`
0437 can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning;
0438 with `$\alpha$=0` only the most recent data will be used. This is analogous to an
0439 exponentially-weighted moving average.
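
The update rule above translates directly into code. The following plain-Python sketch applies it to a single cluster. The function and argument names are hypothetical, with `decay` standing for `$\alpha$`, and the sketch assumes the denominator is nonzero.

```python
def update_center(center, count, batch_center, batch_count, decay):
    """Apply the streaming update to one cluster; returns (new_center, new_count)."""
    denominator = count * decay + batch_count
    new_center = [
        (c * count * decay + x * batch_count) / denominator
        for c, x in zip(center, batch_center)
    ]
    return new_center, count + batch_count


# decay = 1: old and new points are weighted equally.
all_history, n1 = update_center([0.0, 0.0], 10, [1.0, 1.0], 10, decay=1.0)
# decay = 0: the previous center is forgotten entirely.
recent_only, n2 = update_center([0.0, 0.0], 10, [1.0, 1.0], 10, decay=0.0)
```

With equal counts, `decay=1.0` lands halfway between the old and new centers, while `decay=0.0` jumps straight to the center of the current batch.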
0440 
0441 The decay can be specified using a `halfLife` parameter, which determines the
0442 decay factor `$\alpha$` such that, for data acquired
0443 at time `t`, its contribution by time `t + halfLife` will have dropped to 0.5.
0444 The unit of time can be specified either as `batches` or `points` and the update rule
0445 will be adjusted accordingly.
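
One way to read the half-life definition is that the decay factor, applied once per time unit, must multiply out to 0.5 after `halfLife` units. The plain-Python sketch below solves that equation. The function name is hypothetical and the derivation is an illustration of the definition, not a quotation of the library code.

```python
import math


def decay_factor_from_half_life(half_life):
    """Solve decay ** half_life == 0.5 for the per-unit decay factor."""
    return math.exp(math.log(0.5) / half_life)


decay = decay_factor_from_half_life(5)  # weight halves every 5 batches (or points)
```

A longer half-life gives a factor closer to 1, so older data is forgotten more slowly.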
0446 
0447 **Examples**
0448 
0449 This example shows how to estimate clusters on streaming data.
0450 
0451 <div class="codetabs">
0452 
0453 <div data-lang="scala" markdown="1">
0454 Refer to the [`StreamingKMeans` Scala docs](api/scala/org/apache/spark/mllib/clustering/StreamingKMeans.html) for details on the API.
0455 Also refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
0456 
0457 {% include_example scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala %}
0458 </div>
0459 
0460 <div data-lang="python" markdown="1">
0461 Refer to the [`StreamingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.StreamingKMeans) for more details on the API.
0462 Also refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
0463 
0464 {% include_example python/mllib/streaming_k_means_example.py %}
0465 </div>
0466 
0467 </div>
0468 
0469 As you add new text files with data, the cluster centers will update. Each training
0470 point should be formatted as `[x1, x2, x3]`, and each test data point
0471 should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier
0472 (e.g. a true category assignment). Whenever a text file is placed in `/training/data/dir`,
0473 the model will update. Whenever a text file is placed in `/testing/data/dir`,
0474 you will see predictions. With new data, the cluster centers will change!
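
The two line formats described above can be checked with a small plain-Python sketch before files are dropped into the watched directories. The parser names are hypothetical, and the sketch assumes the label `y` is numeric. It does not use Spark's own parsing utilities.

```python
import ast


def parse_training_line(line):
    """Parse a training line such as '[1.0, 2.0, 3.0]' into a list of floats."""
    return [float(x) for x in ast.literal_eval(line.strip())]


def parse_test_line(line):
    """Parse a test line such as '(1.0, [1.0, 2.0, 3.0])' into (label, features)."""
    label, features = ast.literal_eval(line.strip())
    return float(label), [float(x) for x in features]


training_point = parse_training_line("[0.5, 1.5, 2.5]\n")
label, features = parse_test_line("(1.0, [0.5, 1.5, 2.5])\n")
```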