Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 import os
0018 import shutil
0019 import sys
0020 import tempfile
0021 import unittest
0022 from array import array
0023 
0024 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME
0025 
0026 
0027 class InputFormatTests(ReusedPySparkTestCase):
0028 
0029     @classmethod
0030     def setUpClass(cls):
0031         ReusedPySparkTestCase.setUpClass()
0032         cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
0033         os.unlink(cls.tempdir.name)
0034         cls.sc._jvm.WriteInputFormatTestDataGenerator.generateData(cls.tempdir.name, cls.sc._jsc)
0035 
0036     @classmethod
0037     def tearDownClass(cls):
0038         ReusedPySparkTestCase.tearDownClass()
0039         shutil.rmtree(cls.tempdir.name)
0040 
0041     @unittest.skipIf(sys.version >= "3", "serialize array of byte")
0042     def test_sequencefiles(self):
0043         basepath = self.tempdir.name
0044         ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
0045                                            "org.apache.hadoop.io.IntWritable",
0046                                            "org.apache.hadoop.io.Text").collect())
0047         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
0048         self.assertEqual(ints, ei)
0049 
0050         doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/",
0051                                               "org.apache.hadoop.io.DoubleWritable",
0052                                               "org.apache.hadoop.io.Text").collect())
0053         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
0054         self.assertEqual(doubles, ed)
0055 
0056         bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
0057                                             "org.apache.hadoop.io.IntWritable",
0058                                             "org.apache.hadoop.io.BytesWritable").collect())
0059         ebs = [(1, bytearray('aa', 'utf-8')),
0060                (1, bytearray('aa', 'utf-8')),
0061                (2, bytearray('aa', 'utf-8')),
0062                (2, bytearray('bb', 'utf-8')),
0063                (2, bytearray('bb', 'utf-8')),
0064                (3, bytearray('cc', 'utf-8'))]
0065         self.assertEqual(bytes, ebs)
0066 
0067         text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
0068                                            "org.apache.hadoop.io.Text",
0069                                            "org.apache.hadoop.io.Text").collect())
0070         et = [(u'1', u'aa'),
0071               (u'1', u'aa'),
0072               (u'2', u'aa'),
0073               (u'2', u'bb'),
0074               (u'2', u'bb'),
0075               (u'3', u'cc')]
0076         self.assertEqual(text, et)
0077 
0078         bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/",
0079                                             "org.apache.hadoop.io.IntWritable",
0080                                             "org.apache.hadoop.io.BooleanWritable").collect())
0081         eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
0082         self.assertEqual(bools, eb)
0083 
0084         nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/",
0085                                             "org.apache.hadoop.io.IntWritable",
0086                                             "org.apache.hadoop.io.BooleanWritable").collect())
0087         en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
0088         self.assertEqual(nulls, en)
0089 
0090         maps = self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
0091                                     "org.apache.hadoop.io.IntWritable",
0092                                     "org.apache.hadoop.io.MapWritable").collect()
0093         em = [(1, {}),
0094               (1, {3.0: u'bb'}),
0095               (2, {1.0: u'aa'}),
0096               (2, {1.0: u'cc'}),
0097               (3, {2.0: u'dd'})]
0098         for v in maps:
0099             self.assertTrue(v in em)
0100 
0101         # arrays get pickled to tuples by default
0102         tuples = sorted(self.sc.sequenceFile(
0103             basepath + "/sftestdata/sfarray/",
0104             "org.apache.hadoop.io.IntWritable",
0105             "org.apache.spark.api.python.DoubleArrayWritable").collect())
0106         et = [(1, ()),
0107               (2, (3.0, 4.0, 5.0)),
0108               (3, (4.0, 5.0, 6.0))]
0109         self.assertEqual(tuples, et)
0110 
0111         # with custom converters, primitive arrays can stay as arrays
0112         arrays = sorted(self.sc.sequenceFile(
0113             basepath + "/sftestdata/sfarray/",
0114             "org.apache.hadoop.io.IntWritable",
0115             "org.apache.spark.api.python.DoubleArrayWritable",
0116             valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
0117         ea = [(1, array('d')),
0118               (2, array('d', [3.0, 4.0, 5.0])),
0119               (3, array('d', [4.0, 5.0, 6.0]))]
0120         self.assertEqual(arrays, ea)
0121 
0122         clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
0123                                             "org.apache.hadoop.io.Text",
0124                                             "org.apache.spark.api.python.TestWritable").collect())
0125         cname = u'org.apache.spark.api.python.TestWritable'
0126         ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
0127               (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
0128               (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
0129               (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
0130               (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
0131         self.assertEqual(clazz, ec)
0132 
0133         unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
0134                                                       "org.apache.hadoop.io.Text",
0135                                                       "org.apache.spark.api.python.TestWritable",
0136                                                       ).collect())
0137         self.assertEqual(unbatched_clazz, ec)
0138 
0139     def test_oldhadoop(self):
0140         basepath = self.tempdir.name
0141         ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
0142                                          "org.apache.hadoop.mapred.SequenceFileInputFormat",
0143                                          "org.apache.hadoop.io.IntWritable",
0144                                          "org.apache.hadoop.io.Text").collect())
0145         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
0146         self.assertEqual(ints, ei)
0147 
0148         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
0149         oldconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
0150         hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
0151                                   "org.apache.hadoop.io.LongWritable",
0152                                   "org.apache.hadoop.io.Text",
0153                                   conf=oldconf).collect()
0154         result = [(0, u'Hello World!')]
0155         self.assertEqual(hello, result)
0156 
0157     def test_newhadoop(self):
0158         basepath = self.tempdir.name
0159         ints = sorted(self.sc.newAPIHadoopFile(
0160             basepath + "/sftestdata/sfint/",
0161             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0162             "org.apache.hadoop.io.IntWritable",
0163             "org.apache.hadoop.io.Text").collect())
0164         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
0165         self.assertEqual(ints, ei)
0166 
0167         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
0168         newconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
0169         hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
0170                                         "org.apache.hadoop.io.LongWritable",
0171                                         "org.apache.hadoop.io.Text",
0172                                         conf=newconf).collect()
0173         result = [(0, u'Hello World!')]
0174         self.assertEqual(hello, result)
0175 
0176     def test_newolderror(self):
0177         basepath = self.tempdir.name
0178         self.assertRaises(Exception, lambda: self.sc.hadoopFile(
0179             basepath + "/sftestdata/sfint/",
0180             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0181             "org.apache.hadoop.io.IntWritable",
0182             "org.apache.hadoop.io.Text"))
0183 
0184         self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
0185             basepath + "/sftestdata/sfint/",
0186             "org.apache.hadoop.mapred.SequenceFileInputFormat",
0187             "org.apache.hadoop.io.IntWritable",
0188             "org.apache.hadoop.io.Text"))
0189 
0190     def test_bad_inputs(self):
0191         basepath = self.tempdir.name
0192         self.assertRaises(Exception, lambda: self.sc.sequenceFile(
0193             basepath + "/sftestdata/sfint/",
0194             "org.apache.hadoop.io.NotValidWritable",
0195             "org.apache.hadoop.io.Text"))
0196         self.assertRaises(Exception, lambda: self.sc.hadoopFile(
0197             basepath + "/sftestdata/sfint/",
0198             "org.apache.hadoop.mapred.NotValidInputFormat",
0199             "org.apache.hadoop.io.IntWritable",
0200             "org.apache.hadoop.io.Text"))
0201         self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
0202             basepath + "/sftestdata/sfint/",
0203             "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat",
0204             "org.apache.hadoop.io.IntWritable",
0205             "org.apache.hadoop.io.Text"))
0206 
0207     def test_converters(self):
0208         # use of custom converters
0209         basepath = self.tempdir.name
0210         maps = sorted(self.sc.sequenceFile(
0211             basepath + "/sftestdata/sfmap/",
0212             "org.apache.hadoop.io.IntWritable",
0213             "org.apache.hadoop.io.MapWritable",
0214             keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
0215             valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
0216         em = [(u'\x01', []),
0217               (u'\x01', [3.0]),
0218               (u'\x02', [1.0]),
0219               (u'\x02', [1.0]),
0220               (u'\x03', [2.0])]
0221         self.assertEqual(maps, em)
0222 
0223     def test_binary_files(self):
0224         path = os.path.join(self.tempdir.name, "binaryfiles")
0225         os.mkdir(path)
0226         data = b"short binary data"
0227         with open(os.path.join(path, "part-0000"), 'wb') as f:
0228             f.write(data)
0229         [(p, d)] = self.sc.binaryFiles(path).collect()
0230         self.assertTrue(p.endswith("part-0000"))
0231         self.assertEqual(d, data)
0232 
0233     def test_binary_records(self):
0234         path = os.path.join(self.tempdir.name, "binaryrecords")
0235         os.mkdir(path)
0236         with open(os.path.join(path, "part-0000"), 'w') as f:
0237             for i in range(100):
0238                 f.write('%04d' % i)
0239         result = self.sc.binaryRecords(path, 4).map(int).collect()
0240         self.assertEqual(list(range(100)), result)
0241 
0242 
0243 class OutputFormatTests(ReusedPySparkTestCase):
0244 
0245     def setUp(self):
0246         self.tempdir = tempfile.NamedTemporaryFile(delete=False)
0247         os.unlink(self.tempdir.name)
0248 
0249     def tearDown(self):
0250         shutil.rmtree(self.tempdir.name, ignore_errors=True)
0251 
0252     @unittest.skipIf(sys.version >= "3", "serialize array of byte")
0253     def test_sequencefiles(self):
0254         basepath = self.tempdir.name
0255         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
0256         self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
0257         ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
0258         self.assertEqual(ints, ei)
0259 
0260         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
0261         self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
0262         doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
0263         self.assertEqual(doubles, ed)
0264 
0265         ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
0266         self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
0267         bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
0268         self.assertEqual(bytes, ebs)
0269 
0270         et = [(u'1', u'aa'),
0271               (u'2', u'bb'),
0272               (u'3', u'cc')]
0273         self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
0274         text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
0275         self.assertEqual(text, et)
0276 
0277         eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
0278         self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
0279         bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
0280         self.assertEqual(bools, eb)
0281 
0282         en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
0283         self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
0284         nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
0285         self.assertEqual(nulls, en)
0286 
0287         em = [(1, {}),
0288               (1, {3.0: u'bb'}),
0289               (2, {1.0: u'aa'}),
0290               (2, {1.0: u'cc'}),
0291               (3, {2.0: u'dd'})]
0292         self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
0293         maps = self.sc.sequenceFile(basepath + "/sfmap/").collect()
0294         for v in maps:
0295             self.assertTrue(v, em)
0296 
0297     def test_oldhadoop(self):
0298         basepath = self.tempdir.name
0299         dict_data = [(1, {}),
0300                      (1, {"row1": 1.0}),
0301                      (2, {"row2": 2.0})]
0302         self.sc.parallelize(dict_data).saveAsHadoopFile(
0303             basepath + "/oldhadoop/",
0304             "org.apache.hadoop.mapred.SequenceFileOutputFormat",
0305             "org.apache.hadoop.io.IntWritable",
0306             "org.apache.hadoop.io.MapWritable")
0307         result = self.sc.hadoopFile(
0308             basepath + "/oldhadoop/",
0309             "org.apache.hadoop.mapred.SequenceFileInputFormat",
0310             "org.apache.hadoop.io.IntWritable",
0311             "org.apache.hadoop.io.MapWritable").collect()
0312         for v in result:
0313             self.assertTrue(v, dict_data)
0314 
0315         conf = {
0316             "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
0317             "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
0318             "mapreduce.job.output.value.class": "org.apache.hadoop.io.MapWritable",
0319             "mapreduce.output.fileoutputformat.outputdir": basepath + "/olddataset/"
0320         }
0321         self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
0322         input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/olddataset/"}
0323         result = self.sc.hadoopRDD(
0324             "org.apache.hadoop.mapred.SequenceFileInputFormat",
0325             "org.apache.hadoop.io.IntWritable",
0326             "org.apache.hadoop.io.MapWritable",
0327             conf=input_conf).collect()
0328         for v in result:
0329             self.assertTrue(v, dict_data)
0330 
0331     def test_newhadoop(self):
0332         basepath = self.tempdir.name
0333         data = [(1, ""),
0334                 (1, "a"),
0335                 (2, "bcdf")]
0336         self.sc.parallelize(data).saveAsNewAPIHadoopFile(
0337             basepath + "/newhadoop/",
0338             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0339             "org.apache.hadoop.io.IntWritable",
0340             "org.apache.hadoop.io.Text")
0341         result = sorted(self.sc.newAPIHadoopFile(
0342             basepath + "/newhadoop/",
0343             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0344             "org.apache.hadoop.io.IntWritable",
0345             "org.apache.hadoop.io.Text").collect())
0346         self.assertEqual(result, data)
0347 
0348         conf = {
0349             "mapreduce.job.outputformat.class":
0350                 "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0351             "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
0352             "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
0353             "mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
0354         }
0355         self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf)
0356         input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
0357         new_dataset = sorted(self.sc.newAPIHadoopRDD(
0358             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0359             "org.apache.hadoop.io.IntWritable",
0360             "org.apache.hadoop.io.Text",
0361             conf=input_conf).collect())
0362         self.assertEqual(new_dataset, data)
0363 
0364     @unittest.skipIf(sys.version >= "3", "serialize of array")
0365     def test_newhadoop_with_array(self):
0366         basepath = self.tempdir.name
0367         # use custom ArrayWritable types and converters to handle arrays
0368         array_data = [(1, array('d')),
0369                       (1, array('d', [1.0, 2.0, 3.0])),
0370                       (2, array('d', [3.0, 4.0, 5.0]))]
0371         self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
0372             basepath + "/newhadoop/",
0373             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0374             "org.apache.hadoop.io.IntWritable",
0375             "org.apache.spark.api.python.DoubleArrayWritable",
0376             valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
0377         result = sorted(self.sc.newAPIHadoopFile(
0378             basepath + "/newhadoop/",
0379             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0380             "org.apache.hadoop.io.IntWritable",
0381             "org.apache.spark.api.python.DoubleArrayWritable",
0382             valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
0383         self.assertEqual(result, array_data)
0384 
0385         conf = {
0386             "mapreduce.job.outputformat.class":
0387                 "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0388             "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
0389             "mapreduce.job.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
0390             "mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
0391         }
0392         self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
0393             conf,
0394             valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
0395         input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
0396         new_dataset = sorted(self.sc.newAPIHadoopRDD(
0397             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
0398             "org.apache.hadoop.io.IntWritable",
0399             "org.apache.spark.api.python.DoubleArrayWritable",
0400             valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
0401             conf=input_conf).collect())
0402         self.assertEqual(new_dataset, array_data)
0403 
0404     def test_newolderror(self):
0405         basepath = self.tempdir.name
0406         rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
0407         self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
0408             basepath + "/newolderror/saveAsHadoopFile/",
0409             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
0410         self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
0411             basepath + "/newolderror/saveAsNewAPIHadoopFile/",
0412             "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
0413 
0414     def test_bad_inputs(self):
0415         basepath = self.tempdir.name
0416         rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
0417         self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
0418             basepath + "/badinputs/saveAsHadoopFile/",
0419             "org.apache.hadoop.mapred.NotValidOutputFormat"))
0420         self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
0421             basepath + "/badinputs/saveAsNewAPIHadoopFile/",
0422             "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
0423 
0424     def test_converters(self):
0425         # use of custom converters
0426         basepath = self.tempdir.name
0427         data = [(1, {3.0: u'bb'}),
0428                 (2, {1.0: u'aa'}),
0429                 (3, {2.0: u'dd'})]
0430         self.sc.parallelize(data).saveAsNewAPIHadoopFile(
0431             basepath + "/converters/",
0432             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0433             keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
0434             valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
0435         converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
0436         expected = [(u'1', 3.0),
0437                     (u'2', 1.0),
0438                     (u'3', 2.0)]
0439         self.assertEqual(converted, expected)
0440 
0441     def test_reserialization(self):
0442         basepath = self.tempdir.name
0443         x = range(1, 5)
0444         y = range(1001, 1005)
0445         data = list(zip(x, y))
0446         rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
0447         rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
0448         result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
0449         self.assertEqual(result1, data)
0450 
0451         rdd.saveAsHadoopFile(
0452             basepath + "/reserialize/hadoop",
0453             "org.apache.hadoop.mapred.SequenceFileOutputFormat")
0454         result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
0455         self.assertEqual(result2, data)
0456 
0457         rdd.saveAsNewAPIHadoopFile(
0458             basepath + "/reserialize/newhadoop",
0459             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
0460         result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
0461         self.assertEqual(result3, data)
0462 
0463         conf4 = {
0464             "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
0465             "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
0466             "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
0467             "mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/dataset"}
0468         rdd.saveAsHadoopDataset(conf4)
0469         result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
0470         self.assertEqual(result4, data)
0471 
0472         conf5 = {"mapreduce.job.outputformat.class":
0473                  "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
0474                  "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
0475                  "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
0476                  "mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/newdataset"
0477                  }
0478         rdd.saveAsNewAPIHadoopDataset(conf5)
0479         result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
0480         self.assertEqual(result5, data)
0481 
0482     def test_malformed_RDD(self):
0483         basepath = self.tempdir.name
0484         # non-batch-serialized RDD[[(K, V)]] should be rejected
0485         data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
0486         rdd = self.sc.parallelize(data, len(data))
0487         self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
0488             basepath + "/malformed/sequence"))
0489 
0490 
0491 if __name__ == "__main__":
0492     from pyspark.tests.test_readwrite import *
0493 
0494     try:
0495         import xmlrunner
0496         testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
0497     except ImportError:
0498         testRunner = None
0499     unittest.main(testRunner=testRunner, verbosity=2)