0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from time import time, sleep
0019 import unittest
0020
0021 from numpy import array, random, exp, dot, all, mean, abs
0022 from numpy import sum as array_sum
0023
0024 from pyspark import SparkContext
0025 from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
0026 from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
0027 from pyspark.mllib.linalg import Vectors
0028 from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
0029 from pyspark.mllib.util import LinearDataGenerator
0030 from pyspark.streaming import StreamingContext
0031 from pyspark.testing.utils import eventually
0032
0033
0034 class MLLibStreamingTestCase(unittest.TestCase):
0035 def setUp(self):
0036 self.sc = SparkContext('local[4]', "MLlib tests")
0037 self.ssc = StreamingContext(self.sc, 1.0)
0038
0039 def tearDown(self):
0040 self.ssc.stop(False)
0041 self.sc.stop()
0042
0043
0044 class StreamingKMeansTest(MLLibStreamingTestCase):
0045 def test_model_params(self):
0046 """Test that the model params are set correctly"""
0047 stkm = StreamingKMeans()
0048 stkm.setK(5).setDecayFactor(0.0)
0049 self.assertEqual(stkm._k, 5)
0050 self.assertEqual(stkm._decayFactor, 0.0)
0051
0052
0053 self.assertIsNone(stkm.latestModel())
0054 self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
0055
0056 stkm.setInitialCenters(
0057 centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
0058 self.assertEqual(
0059 stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
0060 self.assertEqual(stkm.latestModel().clusterWeights, [1.0, 1.0])
0061
0062 def test_accuracy_for_single_center(self):
0063 """Test that parameters obtained are correct for a single center."""
0064 centers, batches = self.streamingKMeansDataGenerator(
0065 batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
0066 stkm = StreamingKMeans(1)
0067 stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
0068 input_stream = self.ssc.queueStream(
0069 [self.sc.parallelize(batch, 1) for batch in batches])
0070 stkm.trainOn(input_stream)
0071
0072 self.ssc.start()
0073
0074 def condition():
0075 self.assertEqual(stkm.latestModel().clusterWeights, [25.0])
0076 return True
0077 eventually(condition, catch_assertions=True)
0078
0079 realCenters = array_sum(array(centers), axis=0)
0080 for i in range(5):
0081 modelCenters = stkm.latestModel().centers[0][i]
0082 self.assertAlmostEqual(centers[0][i], modelCenters, 1)
0083 self.assertAlmostEqual(realCenters[i], modelCenters, 1)
0084
0085 def streamingKMeansDataGenerator(self, batches, numPoints,
0086 k, d, r, seed, centers=None):
0087 rng = random.RandomState(seed)
0088
0089
0090 centers = [rng.randn(d) for i in range(k)]
0091
0092 return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
0093 for j in range(numPoints)]
0094 for i in range(batches)]
0095
0096 def test_trainOn_model(self):
0097 """Test the model on toy data with four clusters."""
0098 stkm = StreamingKMeans()
0099 initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
0100 stkm.setInitialCenters(
0101 centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
0102
0103
0104 offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
0105 batches = []
0106 for offset in offsets:
0107 batches.append([[offset[0] + center[0], offset[1] + center[1]]
0108 for center in initCenters])
0109
0110 batches = [self.sc.parallelize(batch, 1) for batch in batches]
0111 input_stream = self.ssc.queueStream(batches)
0112 stkm.trainOn(input_stream)
0113 self.ssc.start()
0114
0115
0116 def condition():
0117 finalModel = stkm.latestModel()
0118 self.assertTrue(all(finalModel.centers == array(initCenters)))
0119 self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
0120 return True
0121 eventually(condition, catch_assertions=True)
0122
0123 def test_predictOn_model(self):
0124 """Test that the model predicts correctly on toy data."""
0125 stkm = StreamingKMeans()
0126 stkm._model = StreamingKMeansModel(
0127 clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
0128 clusterWeights=[1.0, 1.0, 1.0, 1.0])
0129
0130 predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
0131 predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
0132 predict_stream = self.ssc.queueStream(predict_data)
0133 predict_val = stkm.predictOn(predict_stream)
0134
0135 result = []
0136
0137 def update(rdd):
0138 rdd_collect = rdd.collect()
0139 if rdd_collect:
0140 result.append(rdd_collect)
0141
0142 predict_val.foreachRDD(update)
0143 self.ssc.start()
0144
0145 def condition():
0146 self.assertEqual(result, [[0], [1], [2], [3]])
0147 return True
0148
0149 eventually(condition, catch_assertions=True)
0150
0151 @unittest.skip("SPARK-10086: Flaky StreamingKMeans test in PySpark")
0152 def test_trainOn_predictOn(self):
0153 """Test that prediction happens on the updated model."""
0154 stkm = StreamingKMeans(decayFactor=0.0, k=2)
0155 stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
0156
0157
0158
0159
0160
0161
0162 batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
0163 batches = [self.sc.parallelize(batch) for batch in batches]
0164 input_stream = self.ssc.queueStream(batches)
0165 predict_results = []
0166
0167 def collect(rdd):
0168 rdd_collect = rdd.collect()
0169 if rdd_collect:
0170 predict_results.append(rdd_collect)
0171
0172 stkm.trainOn(input_stream)
0173 predict_stream = stkm.predictOn(input_stream)
0174 predict_stream.foreachRDD(collect)
0175
0176 self.ssc.start()
0177
0178 def condition():
0179 self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
0180 return True
0181
0182 eventually(condition, catch_assertions=True)
0183
0184
0185 class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
0186
0187 @staticmethod
0188 def generateLogisticInput(offset, scale, nPoints, seed):
0189 """
0190 Generate 1 / (1 + exp(-x * scale + offset))
0191
0192 where,
0193 x is randomnly distributed and the threshold
0194 and labels for each sample in x is obtained from a random uniform
0195 distribution.
0196 """
0197 rng = random.RandomState(seed)
0198 x = rng.randn(nPoints)
0199 sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
0200 y_p = rng.rand(nPoints)
0201 cut_off = y_p <= sigmoid
0202 y_p[cut_off] = 1.0
0203 y_p[~cut_off] = 0.0
0204 return [
0205 LabeledPoint(y_p[i], Vectors.dense([x[i]]))
0206 for i in range(nPoints)]
0207
0208 def test_parameter_accuracy(self):
0209 """
0210 Test that the final value of weights is close to the desired value.
0211 """
0212 input_batches = [
0213 self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0214 for i in range(20)]
0215 input_stream = self.ssc.queueStream(input_batches)
0216
0217 slr = StreamingLogisticRegressionWithSGD(
0218 stepSize=0.2, numIterations=25)
0219 slr.setInitialWeights([0.0])
0220 slr.trainOn(input_stream)
0221
0222 self.ssc.start()
0223
0224 def condition():
0225 rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
0226 self.assertAlmostEqual(rel, 0.1, 1)
0227 return True
0228
0229 eventually(condition, timeout=60.0, catch_assertions=True)
0230
0231 def test_convergence(self):
0232 """
0233 Test that weights converge to the required value on toy data.
0234 """
0235 input_batches = [
0236 self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0237 for i in range(20)]
0238 input_stream = self.ssc.queueStream(input_batches)
0239 models = []
0240
0241 slr = StreamingLogisticRegressionWithSGD(
0242 stepSize=0.2, numIterations=25)
0243 slr.setInitialWeights([0.0])
0244 slr.trainOn(input_stream)
0245 input_stream.foreachRDD(
0246 lambda x: models.append(slr.latestModel().weights[0]))
0247
0248 self.ssc.start()
0249
0250 def condition():
0251 self.assertEqual(len(models), len(input_batches))
0252 return True
0253
0254
0255 eventually(condition, 60.0, catch_assertions=True)
0256
0257 t_models = array(models)
0258 diff = t_models[1:] - t_models[:-1]
0259
0260 self.assertTrue(all(diff >= -0.1))
0261 self.assertTrue(array_sum(diff > 0) > 1)
0262
0263 @staticmethod
0264 def calculate_accuracy_error(true, predicted):
0265 return sum(abs(array(true) - array(predicted))) / len(true)
0266
0267 def test_predictions(self):
0268 """Test predicted values on a toy model."""
0269 input_batches = []
0270 for i in range(20):
0271 batch = self.sc.parallelize(
0272 self.generateLogisticInput(0, 1.5, 100, 42 + i))
0273 input_batches.append(batch.map(lambda x: (x.label, x.features)))
0274 input_stream = self.ssc.queueStream(input_batches)
0275
0276 slr = StreamingLogisticRegressionWithSGD(
0277 stepSize=0.2, numIterations=25)
0278 slr.setInitialWeights([1.5])
0279 predict_stream = slr.predictOnValues(input_stream)
0280 true_predicted = []
0281 predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
0282 self.ssc.start()
0283
0284 def condition():
0285 self.assertEqual(len(true_predicted), len(input_batches))
0286 return True
0287
0288 eventually(condition, catch_assertions=True)
0289
0290
0291 for batch in true_predicted:
0292 true, predicted = zip(*batch)
0293 self.assertTrue(
0294 self.calculate_accuracy_error(true, predicted) < 0.4)
0295
0296 def test_training_and_prediction(self):
0297 """Test that the model improves on toy data with no. of batches"""
0298 input_batches = [
0299 self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0300 for i in range(40)]
0301 predict_batches = [
0302 b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]
0303
0304 slr = StreamingLogisticRegressionWithSGD(
0305 stepSize=0.01, numIterations=25)
0306 slr.setInitialWeights([-0.1])
0307 errors = []
0308
0309 def collect_errors(rdd):
0310 true, predicted = zip(*rdd.collect())
0311 errors.append(self.calculate_accuracy_error(true, predicted))
0312
0313 true_predicted = []
0314 input_stream = self.ssc.queueStream(input_batches)
0315 predict_stream = self.ssc.queueStream(predict_batches)
0316 slr.trainOn(input_stream)
0317 ps = slr.predictOnValues(predict_stream)
0318 ps.foreachRDD(lambda x: collect_errors(x))
0319
0320 self.ssc.start()
0321
0322 def condition():
0323
0324 if len(errors) == len(predict_batches):
0325 self.assertGreater(errors[1] - errors[-1], 0.3)
0326 if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
0327 return True
0328 return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
0329
0330 eventually(condition, timeout=60.0)
0331
0332
0333 class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
0334
0335 def assertArrayAlmostEqual(self, array1, array2, dec):
0336 for i, j in array1, array2:
0337 self.assertAlmostEqual(i, j, dec)
0338
0339 def test_parameter_accuracy(self):
0340 """Test that coefs are predicted accurately by fitting on toy data."""
0341
0342
0343
0344 slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0345 slr.setInitialWeights([0.0, 0.0])
0346 xMean = [0.0, 0.0]
0347 xVariance = [1.0 / 3.0, 1.0 / 3.0]
0348
0349
0350 batches = []
0351 for i in range(10):
0352 batch = LinearDataGenerator.generateLinearInput(
0353 0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
0354 batches.append(self.sc.parallelize(batch))
0355
0356 input_stream = self.ssc.queueStream(batches)
0357 slr.trainOn(input_stream)
0358 self.ssc.start()
0359
0360 def condition():
0361 self.assertArrayAlmostEqual(
0362 slr.latestModel().weights.array, [10., 10.], 1)
0363 self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
0364 return True
0365
0366 eventually(condition, catch_assertions=True)
0367
0368 def test_parameter_convergence(self):
0369 """Test that the model parameters improve with streaming data."""
0370 slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0371 slr.setInitialWeights([0.0])
0372
0373
0374 batches = []
0375 for i in range(10):
0376 batch = LinearDataGenerator.generateLinearInput(
0377 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
0378 batches.append(self.sc.parallelize(batch))
0379
0380 model_weights = []
0381 input_stream = self.ssc.queueStream(batches)
0382 input_stream.foreachRDD(
0383 lambda x: model_weights.append(slr.latestModel().weights[0]))
0384 slr.trainOn(input_stream)
0385 self.ssc.start()
0386
0387 def condition():
0388 self.assertEqual(len(model_weights), len(batches))
0389 return True
0390
0391
0392 eventually(condition, catch_assertions=True)
0393
0394 w = array(model_weights)
0395 diff = w[1:] - w[:-1]
0396 self.assertTrue(all(diff >= -0.1))
0397
0398 def test_prediction(self):
0399 """Test prediction on a model with weights already set."""
0400
0401 slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0402 slr.setInitialWeights([10.0, 10.0])
0403
0404
0405 batches = []
0406 for i in range(10):
0407 batch = LinearDataGenerator.generateLinearInput(
0408 0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
0409 100, 42 + i, 0.1)
0410 batches.append(
0411 self.sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
0412
0413 input_stream = self.ssc.queueStream(batches)
0414 output_stream = slr.predictOnValues(input_stream)
0415 samples = []
0416 output_stream.foreachRDD(lambda x: samples.append(x.collect()))
0417
0418 self.ssc.start()
0419
0420 def condition():
0421 self.assertEqual(len(samples), len(batches))
0422 return True
0423
0424
0425 eventually(condition, catch_assertions=True)
0426
0427
0428 for batch in samples:
0429 true, predicted = zip(*batch)
0430 self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)
0431
0432 def test_train_prediction(self):
0433 """Test that error on test data improves as model is trained."""
0434 slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0435 slr.setInitialWeights([0.0])
0436
0437
0438 batches = []
0439 for i in range(10):
0440 batch = LinearDataGenerator.generateLinearInput(
0441 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
0442 batches.append(self.sc.parallelize(batch))
0443
0444 predict_batches = [
0445 b.map(lambda lp: (lp.label, lp.features)) for b in batches]
0446 errors = []
0447
0448 def func(rdd):
0449 true, predicted = zip(*rdd.collect())
0450 errors.append(mean(abs(true) - abs(predicted)))
0451
0452 input_stream = self.ssc.queueStream(batches)
0453 output_stream = self.ssc.queueStream(predict_batches)
0454 slr.trainOn(input_stream)
0455 output_stream = slr.predictOnValues(output_stream)
0456 output_stream.foreachRDD(func)
0457 self.ssc.start()
0458
0459 def condition():
0460 if len(errors) == len(predict_batches):
0461 self.assertGreater(errors[1] - errors[-1], 2)
0462 if len(errors) >= 3 and errors[1] - errors[-1] > 2:
0463 return True
0464 return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
0465
0466 eventually(condition, timeout=180.0)
0467
0468
0469 if __name__ == "__main__":
0470 from pyspark.mllib.tests.test_streaming_algorithms import *
0471
0472 try:
0473 import xmlrunner
0474 testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
0475 except ImportError:
0476 testRunner = None
0477 unittest.main(testRunner=testRunner, verbosity=2)