from math import sqrt
import unittest

from numpy import array, random, exp, abs, tile

from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.feature import HashingTF, IDF, StandardScaler, ElementwiseProduct, Word2Vec
from pyspark.testing.mllibutils import MLlibTestCase


class FeatureTest(MLlibTestCase):
    def test_idf_model(self):
        data = [
            Vectors.dense([1, 2, 6, 0, 2, 3, 1, 1, 0, 0, 3]),
            Vectors.dense([1, 3, 0, 1, 3, 0, 0, 2, 0, 0, 1]),
            Vectors.dense([1, 4, 1, 0, 0, 4, 9, 0, 1, 2, 0]),
            Vectors.dense([2, 1, 0, 3, 0, 0, 5, 0, 2, 3, 9])
        ]
        model = IDF().fit(self.sc.parallelize(data, 2))
        idf = model.idf()
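        # The fitted IDF vector has one weight per feature column (11 in this data).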
        self.assertEqual(len(idf), 11)


class Word2VecTests(MLlibTestCase):
    def test_word2vec_setters(self):
        model = Word2Vec() \
            .setVectorSize(2) \
            .setLearningRate(0.01) \
            .setNumPartitions(2) \
            .setNumIterations(10) \
            .setSeed(1024) \
            .setMinCount(3) \
            .setWindowSize(6)
        self.assertEqual(model.vectorSize, 2)
        self.assertTrue(model.learningRate < 0.02)
        self.assertEqual(model.numPartitions, 2)
        self.assertEqual(model.numIterations, 10)
        self.assertEqual(model.seed, 1024)
        self.assertEqual(model.minCount, 3)
        self.assertEqual(model.windowSize, 6)

    def test_word2vec_get_vectors(self):
        data = [
            ["a", "b", "c", "d", "e", "f", "g"],
            ["a", "b", "c", "d", "e", "f"],
            ["a", "b", "c", "d", "e"],
            ["a", "b", "c", "d"],
            ["a", "b", "c"],
            ["a", "b"],
            ["a"]
        ]
        model = Word2Vec().fit(self.sc.parallelize(data))
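        # Only "a", "b" and "c" occur at least 5 times (Word2Vec's default minCount), so only they get vectors.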
        self.assertEqual(len(model.getVectors()), 3)


class StandardScalerTests(MLlibTestCase):
    def test_model_setters(self):
        data = [
            [1.0, 2.0, 3.0],
            [2.0, 3.0, 4.0],
            [3.0, 4.0, 5.0]
        ]
        model = StandardScaler().fit(self.sc.parallelize(data))
        self.assertIsNotNone(model.setWithMean(True))
        self.assertIsNotNone(model.setWithStd(True))
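        # Each column has mean [2.0, 3.0, 4.0] and unit standard deviation, so the first row scales to all -1.0.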
        self.assertEqual(model.transform([1.0, 2.0, 3.0]), DenseVector([-1.0, -1.0, -1.0]))

    def test_model_transform(self):
        data = [
            [1.0, 2.0, 3.0],
            [2.0, 3.0, 4.0],
            [3.0, 4.0, 5.0]
        ]
        model = StandardScaler().fit(self.sc.parallelize(data))
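        # By default only withStd is enabled; every column already has unit std, so the vector is unchanged.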
        self.assertEqual(model.transform([1.0, 2.0, 3.0]), DenseVector([1.0, 2.0, 3.0]))


class ElementwiseProductTests(MLlibTestCase):
    def test_model_transform(self):
        weight = Vectors.dense([3, 2, 1])

        densevec = Vectors.dense([4, 5, 6])
        sparsevec = Vectors.sparse(3, [0], [1])
        eprod = ElementwiseProduct(weight)
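        # transform multiplies each input element by the matching weight (a Hadamard product).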
        self.assertEqual(eprod.transform(densevec), DenseVector([12, 10, 6]))
        self.assertEqual(
            eprod.transform(sparsevec), SparseVector(3, [0], [3]))


class HashingTFTest(MLlibTestCase):

    def test_binary_term_freqs(self):
        hashingTF = HashingTF(100).setBinary(True)
        doc = "a a b c c c".split(" ")
        n = hashingTF.numFeatures
        output = hashingTF.transform(doc).toArray()
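        # With binary=True repeated terms are counted once, so every present term has frequency 1.0.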
        expected = Vectors.sparse(n, {hashingTF.indexOf("a"): 1.0,
                                      hashingTF.indexOf("b"): 1.0,
                                      hashingTF.indexOf("c"): 1.0}).toArray()
        for i in range(0, n):
            self.assertAlmostEqual(output[i], expected[i], 14, "Error at " + str(i) +
                                   ": expected " + str(expected[i]) + ", got " + str(output[i]))


class DimensionalityReductionTests(MLlibTestCase):

    denseData = [
        Vectors.dense([0.0, 1.0, 2.0]),
        Vectors.dense([3.0, 4.0, 5.0]),
        Vectors.dense([6.0, 7.0, 8.0]),
        Vectors.dense([9.0, 0.0, 1.0])
    ]
    sparseData = [
        Vectors.sparse(3, [(1, 1.0), (2, 2.0)]),
        Vectors.sparse(3, [(0, 3.0), (1, 4.0), (2, 5.0)]),
        Vectors.sparse(3, [(0, 6.0), (1, 7.0), (2, 8.0)]),
        Vectors.sparse(3, [(0, 9.0), (2, 1.0)])
    ]

    def assertEqualUpToSign(self, vecA, vecB):
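        # Singular vectors and principal components are only determined up to a sign flip.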
        eq1 = vecA - vecB
        eq2 = vecA + vecB
        self.assertTrue(sum(abs(eq1)) < 1e-6 or sum(abs(eq2)) < 1e-6)

    def test_svd(self):
        denseMat = RowMatrix(self.sc.parallelize(self.denseData))
        sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
        m = 4
        n = 3
        for mat in [denseMat, sparseMat]:
            for k in range(1, 4):
                rm = mat.computeSVD(k, computeU=True)
                self.assertEqual(rm.s.size, k)
                self.assertEqual(rm.U.numRows(), m)
                self.assertEqual(rm.U.numCols(), k)
                self.assertEqual(rm.V.numRows, n)
                self.assertEqual(rm.V.numCols, k)

        # U should be None when computeU is not requested.
        self.assertEqual(mat.computeSVD(1).U, None)

        # A rank-1 matrix cannot yield more singular values than its rank,
        # even when more are requested.
        rm = RowMatrix(self.sc.parallelize(tile([1, 2, 3], (3, 1))))
        self.assertEqual(rm.computeSVD(3, False, 1e-6).s.size, 1)

    def test_pca(self):
        expected_pcs = array([
            [0.0, 1.0, 0.0],
            [sqrt(2.0) / 2.0, 0.0, sqrt(2.0) / 2.0],
            [sqrt(2.0) / 2.0, 0.0, -sqrt(2.0) / 2.0]
        ])
        n = 3
        denseMat = RowMatrix(self.sc.parallelize(self.denseData))
        sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
        for mat in [denseMat, sparseMat]:
            for k in range(1, 4):
                pcs = mat.computePrincipalComponents(k)
                self.assertEqual(pcs.numRows, n)
                self.assertEqual(pcs.numCols, k)

                # The k-th principal component should match the expected one up to sign.
                self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1])


if __name__ == "__main__":
    from pyspark.mllib.tests.test_feature import *
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)