Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from time import time, sleep
0019 import unittest
0020 
0021 from numpy import array, random, exp, dot, all, mean, abs
0022 from numpy import sum as array_sum
0023 
0024 from pyspark import SparkContext
0025 from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
0026 from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
0027 from pyspark.mllib.linalg import Vectors
0028 from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
0029 from pyspark.mllib.util import LinearDataGenerator
0030 from pyspark.streaming import StreamingContext
0031 from pyspark.testing.utils import eventually
0032 
0033 
0034 class MLLibStreamingTestCase(unittest.TestCase):
0035     def setUp(self):
0036         self.sc = SparkContext('local[4]', "MLlib tests")
0037         self.ssc = StreamingContext(self.sc, 1.0)
0038 
0039     def tearDown(self):
0040         self.ssc.stop(False)
0041         self.sc.stop()
0042 
0043 
0044 class StreamingKMeansTest(MLLibStreamingTestCase):
0045     def test_model_params(self):
0046         """Test that the model params are set correctly"""
0047         stkm = StreamingKMeans()
0048         stkm.setK(5).setDecayFactor(0.0)
0049         self.assertEqual(stkm._k, 5)
0050         self.assertEqual(stkm._decayFactor, 0.0)
0051 
0052         # Model not set yet.
0053         self.assertIsNone(stkm.latestModel())
0054         self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
0055 
0056         stkm.setInitialCenters(
0057             centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
0058         self.assertEqual(
0059             stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
0060         self.assertEqual(stkm.latestModel().clusterWeights, [1.0, 1.0])
0061 
0062     def test_accuracy_for_single_center(self):
0063         """Test that parameters obtained are correct for a single center."""
0064         centers, batches = self.streamingKMeansDataGenerator(
0065             batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
0066         stkm = StreamingKMeans(1)
0067         stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
0068         input_stream = self.ssc.queueStream(
0069             [self.sc.parallelize(batch, 1) for batch in batches])
0070         stkm.trainOn(input_stream)
0071 
0072         self.ssc.start()
0073 
0074         def condition():
0075             self.assertEqual(stkm.latestModel().clusterWeights, [25.0])
0076             return True
0077         eventually(condition, catch_assertions=True)
0078 
0079         realCenters = array_sum(array(centers), axis=0)
0080         for i in range(5):
0081             modelCenters = stkm.latestModel().centers[0][i]
0082             self.assertAlmostEqual(centers[0][i], modelCenters, 1)
0083             self.assertAlmostEqual(realCenters[i], modelCenters, 1)
0084 
0085     def streamingKMeansDataGenerator(self, batches, numPoints,
0086                                      k, d, r, seed, centers=None):
0087         rng = random.RandomState(seed)
0088 
0089         # Generate centers.
0090         centers = [rng.randn(d) for i in range(k)]
0091 
0092         return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
0093                           for j in range(numPoints)]
0094                          for i in range(batches)]
0095 
0096     def test_trainOn_model(self):
0097         """Test the model on toy data with four clusters."""
0098         stkm = StreamingKMeans()
0099         initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
0100         stkm.setInitialCenters(
0101             centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
0102 
0103         # Create a toy dataset by setting a tiny offset for each point.
0104         offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
0105         batches = []
0106         for offset in offsets:
0107             batches.append([[offset[0] + center[0], offset[1] + center[1]]
0108                             for center in initCenters])
0109 
0110         batches = [self.sc.parallelize(batch, 1) for batch in batches]
0111         input_stream = self.ssc.queueStream(batches)
0112         stkm.trainOn(input_stream)
0113         self.ssc.start()
0114 
0115         # Give enough time to train the model.
0116         def condition():
0117             finalModel = stkm.latestModel()
0118             self.assertTrue(all(finalModel.centers == array(initCenters)))
0119             self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
0120             return True
0121         eventually(condition, catch_assertions=True)
0122 
0123     def test_predictOn_model(self):
0124         """Test that the model predicts correctly on toy data."""
0125         stkm = StreamingKMeans()
0126         stkm._model = StreamingKMeansModel(
0127             clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
0128             clusterWeights=[1.0, 1.0, 1.0, 1.0])
0129 
0130         predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
0131         predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
0132         predict_stream = self.ssc.queueStream(predict_data)
0133         predict_val = stkm.predictOn(predict_stream)
0134 
0135         result = []
0136 
0137         def update(rdd):
0138             rdd_collect = rdd.collect()
0139             if rdd_collect:
0140                 result.append(rdd_collect)
0141 
0142         predict_val.foreachRDD(update)
0143         self.ssc.start()
0144 
0145         def condition():
0146             self.assertEqual(result, [[0], [1], [2], [3]])
0147             return True
0148 
0149         eventually(condition, catch_assertions=True)
0150 
0151     @unittest.skip("SPARK-10086: Flaky StreamingKMeans test in PySpark")
0152     def test_trainOn_predictOn(self):
0153         """Test that prediction happens on the updated model."""
0154         stkm = StreamingKMeans(decayFactor=0.0, k=2)
0155         stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
0156 
0157         # Since decay factor is set to zero, once the first batch
0158         # is passed the clusterCenters are updated to [-0.5, 0.7]
0159         # which causes 0.2 & 0.3 to be classified as 1, even though the
0160         # classification based in the initial model would have been 0
0161         # proving that the model is updated.
0162         batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
0163         batches = [self.sc.parallelize(batch) for batch in batches]
0164         input_stream = self.ssc.queueStream(batches)
0165         predict_results = []
0166 
0167         def collect(rdd):
0168             rdd_collect = rdd.collect()
0169             if rdd_collect:
0170                 predict_results.append(rdd_collect)
0171 
0172         stkm.trainOn(input_stream)
0173         predict_stream = stkm.predictOn(input_stream)
0174         predict_stream.foreachRDD(collect)
0175 
0176         self.ssc.start()
0177 
0178         def condition():
0179             self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
0180             return True
0181 
0182         eventually(condition, catch_assertions=True)
0183 
0184 
0185 class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
0186 
0187     @staticmethod
0188     def generateLogisticInput(offset, scale, nPoints, seed):
0189         """
0190         Generate 1 / (1 + exp(-x * scale + offset))
0191 
0192         where,
0193         x is randomnly distributed and the threshold
0194         and labels for each sample in x is obtained from a random uniform
0195         distribution.
0196         """
0197         rng = random.RandomState(seed)
0198         x = rng.randn(nPoints)
0199         sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
0200         y_p = rng.rand(nPoints)
0201         cut_off = y_p <= sigmoid
0202         y_p[cut_off] = 1.0
0203         y_p[~cut_off] = 0.0
0204         return [
0205             LabeledPoint(y_p[i], Vectors.dense([x[i]]))
0206             for i in range(nPoints)]
0207 
0208     def test_parameter_accuracy(self):
0209         """
0210         Test that the final value of weights is close to the desired value.
0211         """
0212         input_batches = [
0213             self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0214             for i in range(20)]
0215         input_stream = self.ssc.queueStream(input_batches)
0216 
0217         slr = StreamingLogisticRegressionWithSGD(
0218             stepSize=0.2, numIterations=25)
0219         slr.setInitialWeights([0.0])
0220         slr.trainOn(input_stream)
0221 
0222         self.ssc.start()
0223 
0224         def condition():
0225             rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
0226             self.assertAlmostEqual(rel, 0.1, 1)
0227             return True
0228 
0229         eventually(condition, timeout=60.0, catch_assertions=True)
0230 
0231     def test_convergence(self):
0232         """
0233         Test that weights converge to the required value on toy data.
0234         """
0235         input_batches = [
0236             self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0237             for i in range(20)]
0238         input_stream = self.ssc.queueStream(input_batches)
0239         models = []
0240 
0241         slr = StreamingLogisticRegressionWithSGD(
0242             stepSize=0.2, numIterations=25)
0243         slr.setInitialWeights([0.0])
0244         slr.trainOn(input_stream)
0245         input_stream.foreachRDD(
0246             lambda x: models.append(slr.latestModel().weights[0]))
0247 
0248         self.ssc.start()
0249 
0250         def condition():
0251             self.assertEqual(len(models), len(input_batches))
0252             return True
0253 
0254         # We want all batches to finish for this test.
0255         eventually(condition, 60.0, catch_assertions=True)
0256 
0257         t_models = array(models)
0258         diff = t_models[1:] - t_models[:-1]
0259         # Test that weights improve with a small tolerance
0260         self.assertTrue(all(diff >= -0.1))
0261         self.assertTrue(array_sum(diff > 0) > 1)
0262 
0263     @staticmethod
0264     def calculate_accuracy_error(true, predicted):
0265         return sum(abs(array(true) - array(predicted))) / len(true)
0266 
0267     def test_predictions(self):
0268         """Test predicted values on a toy model."""
0269         input_batches = []
0270         for i in range(20):
0271             batch = self.sc.parallelize(
0272                 self.generateLogisticInput(0, 1.5, 100, 42 + i))
0273             input_batches.append(batch.map(lambda x: (x.label, x.features)))
0274         input_stream = self.ssc.queueStream(input_batches)
0275 
0276         slr = StreamingLogisticRegressionWithSGD(
0277             stepSize=0.2, numIterations=25)
0278         slr.setInitialWeights([1.5])
0279         predict_stream = slr.predictOnValues(input_stream)
0280         true_predicted = []
0281         predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
0282         self.ssc.start()
0283 
0284         def condition():
0285             self.assertEqual(len(true_predicted), len(input_batches))
0286             return True
0287 
0288         eventually(condition, catch_assertions=True)
0289 
0290         # Test that the accuracy error is no more than 0.4 on each batch.
0291         for batch in true_predicted:
0292             true, predicted = zip(*batch)
0293             self.assertTrue(
0294                 self.calculate_accuracy_error(true, predicted) < 0.4)
0295 
0296     def test_training_and_prediction(self):
0297         """Test that the model improves on toy data with no. of batches"""
0298         input_batches = [
0299             self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
0300             for i in range(40)]
0301         predict_batches = [
0302             b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]
0303 
0304         slr = StreamingLogisticRegressionWithSGD(
0305             stepSize=0.01, numIterations=25)
0306         slr.setInitialWeights([-0.1])
0307         errors = []
0308 
0309         def collect_errors(rdd):
0310             true, predicted = zip(*rdd.collect())
0311             errors.append(self.calculate_accuracy_error(true, predicted))
0312 
0313         true_predicted = []
0314         input_stream = self.ssc.queueStream(input_batches)
0315         predict_stream = self.ssc.queueStream(predict_batches)
0316         slr.trainOn(input_stream)
0317         ps = slr.predictOnValues(predict_stream)
0318         ps.foreachRDD(lambda x: collect_errors(x))
0319 
0320         self.ssc.start()
0321 
0322         def condition():
0323             # Test that the improvement in error is > 0.3
0324             if len(errors) == len(predict_batches):
0325                 self.assertGreater(errors[1] - errors[-1], 0.3)
0326             if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
0327                 return True
0328             return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
0329 
0330         eventually(condition, timeout=60.0)
0331 
0332 
0333 class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
0334 
0335     def assertArrayAlmostEqual(self, array1, array2, dec):
0336         for i, j in array1, array2:
0337             self.assertAlmostEqual(i, j, dec)
0338 
0339     def test_parameter_accuracy(self):
0340         """Test that coefs are predicted accurately by fitting on toy data."""
0341 
0342         # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
0343         # (10, 10)
0344         slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0345         slr.setInitialWeights([0.0, 0.0])
0346         xMean = [0.0, 0.0]
0347         xVariance = [1.0 / 3.0, 1.0 / 3.0]
0348 
0349         # Create ten batches with 100 sample points in each.
0350         batches = []
0351         for i in range(10):
0352             batch = LinearDataGenerator.generateLinearInput(
0353                 0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
0354             batches.append(self.sc.parallelize(batch))
0355 
0356         input_stream = self.ssc.queueStream(batches)
0357         slr.trainOn(input_stream)
0358         self.ssc.start()
0359 
0360         def condition():
0361             self.assertArrayAlmostEqual(
0362                 slr.latestModel().weights.array, [10., 10.], 1)
0363             self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
0364             return True
0365 
0366         eventually(condition, catch_assertions=True)
0367 
0368     def test_parameter_convergence(self):
0369         """Test that the model parameters improve with streaming data."""
0370         slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0371         slr.setInitialWeights([0.0])
0372 
0373         # Create ten batches with 100 sample points in each.
0374         batches = []
0375         for i in range(10):
0376             batch = LinearDataGenerator.generateLinearInput(
0377                 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
0378             batches.append(self.sc.parallelize(batch))
0379 
0380         model_weights = []
0381         input_stream = self.ssc.queueStream(batches)
0382         input_stream.foreachRDD(
0383             lambda x: model_weights.append(slr.latestModel().weights[0]))
0384         slr.trainOn(input_stream)
0385         self.ssc.start()
0386 
0387         def condition():
0388             self.assertEqual(len(model_weights), len(batches))
0389             return True
0390 
0391         # We want all batches to finish for this test.
0392         eventually(condition, catch_assertions=True)
0393 
0394         w = array(model_weights)
0395         diff = w[1:] - w[:-1]
0396         self.assertTrue(all(diff >= -0.1))
0397 
0398     def test_prediction(self):
0399         """Test prediction on a model with weights already set."""
0400         # Create a model with initial Weights equal to coefs
0401         slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0402         slr.setInitialWeights([10.0, 10.0])
0403 
0404         # Create ten batches with 100 sample points in each.
0405         batches = []
0406         for i in range(10):
0407             batch = LinearDataGenerator.generateLinearInput(
0408                 0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
0409                 100, 42 + i, 0.1)
0410             batches.append(
0411                 self.sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
0412 
0413         input_stream = self.ssc.queueStream(batches)
0414         output_stream = slr.predictOnValues(input_stream)
0415         samples = []
0416         output_stream.foreachRDD(lambda x: samples.append(x.collect()))
0417 
0418         self.ssc.start()
0419 
0420         def condition():
0421             self.assertEqual(len(samples), len(batches))
0422             return True
0423 
0424         # We want all batches to finish for this test.
0425         eventually(condition, catch_assertions=True)
0426 
0427         # Test that mean absolute error on each batch is less than 0.1
0428         for batch in samples:
0429             true, predicted = zip(*batch)
0430             self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)
0431 
0432     def test_train_prediction(self):
0433         """Test that error on test data improves as model is trained."""
0434         slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
0435         slr.setInitialWeights([0.0])
0436 
0437         # Create ten batches with 100 sample points in each.
0438         batches = []
0439         for i in range(10):
0440             batch = LinearDataGenerator.generateLinearInput(
0441                 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
0442             batches.append(self.sc.parallelize(batch))
0443 
0444         predict_batches = [
0445             b.map(lambda lp: (lp.label, lp.features)) for b in batches]
0446         errors = []
0447 
0448         def func(rdd):
0449             true, predicted = zip(*rdd.collect())
0450             errors.append(mean(abs(true) - abs(predicted)))
0451 
0452         input_stream = self.ssc.queueStream(batches)
0453         output_stream = self.ssc.queueStream(predict_batches)
0454         slr.trainOn(input_stream)
0455         output_stream = slr.predictOnValues(output_stream)
0456         output_stream.foreachRDD(func)
0457         self.ssc.start()
0458 
0459         def condition():
0460             if len(errors) == len(predict_batches):
0461                 self.assertGreater(errors[1] - errors[-1], 2)
0462             if len(errors) >= 3 and errors[1] - errors[-1] > 2:
0463                 return True
0464             return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
0465 
0466         eventually(condition, timeout=180.0)
0467 
0468 
0469 if __name__ == "__main__":
0470     from pyspark.mllib.tests.test_streaming_algorithms import *
0471 
0472     try:
0473         import xmlrunner
0474         testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
0475     except ImportError:
0476         testRunner = None
0477     unittest.main(testRunner=testRunner, verbosity=2)