0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 import sys
0020 import unittest
0021
# Python 3 removed ``basestring``; alias it to ``str`` so isinstance checks
# against string types work on both major versions.
# Fix: compare version_info numerically instead of the fragile lexicographic
# string comparison ``sys.version > '3'`` (which breaks for versions >= 10).
if sys.version_info[0] >= 3:
    basestring = str
0024
0025 from pyspark.ml.feature import Binarizer, CountVectorizer, CountVectorizerModel, HashingTF, IDF, \
0026 NGram, RFormula, StopWordsRemover, StringIndexer, StringIndexerModel, VectorSizeHint
0027 from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
0028 from pyspark.sql import Row
0029 from pyspark.testing.utils import QuietTest
0030 from pyspark.testing.mlutils import check_params, SparkSessionTestCase
0031
0032
class FeatureTests(SparkSessionTestCase):
    """Unit tests for the transformers and estimators in ``pyspark.ml.feature``."""

    def test_binarizer(self):
        """Binarizer: defaults, setParams, copy semantics, and kwargs constructor."""
        b0 = Binarizer()
        self.assertListEqual(b0.params, [b0.inputCol, b0.inputCols, b0.outputCol,
                                         b0.outputCols, b0.threshold, b0.thresholds])
        # Fix: the original used ``~b0.isSet(p)`` -- bitwise NOT on a bool yields
        # -1 or -2, both truthy, so the assertion could never fail.  The intent
        # is that NO param is explicitly set on a freshly constructed instance.
        self.assertTrue(not any([b0.isSet(p) for p in b0.params]))
        self.assertTrue(b0.hasDefault(b0.threshold))
        self.assertEqual(b0.getThreshold(), 0.0)
        b0.setParams(inputCol="input", outputCol="output").setThreshold(1.0)
        # Some params (e.g. inputCols/thresholds) remain unset.
        self.assertTrue(not all([b0.isSet(p) for p in b0.params]))
        self.assertEqual(b0.getThreshold(), 1.0)
        self.assertEqual(b0.getInputCol(), "input")
        self.assertEqual(b0.getOutputCol(), "output")

        # copy() must preserve the uid and param list while applying overrides.
        b0c = b0.copy({b0.threshold: 2.0})
        self.assertEqual(b0c.uid, b0.uid)
        self.assertListEqual(b0c.params, b0.params)
        self.assertEqual(b0c.getThreshold(), 2.0)

        # Keyword-argument constructor; a new instance gets a new uid.
        b1 = Binarizer(threshold=2.0, inputCol="input", outputCol="output")
        self.assertNotEqual(b1.uid, b0.uid)
        self.assertEqual(b1.getThreshold(), 2.0)
        self.assertEqual(b1.getInputCol(), "input")
        self.assertEqual(b1.getOutputCol(), "output")

    def test_idf(self):
        """IDF fit/transform; the fitted model must inherit the estimator's uid."""
        dataset = self.spark.createDataFrame([
            (DenseVector([1.0, 2.0]),),
            (DenseVector([0.0, 1.0]),),
            (DenseVector([3.0, 0.2]),)], ["tf"])
        idf0 = IDF(inputCol="tf")
        self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol])
        # Extra param map at fit() time overrides/sets outputCol.
        idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"})
        self.assertEqual(idf0m.uid, idf0.uid,
                         "Model should inherit the UID from its parent estimator.")
        output = idf0m.transform(dataset)
        self.assertIsNotNone(output.head().idf)
        self.assertIsNotNone(idf0m.docFreq)
        self.assertEqual(idf0m.numDocs, 3)

        check_params(self, idf0m)

    def test_ngram(self):
        """NGram with n=4 over five tokens yields the two possible 4-grams."""
        dataset = self.spark.createDataFrame([
            Row(input=["a", "b", "c", "d", "e"])])
        ngram0 = NGram(n=4, inputCol="input", outputCol="output")
        self.assertEqual(ngram0.getN(), 4)
        self.assertEqual(ngram0.getInputCol(), "input")
        self.assertEqual(ngram0.getOutputCol(), "output")
        transformedDF = ngram0.transform(dataset)
        self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])

    def test_stopwordsremover(self):
        """StopWordsRemover: defaults, custom words, language packs, and locale."""
        dataset = self.spark.createDataFrame([Row(input=["a", "panda"])])
        stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")

        # Default English stop words: "a" is removed, "panda" survives.
        self.assertEqual(stopWordRemover.getInputCol(), "input")
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, ["panda"])
        self.assertEqual(type(stopWordRemover.getStopWords()), list)
        self.assertTrue(isinstance(stopWordRemover.getStopWords()[0], basestring))

        # Custom stop-word list replaces the defaults entirely.
        stopwords = ["panda"]
        stopWordRemover.setStopWords(stopwords)
        self.assertEqual(stopWordRemover.getInputCol(), "input")
        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, ["a"])

        # Built-in per-language word lists (Turkish).
        stopwords = StopWordsRemover.loadDefaultStopWords("turkish")
        dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])])
        stopWordRemover.setStopWords(stopwords)
        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, [])

        # Locale-aware case folding: under locale "tr" the dotted capital I in
        # "BELKİ" folds to "belki", so the input token is removed.
        # Fix: the literal had been mojibake-garbled as "BELKÄ°" (UTF-8 bytes
        # of "İ" decoded as Latin-1), which would not match and would break
        # the empty-output expectation below.
        stopwords = [u"BELKİ"]
        dataset = self.spark.createDataFrame([Row(input=["belki"])])
        stopWordRemover.setStopWords(stopwords).setLocale("tr")
        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, [])

    def test_count_vectorizer_with_binary(self):
        """With binary=True, repeated terms contribute 1.0 instead of counts."""
        dataset = self.spark.createDataFrame([
            (0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
            (1, "a a".split(' '), SparseVector(3, {0: 1.0}),),
            (2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
            (3, "c".split(' '), SparseVector(3, {2: 1.0}),)], ["id", "words", "expected"])
        cv = CountVectorizer(binary=True, inputCol="words", outputCol="features")
        model = cv.fit(dataset)

        transformedList = model.transform(dataset).select("features", "expected").collect()

        for feature, expected in transformedList:
            self.assertEqual(feature, expected)

    def test_count_vectorizer_with_maxDF(self):
        """maxDF drops terms appearing in too many documents (count or fraction)."""
        dataset = self.spark.createDataFrame([
            (0, "a b c d".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
            (1, "a b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
            (2, "a b".split(' '), SparseVector(3, {0: 1.0}),),
            (3, "a".split(' '), SparseVector(3, {}),)], ["id", "words", "expected"])
        cv = CountVectorizer(inputCol="words", outputCol="features")
        # maxDF as an absolute document count: "a" appears in all 4 docs.
        model1 = cv.setMaxDF(3).fit(dataset)
        self.assertEqual(model1.vocabulary, ['b', 'c', 'd'])

        transformedList1 = model1.transform(dataset).select("features", "expected").collect()

        for feature, expected in transformedList1:
            self.assertEqual(feature, expected)

        # maxDF as a fraction of the corpus (0.75 * 4 docs = 3 docs).
        model2 = cv.setMaxDF(0.75).fit(dataset)
        self.assertEqual(model2.vocabulary, ['b', 'c', 'd'])

        transformedList2 = model2.transform(dataset).select("features", "expected").collect()

        for feature, expected in transformedList2:
            self.assertEqual(feature, expected)

    def test_count_vectorizer_from_vocab(self):
        """Build a CountVectorizerModel directly from a vocabulary list."""
        model = CountVectorizerModel.from_vocabulary(["a", "b", "c"], inputCol="words",
                                                     outputCol="features", minTF=2)
        self.assertEqual(model.vocabulary, ["a", "b", "c"])
        self.assertEqual(model.getMinTF(), 2)

        # minTF=2 zeroes out terms occurring fewer than twice in a document.
        dataset = self.spark.createDataFrame([
            (0, "a a a b b c".split(' '), SparseVector(3, {0: 3.0, 1: 2.0}),),
            (1, "a a".split(' '), SparseVector(3, {0: 2.0}),),
            (2, "a b".split(' '), SparseVector(3, {}),)], ["id", "words", "expected"])

        transformed_list = model.transform(dataset).select("features", "expected").collect()

        for feature, expected in transformed_list:
            self.assertEqual(feature, expected)

        # An empty vocabulary is rejected.
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(Exception, "vocabSize.*invalid.*0"):
                CountVectorizerModel.from_vocabulary([], inputCol="words")

        # Default outputCol is usable when none is specified.
        model_default = CountVectorizerModel.from_vocabulary(["a", "b", "c"], inputCol="words")
        transformed_list = model_default.transform(dataset) \
            .select(model_default.getOrDefault(model_default.outputCol)).collect()
        self.assertEqual(len(transformed_list), 3)

    def test_rformula_force_index_label(self):
        """forceIndexLabel makes RFormula index even a numeric label column."""
        df = self.spark.createDataFrame([
            (1.0, 1.0, "a"),
            (0.0, 2.0, "b"),
            (1.0, 0.0, "a")], ["y", "x", "s"])

        # Numeric label passes through untouched by default.
        rf = RFormula(formula="y ~ x + s")
        model = rf.fit(df)
        transformedDF = model.transform(df)
        self.assertEqual(transformedDF.head().label, 1.0)

        # With forceIndexLabel, labels are re-indexed by frequency
        # (1.0 is the most frequent value, so it maps to index 0.0).
        rf2 = RFormula(formula="y ~ x + s").setForceIndexLabel(True)
        model2 = rf2.fit(df)
        transformedDF2 = model2.transform(df)
        self.assertEqual(transformedDF2.head().label, 0.0)

    def test_rformula_string_indexer_order_type(self):
        """stringIndexerOrderType controls category-to-index assignment."""
        df = self.spark.createDataFrame([
            (1.0, 1.0, "a"),
            (0.0, 2.0, "b"),
            (1.0, 0.0, "a")], ["y", "x", "s"])
        rf = RFormula(formula="y ~ x + s", stringIndexerOrderType="alphabetDesc")
        self.assertEqual(rf.getStringIndexerOrderType(), 'alphabetDesc')
        transformedDF = rf.fit(df).transform(df)
        observed = transformedDF.select("features").collect()
        # alphabetDesc: "b" -> 0, "a" -> 1; dummy-encoded as the second feature.
        expected = [[1.0, 0.0], [2.0, 1.0], [0.0, 0.0]]
        for i in range(len(expected)):
            self.assertTrue(all(observed[i]["features"].toArray() == expected[i]))

    def test_string_indexer_handle_invalid(self):
        """handleInvalid: 'keep' maps unseen/null to an extra index; 'skip' drops."""
        df = self.spark.createDataFrame([
            (0, "a"),
            (1, "d"),
            (2, None)], ["id", "label"])

        si1 = StringIndexer(inputCol="label", outputCol="indexed", handleInvalid="keep",
                            stringOrderType="alphabetAsc")
        model1 = si1.fit(df)
        td1 = model1.transform(df)
        actual1 = td1.select("id", "indexed").collect()
        # null label kept as the trailing index (2.0).
        expected1 = [Row(id=0, indexed=0.0), Row(id=1, indexed=1.0), Row(id=2, indexed=2.0)]
        self.assertEqual(actual1, expected1)

        si2 = si1.setHandleInvalid("skip")
        model2 = si2.fit(df)
        td2 = model2.transform(df)
        actual2 = td2.select("id", "indexed").collect()
        # null-label row is dropped entirely.
        expected2 = [Row(id=0, indexed=0.0), Row(id=1, indexed=1.0)]
        self.assertEqual(actual2, expected2)

    def test_string_indexer_from_labels(self):
        """Build StringIndexerModel directly from an explicit label list."""
        model = StringIndexerModel.from_labels(["a", "b", "c"], inputCol="label",
                                               outputCol="indexed", handleInvalid="keep")
        self.assertEqual(model.labels, ["a", "b", "c"])

        df1 = self.spark.createDataFrame([
            (0, "a"),
            (1, "c"),
            (2, None),
            (3, "b"),
            (4, "b")], ["id", "label"])

        # Indices follow the given label order; null keeps index len(labels).
        result1 = model.transform(df1)
        actual1 = result1.select("id", "indexed").collect()
        expected1 = [Row(id=0, indexed=0.0), Row(id=1, indexed=2.0), Row(id=2, indexed=3.0),
                     Row(id=3, indexed=1.0), Row(id=4, indexed=1.0)]
        self.assertEqual(actual1, expected1)

        # Empty label list with handleInvalid="keep": everything maps to 0.0.
        model_empty_labels = StringIndexerModel.from_labels(
            [], inputCol="label", outputCol="indexed", handleInvalid="keep")
        actual2 = model_empty_labels.transform(df1).select("id", "indexed").collect()
        expected2 = [Row(id=0, indexed=0.0), Row(id=1, indexed=0.0), Row(id=2, indexed=0.0),
                     Row(id=3, indexed=0.0), Row(id=4, indexed=0.0)]
        self.assertEqual(actual2, expected2)

        # Default outputCol is usable when none is specified.
        model_default = StringIndexerModel.from_labels(["a", "b", "c"], inputCol="label")
        df2 = self.spark.createDataFrame([
            (0, "a"),
            (1, "c"),
            (2, "b"),
            (3, "b"),
            (4, "b")], ["id", "label"])
        transformed_list = model_default.transform(df2) \
            .select(model_default.getOrDefault(model_default.outputCol)).collect()
        self.assertEqual(len(transformed_list), 5)

    def test_vector_size_hint(self):
        """VectorSizeHint with handleInvalid='skip' drops wrong-sized vectors."""
        df = self.spark.createDataFrame(
            [(0, Vectors.dense([0.0, 10.0, 0.5])),
             (1, Vectors.dense([1.0, 11.0, 0.5, 0.6])),
             (2, Vectors.dense([2.0, 12.0]))],
            ["id", "vector"])

        sizeHint = VectorSizeHint(
            inputCol="vector",
            handleInvalid="skip")
        sizeHint.setSize(3)
        self.assertEqual(sizeHint.getSize(), 3)

        # Rows 1 and 2 have the wrong size and are skipped; only row 0 remains.
        output = sizeHint.transform(df).head().vector
        expected = DenseVector([0.0, 10.0, 0.5])
        self.assertEqual(output, expected)
0288
0289
class HashingTFTest(SparkSessionTestCase):
    """Tests for the HashingTF transformer."""

    def test_apply_binary_term_freqs(self):
        """In binary mode each hashed term contributes 1.0, not its raw count."""

        df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
        n = 10
        transformer = HashingTF()
        transformer.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
        result = transformer.transform(df)
        actual = result.select("features").first().features.toArray()
        expected = Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0]).toArray()
        for i, (got, want) in enumerate(zip(actual, expected)):
            self.assertAlmostEqual(got, want, 14, "Error at " + str(i) +
                                   ": expected " + str(want) + ", got " + str(got))
0304
0305
if __name__ == "__main__":
    from pyspark.ml.tests.test_feature import *

    # Prefer XML report output when xmlrunner is available; otherwise fall
    # back to the default text runner.
    testRunner = None
    try:
        import xmlrunner
    except ImportError:
        pass
    else:
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    unittest.main(testRunner=testRunner, verbosity=2)