#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

0018 """
0019 Python package for feature in MLlib.
0020 """
0021 from __future__ import absolute_import
0022
0023 import sys
0024 import warnings
0025 if sys.version >= '3':
0026 basestring = str
0027 unicode = str

from py4j.protocol import Py4JJavaError

from pyspark import since
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import (
    Vector, Vectors, DenseVector, SparseVector, _convert_to_vector)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import JavaLoader, JavaSaveable

__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
           'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel',
           'ChiSqSelector', 'ChiSqSelectorModel', 'ElementwiseProduct',
           'PCA', 'PCAModel']


class VectorTransformer(object):
    """
    Base class for transformation of a vector or an RDD of vectors.
    """
    def transform(self, vector):
        """
        Applies transformation on a vector.

        :param vector: vector to be transformed.
        """
        raise NotImplementedError


class Normalizer(VectorTransformer):
    r"""
    Normalizes samples individually to unit L\ :sup:`p`\ norm

    For any 1 <= `p` < float('inf'), normalizes samples using
    sum(abs(vector) :sup:`p`) :sup:`(1/p)` as norm.

    For `p` = float('inf'), max(abs(vector)) will be used as norm for
    normalization.

    :param p: Normalization in L^p^ space, p = 2 by default.

    >>> v = Vectors.dense(range(3))
    >>> nor = Normalizer(1)
    >>> nor.transform(v)
    DenseVector([0.0, 0.3333, 0.6667])

    >>> rdd = sc.parallelize([v])
    >>> nor.transform(rdd).collect()
    [DenseVector([0.0, 0.3333, 0.6667])]

    >>> nor2 = Normalizer(float("inf"))
    >>> nor2.transform(v)
    DenseVector([0.0, 0.5, 1.0])
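
    With the default `p` = 2, `v` is scaled by its L2 norm, sqrt(5);
    the expected values below are computed directly from that
    definition:

    >>> norL2 = Normalizer()
    >>> norL2.transform(v)
    DenseVector([0.0, 0.4472, 0.8944])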

    .. versionadded:: 1.2.0
    """
    def __init__(self, p=2.0):
        assert p >= 1.0, "p should be greater than or equal to 1.0"
        self.p = float(p)

    @since('1.2.0')
    def transform(self, vector):
        """
        Applies unit length normalization on a vector.

        :param vector: vector or RDD of vectors to be normalized.
        :return: normalized vector. If the norm of the input is zero, it
                 will return the input vector.
        """
        if isinstance(vector, RDD):
            vector = vector.map(_convert_to_vector)
        else:
            vector = _convert_to_vector(vector)
        return callMLlibFunc("normalizeVector", self.p, vector)


class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
    """
    Wrapper for a vector-transformer model that lives in the JVM.
    """

    def transform(self, vector):
        """
        Applies transformation on a vector or an RDD[Vector].

        .. note:: In Python, transform cannot currently be used within
                  an RDD transformation or action.
                  Call transform directly on the RDD instead.

        :param vector: Vector or RDD of Vector to be transformed.
        """
        if isinstance(vector, RDD):
            vector = vector.map(_convert_to_vector)
        else:
            vector = _convert_to_vector(vector)
        return self.call("transform", vector)


class StandardScalerModel(JavaVectorTransformer):
    """
    Represents a StandardScaler model that can transform vectors.

    .. versionadded:: 1.2.0
    """

    @since('1.2.0')
    def transform(self, vector):
        """
        Applies standardization transformation on a vector.

        .. note:: In Python, transform cannot currently be used within
                  an RDD transformation or action.
                  Call transform directly on the RDD instead.

        :param vector: Vector or RDD of Vector to be standardized.
        :return: Standardized vector. If the variance of a column is
                 zero, it will return default `0.0` for the column with
                 zero variance.
        """
        return JavaVectorTransformer.transform(self, vector)

    @since('1.4.0')
    def setWithMean(self, withMean):
        """
        Sets the boolean which decides whether the model centers the
        data with the mean before scaling.
        """
        self.call("setWithMean", withMean)
        return self

    @since('1.4.0')
    def setWithStd(self, withStd):
        """
        Sets the boolean which decides whether the model scales the
        data to unit standard deviation.
        """
        self.call("setWithStd", withStd)
        return self

    @property
    @since('2.0.0')
    def withStd(self):
        """
        Returns whether the model scales the data to unit standard deviation.
        """
        return self.call("withStd")

    @property
    @since('2.0.0')
    def withMean(self):
        """
        Returns whether the model centers the data before scaling.
        """
        return self.call("withMean")

    @property
    @since('2.0.0')
    def std(self):
        """
        Return the column standard deviation values.
        """
        return self.call("std")

    @property
    @since('2.0.0')
    def mean(self):
        """
        Return the column mean values.
        """
        return self.call("mean")


class StandardScaler(object):
    """
    Standardizes features by removing the mean and scaling to unit
    variance using column summary statistics on the samples in the
    training set.

    :param withMean: False by default. Centers the data with the mean
      before scaling. It will build a dense output, so take
      care when applying to sparse input.
    :param withStd: True by default. Scales the data to unit
      standard deviation.

    >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
    >>> dataset = sc.parallelize(vs)
    >>> standardizer = StandardScaler(True, True)
    >>> model = standardizer.fit(dataset)
    >>> result = model.transform(dataset)
    >>> for r in result.collect(): r
    DenseVector([-0.7071, 0.7071, -0.7071])
    DenseVector([0.7071, -0.7071, 0.7071])
    >>> int(model.std[0])
    4
    >>> int(model.mean[0]*10)
    9
    >>> model.withStd
    True
    >>> model.withMean
    True

    .. versionadded:: 1.2.0
    """
    def __init__(self, withMean=False, withStd=True):
        if not (withMean or withStd):
            warnings.warn("Both withMean and withStd are false. The model does nothing.")
        self.withMean = withMean
        self.withStd = withStd

    @since('1.2.0')
    def fit(self, dataset):
        """
        Computes the mean and variance and stores them as a model to be
        used for later scaling.

        :param dataset: The data used to compute the mean and variance
          to build the transformation model.
        :return: a StandardScalerModel
        """
        dataset = dataset.map(_convert_to_vector)
        jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
        return StandardScalerModel(jmodel)


class ChiSqSelectorModel(JavaVectorTransformer):
    """
    Represents a Chi Squared selector model.

    .. versionadded:: 1.4.0
    """

    @since('1.4.0')
    def transform(self, vector):
        """
        Applies transformation on a vector.

        :param vector: Vector or RDD of Vector to be transformed.
        :return: transformed vector.
        """
        return JavaVectorTransformer.transform(self, vector)


class ChiSqSelector(object):
    """
    Creates a ChiSquared feature selector.
    The selector supports different selection methods: `numTopFeatures`, `percentile`, `fpr`,
    `fdr`, `fwe`.

    * `numTopFeatures` chooses a fixed number of top features according to a chi-squared test.

    * `percentile` is similar but chooses a fraction of all features
      instead of a fixed number.

    * `fpr` chooses all features whose p-values are below a threshold,
      thus controlling the false positive rate of selection.

    * `fdr` uses the `Benjamini-Hochberg procedure <https://en.wikipedia.org/wiki/
      False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure>`_
      to choose all features whose false discovery rate is below a threshold.

    * `fwe` chooses all features whose p-values are below a threshold. The threshold is scaled by
      1/numFeatures, thus controlling the family-wise error rate of selection.

    By default, the selection method is `numTopFeatures`, with the default number of top features
    set to 50.

    >>> data = sc.parallelize([
    ...     LabeledPoint(0.0, SparseVector(3, {0: 8.0, 1: 7.0})),
    ...     LabeledPoint(1.0, SparseVector(3, {1: 9.0, 2: 6.0})),
    ...     LabeledPoint(1.0, [0.0, 9.0, 8.0]),
    ...     LabeledPoint(2.0, [7.0, 9.0, 5.0]),
    ...     LabeledPoint(2.0, [8.0, 7.0, 3.0])
    ... ])
    >>> model = ChiSqSelector(numTopFeatures=1).fit(data)
    >>> model.transform(SparseVector(3, {1: 9.0, 2: 6.0}))
    SparseVector(1, {})
    >>> model.transform(DenseVector([7.0, 9.0, 5.0]))
    DenseVector([7.0])
    >>> model = ChiSqSelector(selectorType="fpr", fpr=0.2).fit(data)
    >>> model.transform(SparseVector(3, {1: 9.0, 2: 6.0}))
    SparseVector(1, {})
    >>> model.transform(DenseVector([7.0, 9.0, 5.0]))
    DenseVector([7.0])
    >>> model = ChiSqSelector(selectorType="percentile", percentile=0.34).fit(data)
    >>> model.transform(DenseVector([7.0, 9.0, 5.0]))
    DenseVector([7.0])

    .. versionadded:: 1.4.0
    """
    def __init__(self, numTopFeatures=50, selectorType="numTopFeatures", percentile=0.1, fpr=0.05,
                 fdr=0.05, fwe=0.05):
        self.numTopFeatures = numTopFeatures
        self.selectorType = selectorType
        self.percentile = percentile
        self.fpr = fpr
        self.fdr = fdr
        self.fwe = fwe

    @since('2.1.0')
    def setNumTopFeatures(self, numTopFeatures):
        """
        Sets numTopFeatures for feature selection by number of top features.
        Only applicable when selectorType = "numTopFeatures".
        """
        self.numTopFeatures = int(numTopFeatures)
        return self

    @since('2.1.0')
    def setPercentile(self, percentile):
        """
        Sets the percentile [0.0, 1.0] for feature selection by percentile.
        Only applicable when selectorType = "percentile".
        """
        self.percentile = float(percentile)
        return self

    @since('2.1.0')
    def setFpr(self, fpr):
        """
        Sets the FPR [0.0, 1.0] for feature selection by FPR.
        Only applicable when selectorType = "fpr".
        """
        self.fpr = float(fpr)
        return self

    @since('2.2.0')
    def setFdr(self, fdr):
        """
        Sets the FDR [0.0, 1.0] for feature selection by FDR.
        Only applicable when selectorType = "fdr".
        """
        self.fdr = float(fdr)
        return self

    @since('2.2.0')
    def setFwe(self, fwe):
        """
        Sets the FWE [0.0, 1.0] for feature selection by FWE.
        Only applicable when selectorType = "fwe".
        """
        self.fwe = float(fwe)
        return self

    @since('2.1.0')
    def setSelectorType(self, selectorType):
        """
        Sets the selector type of the ChiSqSelector.
        Supported options: "numTopFeatures" (default), "percentile", "fpr", "fdr", "fwe".
        """
        self.selectorType = str(selectorType)
        return self

    @since('1.4.0')
    def fit(self, data):
        """
        Fits a ChiSquared feature selector and returns a
        ChiSqSelectorModel.

        :param data: an `RDD[LabeledPoint]` containing the labeled dataset
          with categorical features. Real-valued features will be
          treated as categorical for each distinct value.
          Apply a feature discretizer before using this function.
        """
        jmodel = callMLlibFunc("fitChiSqSelector", self.selectorType, self.numTopFeatures,
                               self.percentile, self.fpr, self.fdr, self.fwe, data)
        return ChiSqSelectorModel(jmodel)


class PCAModel(JavaVectorTransformer):
    """
    Model fitted by :class:`PCA` that can project vectors to a
    low-dimensional space using PCA.

    .. versionadded:: 1.5.0
    """


class PCA(object):
    """
    A feature transformer that projects vectors to a low-dimensional space using PCA.

    >>> data = [Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),
    ...         Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),
    ...         Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0])]
    >>> model = PCA(2).fit(sc.parallelize(data))
    >>> pcArray = model.transform(Vectors.sparse(5, [(1, 1.0), (3, 7.0)])).toArray()
    >>> pcArray[0]
    1.648...
    >>> pcArray[1]
    -4.013...

    .. versionadded:: 1.5.0
    """
    def __init__(self, k):
        """
        :param k: number of principal components.
        """
        self.k = int(k)

    @since('1.5.0')
    def fit(self, data):
        """
        Computes a :class:`PCAModel` that contains the principal
        components of the input vectors.

        :param data: source vectors
        """
        jmodel = callMLlibFunc("fitPCA", self.k, data)
        return PCAModel(jmodel)


class HashingTF(object):
    """
    Maps a sequence of terms to their term frequencies using the hashing
    trick.

    .. note:: The terms must be hashable (cannot be dict/set/list...).

    :param numFeatures: number of features (default: 2^20)

    >>> htf = HashingTF(100)
    >>> doc = "a a b b c d".split(" ")
    >>> htf.transform(doc)
    SparseVector(100, {...})
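
    With `setBinary(True)`, repeated terms count only once; the exact
    indices depend on the hash function, hence the ellipsis in the
    expected output:

    >>> htf.setBinary(True).transform(doc)
    SparseVector(100, {...})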

    .. versionadded:: 1.2.0
    """
    def __init__(self, numFeatures=1 << 20):
        self.numFeatures = numFeatures
        self.binary = False

    @since("2.0.0")
    def setBinary(self, value):
        """
        If True, the term frequency vector will be binary: all non-zero
        term counts are set to 1 (default: False).
        """
        self.binary = value
        return self

    @since('1.2.0')
    def indexOf(self, term):
        """ Returns the index of the input term. """
        return hash(term) % self.numFeatures

    @since('1.2.0')
    def transform(self, document):
        """
        Transforms the input document (list of terms) to a term frequency
        vector, or transforms an RDD of documents to an RDD of term
        frequency vectors.
        """
        if isinstance(document, RDD):
            return document.map(self.transform)

        # Accumulate term counts locally; in binary mode a term's count
        # saturates at 1.0.
        freq = {}
        for term in document:
            i = self.indexOf(term)
            freq[i] = 1.0 if self.binary else freq.get(i, 0) + 1.0
        return Vectors.sparse(self.numFeatures, freq.items())


class IDFModel(JavaVectorTransformer):
    """
    Represents an IDF model that can transform term frequency vectors.

    .. versionadded:: 1.2.0
    """
    @since('1.2.0')
    def transform(self, x):
        """
        Transforms term frequency (TF) vectors to TF-IDF vectors.

        If `minDocFreq` was set for the IDF calculation,
        the terms which occur in fewer than `minDocFreq`
        documents will have an entry of 0.

        .. note:: In Python, transform cannot currently be used within
                  an RDD transformation or action.
                  Call transform directly on the RDD instead.

        :param x: an RDD of term frequency vectors or a term frequency
                  vector
        :return: an RDD of TF-IDF vectors or a TF-IDF vector
        """
        return JavaVectorTransformer.transform(self, x)

    @since('1.4.0')
    def idf(self):
        """
        Returns the current IDF vector.
        """
        return self.call('idf')

    @since('3.0.0')
    def docFreq(self):
        """
        Returns the document frequency.
        """
        return self.call('docFreq')

    @since('3.0.0')
    def numDocs(self):
        """
        Returns the number of documents evaluated to compute the IDF.
        """
        return self.call('numDocs')


class IDF(object):
    """
    Inverse document frequency (IDF).

    The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`,
    where `m` is the total number of documents and `d(t)` is the number
    of documents that contain term `t`.

    This implementation supports filtering out terms which do not appear
    in a minimum number of documents (controlled by the variable
    `minDocFreq`). For terms that are not in at least `minDocFreq`
    documents, the IDF is set to 0, resulting in TF-IDFs of 0.
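
    For example, with `m` = 3 documents and a term that appears in
    `d(t)` = 2 of them, `idf = log(4 / 3)`, about 0.2877; a raw term
    frequency of 2.0 for that term then gives a TF-IDF of about 0.5754,
    the value at index 3 in the doctest below.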

    :param minDocFreq: minimum number of documents in which a term
      should appear for filtering

    >>> n = 4
    >>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
    ...          Vectors.dense([0.0, 1.0, 2.0, 3.0]),
    ...          Vectors.sparse(n, [1], [1.0])]
    >>> data = sc.parallelize(freqs)
    >>> idf = IDF()
    >>> model = idf.fit(data)
    >>> tfidf = model.transform(data)
    >>> for r in tfidf.collect(): r
    SparseVector(4, {1: 0.0, 3: 0.5754})
    DenseVector([0.0, 0.0, 1.3863, 0.863])
    SparseVector(4, {1: 0.0})
    >>> model.transform(Vectors.dense([0.0, 1.0, 2.0, 3.0]))
    DenseVector([0.0, 0.0, 1.3863, 0.863])
    >>> model.transform([0.0, 1.0, 2.0, 3.0])
    DenseVector([0.0, 0.0, 1.3863, 0.863])
    >>> model.transform(Vectors.sparse(n, (1, 3), (1.0, 2.0)))
    SparseVector(4, {1: 0.0, 3: 0.5754})
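
    The fitted IDF vector and the effect of `minDocFreq` can be
    inspected directly; the document frequencies here are [0, 3, 1, 2],
    so the expected values below follow from the formula above:

    >>> model.idf()
    DenseVector([1.3863, 0.0, 0.6931, 0.2877])
    >>> model2 = IDF(minDocFreq=2).fit(data)
    >>> model2.transform(Vectors.dense([0.0, 1.0, 2.0, 3.0]))
    DenseVector([0.0, 0.0, 0.0, 0.863])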

    .. versionadded:: 1.2.0
    """
    def __init__(self, minDocFreq=0):
        self.minDocFreq = minDocFreq

    @since('1.2.0')
    def fit(self, dataset):
        """
        Computes the inverse document frequency.

        :param dataset: an RDD of term frequency vectors
        """
        if not isinstance(dataset, RDD):
            raise TypeError("dataset should be an RDD of term frequency vectors")
        jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
        return IDFModel(jmodel)


class Word2VecModel(JavaVectorTransformer, JavaSaveable, JavaLoader):
    """
    Class for the Word2Vec model.

    .. versionadded:: 1.2.0
    """
    @since('1.2.0')
    def transform(self, word):
        """
        Transforms a word to its vector representation.

        .. note:: Local use only

        :param word: a word
        :return: vector representation of the word
        """
        try:
            return self.call("transform", word)
        except Py4JJavaError:
            raise ValueError("%s not found" % word)

    @since('1.2.0')
    def findSynonyms(self, word, num):
        """
        Find synonyms of a word.

        :param word: a word or a vector representation of a word
        :param num: number of synonyms to find
        :return: array of (word, cosineSimilarity)

        .. note:: Local use only
        """
        if not isinstance(word, basestring):
            word = _convert_to_vector(word)
        words, similarity = self.call("findSynonyms", word, num)
        return zip(words, similarity)

    @since('1.4.0')
    def getVectors(self):
        """
        Returns a map of words to their vector representations.
        """
        return self.call("getVectors")

    @classmethod
    @since('1.5.0')
    def load(cls, sc, path):
        """
        Load a model from the given path.
        """
        jmodel = sc._jvm.org.apache.spark.mllib.feature \
            .Word2VecModel.load(sc._jsc.sc(), path)
        model = sc._jvm.org.apache.spark.mllib.api.python.Word2VecModelWrapper(jmodel)
        return Word2VecModel(model)


@ignore_unicode_prefix
class Word2Vec(object):
    """Word2Vec creates vector representations of words in a text corpus.
    The algorithm first constructs a vocabulary from the corpus
    and then learns vector representations of words in the vocabulary.
    The vector representations can be used as features in
    natural language processing and machine learning algorithms.

    We use the skip-gram model in our implementation and the
    hierarchical softmax method to train it. The variable names in the
    implementation match the original C implementation.

    For the original C implementation,
    see https://code.google.com/p/word2vec/
    For research papers, see
    Efficient Estimation of Word Representations in Vector Space
    and Distributed Representations of Words and Phrases and their
    Compositionality.

    >>> sentence = "a b " * 100 + "a c " * 10
    >>> localDoc = [sentence, sentence]
    >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
    >>> model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)

    Querying for synonyms of a word will not return that word:

    >>> syms = model.findSynonyms("a", 2)
    >>> [s[0] for s in syms]
    [u'b', u'c']

    But querying for synonyms of a vector may return the word whose
    representation is that vector:

    >>> vec = model.transform("a")
    >>> syms = model.findSynonyms(vec, 2)
    >>> [s[0] for s in syms]
    [u'a', u'b']
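
    The learned vocabulary can be inspected as well; the corpus above
    has three distinct tokens ("a", "b", "c") that meet the default
    `minCount` of 5:

    >>> len(model.getVectors())
    3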

    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = Word2VecModel.load(sc, path)
    >>> model.transform("a") == sameModel.transform("a")
    True
    >>> syms = sameModel.findSynonyms("a", 2)
    >>> [s[0] for s in syms]
    [u'b', u'c']
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 1.2.0

    """
    def __init__(self):
        """
        Construct a Word2Vec instance.
        """
        self.vectorSize = 100
        self.learningRate = 0.025
        self.numPartitions = 1
        self.numIterations = 1
        self.seed = None
        self.minCount = 5
        self.windowSize = 5

    @since('1.2.0')
    def setVectorSize(self, vectorSize):
        """
        Sets vector size (default: 100).
        """
        self.vectorSize = vectorSize
        return self

    @since('1.2.0')
    def setLearningRate(self, learningRate):
        """
        Sets initial learning rate (default: 0.025).
        """
        self.learningRate = learningRate
        return self

    @since('1.2.0')
    def setNumPartitions(self, numPartitions):
        """
        Sets number of partitions (default: 1). Use a small number for
        accuracy.
        """
        self.numPartitions = numPartitions
        return self

    @since('1.2.0')
    def setNumIterations(self, numIterations):
        """
        Sets number of iterations (default: 1), which should be smaller
        than or equal to the number of partitions.
        """
        self.numIterations = numIterations
        return self

    @since('1.2.0')
    def setSeed(self, seed):
        """
        Sets the random seed.
        """
        self.seed = seed
        return self

    @since('1.4.0')
    def setMinCount(self, minCount):
        """
        Sets minCount, the minimum number of times a token must appear
        to be included in the word2vec model's vocabulary (default: 5).
        """
        self.minCount = minCount
        return self

    @since('2.0.0')
    def setWindowSize(self, windowSize):
        """
        Sets window size (default: 5).
        """
        self.windowSize = windowSize
        return self

    @since('1.2.0')
    def fit(self, data):
        """
        Computes the vector representation of each word in the vocabulary.

        :param data: training data. RDD of list of string
        :return: Word2VecModel instance
        """
        if not isinstance(data, RDD):
            raise TypeError("data should be an RDD of list of string")
        jmodel = callMLlibFunc("trainWord2VecModel", data, int(self.vectorSize),
                               float(self.learningRate), int(self.numPartitions),
                               int(self.numIterations), self.seed,
                               int(self.minCount), int(self.windowSize))
        return Word2VecModel(jmodel)


class ElementwiseProduct(VectorTransformer):
    """
    Scales each element of a vector by the corresponding element of a
    supplied weight vector, i.e., computes the elementwise (Hadamard)
    product.

    >>> weight = Vectors.dense([1.0, 2.0, 3.0])
    >>> eprod = ElementwiseProduct(weight)
    >>> a = Vectors.dense([2.0, 1.0, 3.0])
    >>> eprod.transform(a)
    DenseVector([2.0, 2.0, 9.0])
    >>> b = Vectors.dense([9.0, 3.0, 4.0])
    >>> rdd = sc.parallelize([a, b])
    >>> eprod.transform(rdd).collect()
    [DenseVector([2.0, 2.0, 9.0]), DenseVector([9.0, 6.0, 12.0])]
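
    Sparse input keeps its sparse representation, since only the stored
    entries are scaled (a small check of the expected behavior, assuming
    the JVM transformer preserves sparsity):

    >>> eprod.transform(SparseVector(3, {0: 2.0}))
    SparseVector(3, {0: 2.0})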

    .. versionadded:: 1.5.0
    """
    def __init__(self, scalingVector):
        self.scalingVector = _convert_to_vector(scalingVector)

    @since('1.5.0')
    def transform(self, vector):
        """
        Computes the Hadamard product of the vector with the scaling
        vector.
        """
        if isinstance(vector, RDD):
            vector = vector.map(_convert_to_vector)
        else:
            vector = _convert_to_vector(vector)
        return callMLlibFunc("elementwiseProductVector", self.scalingVector, vector)


def _test():
    import doctest
    from pyspark.sql import SparkSession
    globs = globals().copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("mllib.feature tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    sys.path.pop(0)
    _test()