#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import sys
import tempfile
import unittest
from array import array

from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME


class InputFormatTests(ReusedPySparkTestCase):
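    """Read SequenceFiles and Hadoop input formats written by the JVM-side
    WriteInputFormatTestDataGenerator and verify the deserialized Python values."""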

    @classmethod
    def setUpClass(cls):
        ReusedPySparkTestCase.setUpClass()
        cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
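        # Reserve a unique name, then delete the file so the JVM-side data
        # generator can create a directory at the same path.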
        os.unlink(cls.tempdir.name)
        cls.sc._jvm.WriteInputFormatTestDataGenerator.generateData(cls.tempdir.name, cls.sc._jsc)

    @classmethod
    def tearDownClass(cls):
        ReusedPySparkTestCase.tearDownClass()
        shutil.rmtree(cls.tempdir.name)

    @unittest.skipIf(sys.version >= "3", "serialize array of byte")
    def test_sequencefiles(self):
        basepath = self.tempdir.name
        ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
                                           "org.apache.hadoop.io.IntWritable",
                                           "org.apache.hadoop.io.Text").collect())
        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
        self.assertEqual(ints, ei)

        doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/",
                                              "org.apache.hadoop.io.DoubleWritable",
                                              "org.apache.hadoop.io.Text").collect())
        ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
        self.assertEqual(doubles, ed)

        bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
                                            "org.apache.hadoop.io.IntWritable",
                                            "org.apache.hadoop.io.BytesWritable").collect())
        ebs = [(1, bytearray('aa', 'utf-8')),
               (1, bytearray('aa', 'utf-8')),
               (2, bytearray('aa', 'utf-8')),
               (2, bytearray('bb', 'utf-8')),
               (2, bytearray('bb', 'utf-8')),
               (3, bytearray('cc', 'utf-8'))]
        self.assertEqual(bytes, ebs)

        text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
                                           "org.apache.hadoop.io.Text",
                                           "org.apache.hadoop.io.Text").collect())
        et = [(u'1', u'aa'),
              (u'1', u'aa'),
              (u'2', u'aa'),
              (u'2', u'bb'),
              (u'2', u'bb'),
              (u'3', u'cc')]
        self.assertEqual(text, et)

        bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/",
                                            "org.apache.hadoop.io.IntWritable",
                                            "org.apache.hadoop.io.BooleanWritable").collect())
        eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
        self.assertEqual(bools, eb)

        nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/",
                                            "org.apache.hadoop.io.IntWritable",
                                            "org.apache.hadoop.io.BooleanWritable").collect())
        en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
        self.assertEqual(nulls, en)

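        # Map records may arrive in any order, so check membership instead of
        # comparing sorted lists.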
        maps = self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
                                    "org.apache.hadoop.io.IntWritable",
                                    "org.apache.hadoop.io.MapWritable").collect()
        em = [(1, {}),
              (1, {3.0: u'bb'}),
              (2, {1.0: u'aa'}),
              (2, {1.0: u'cc'}),
              (3, {2.0: u'dd'})]
        for v in maps:
            self.assertTrue(v in em)

        tuples = sorted(self.sc.sequenceFile(
            basepath + "/sftestdata/sfarray/",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.spark.api.python.DoubleArrayWritable").collect())
        et = [(1, ()),
              (2, (3.0, 4.0, 5.0)),
              (3, (4.0, 5.0, 6.0))]
        self.assertEqual(tuples, et)

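        # With a value converter, the same files deserialize to array('d')
        # values instead of tuples.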
        arrays = sorted(self.sc.sequenceFile(
            basepath + "/sftestdata/sfarray/",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.spark.api.python.DoubleArrayWritable",
            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
        ea = [(1, array('d')),
              (2, array('d', [3.0, 4.0, 5.0])),
              (3, array('d', [4.0, 5.0, 6.0]))]
        self.assertEqual(arrays, ea)

        clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                            "org.apache.hadoop.io.Text",
                                            "org.apache.spark.api.python.TestWritable").collect())
        cname = u'org.apache.spark.api.python.TestWritable'
        ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
              (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
              (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
              (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
              (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
        self.assertEqual(clazz, ec)

        unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                                      "org.apache.hadoop.io.Text",
                                                      "org.apache.spark.api.python.TestWritable").collect())
        self.assertEqual(unbatched_clazz, ec)

    def test_oldhadoop(self):
        basepath = self.tempdir.name
        ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
                                         "org.apache.hadoop.mapred.SequenceFileInputFormat",
                                         "org.apache.hadoop.io.IntWritable",
                                         "org.apache.hadoop.io.Text").collect())
        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
        self.assertEqual(ints, ei)

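        # hadoopRDD takes no path argument; it reads the input directory from
        # the supplied Hadoop configuration.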
        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
        oldconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
        hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                                  "org.apache.hadoop.io.LongWritable",
                                  "org.apache.hadoop.io.Text",
                                  conf=oldconf).collect()
        result = [(0, u'Hello World!')]
        self.assertEqual(hello, result)

    def test_newhadoop(self):
        basepath = self.tempdir.name
        ints = sorted(self.sc.newAPIHadoopFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text").collect())
        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
        self.assertEqual(ints, ei)

        hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
        newconf = {"mapreduce.input.fileinputformat.inputdir": hellopath}
        hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                        "org.apache.hadoop.io.LongWritable",
                                        "org.apache.hadoop.io.Text",
                                        conf=newconf).collect()
        result = [(0, u'Hello World!')]
        self.assertEqual(hello, result)

    def test_newolderror(self):
        basepath = self.tempdir.name
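        # Mixing APIs must fail: hadoopFile with a new-API (mapreduce) input
        # format, and newAPIHadoopFile with an old-API (mapred) one.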
        self.assertRaises(Exception, lambda: self.sc.hadoopFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text"))

        self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text"))

    def test_bad_inputs(self):
        basepath = self.tempdir.name
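        # Nonexistent Writable and InputFormat class names should surface as
        # exceptions from the JVM.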
        self.assertRaises(Exception, lambda: self.sc.sequenceFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.io.NotValidWritable",
            "org.apache.hadoop.io.Text"))
        self.assertRaises(Exception, lambda: self.sc.hadoopFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.mapred.NotValidInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text"))
        self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
            basepath + "/sftestdata/sfint/",
            "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text"))

    def test_converters(self):
        basepath = self.tempdir.name
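        # Key and value converters run on the JVM side, transforming each
        # record before it is pickled for Python.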
        maps = sorted(self.sc.sequenceFile(
            basepath + "/sftestdata/sfmap/",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.MapWritable",
            keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
            valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
        em = [(u'\x01', []),
              (u'\x01', [3.0]),
              (u'\x02', [1.0]),
              (u'\x02', [1.0]),
              (u'\x03', [2.0])]
        self.assertEqual(maps, em)

    def test_binary_files(self):
        path = os.path.join(self.tempdir.name, "binaryfiles")
        os.mkdir(path)
        data = b"short binary data"
        with open(os.path.join(path, "part-0000"), 'wb') as f:
            f.write(data)
        [(p, d)] = self.sc.binaryFiles(path).collect()
        self.assertTrue(p.endswith("part-0000"))
        self.assertEqual(d, data)

    def test_binary_records(self):
        path = os.path.join(self.tempdir.name, "binaryrecords")
        os.mkdir(path)
        with open(os.path.join(path, "part-0000"), 'w') as f:
            for i in range(100):
                f.write('%04d' % i)
        result = self.sc.binaryRecords(path, 4).map(int).collect()
        self.assertEqual(list(range(100)), result)


class OutputFormatTests(ReusedPySparkTestCase):
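    """Write RDDs through the SequenceFile and Hadoop output-format APIs and
    read them back to verify the round trip."""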

    def setUp(self):
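        # Reserve a unique path, then remove the file so each save operation
        # can create a directory there.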
        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
        os.unlink(self.tempdir.name)

    def tearDown(self):
        shutil.rmtree(self.tempdir.name, ignore_errors=True)

    @unittest.skipIf(sys.version >= "3", "serialize array of byte")
    def test_sequencefiles(self):
        basepath = self.tempdir.name
        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
        self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
        ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
        self.assertEqual(ints, ei)

        ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
        self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
        self.assertEqual(doubles, ed)

        ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
        self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
        bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
        self.assertEqual(bytes, ebs)

        et = [(u'1', u'aa'),
              (u'2', u'bb'),
              (u'3', u'cc')]
        self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
        text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
        self.assertEqual(text, et)

        eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
        self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
        bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
        self.assertEqual(bools, eb)

        en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
        self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
        nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
        self.assertEqual(nulls, en)

        em = [(1, {}),
              (1, {3.0: u'bb'}),
              (2, {1.0: u'aa'}),
              (2, {1.0: u'cc'}),
              (3, {2.0: u'dd'})]
        self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
        maps = self.sc.sequenceFile(basepath + "/sfmap/").collect()
        for v in maps:
            self.assertTrue(v in em)

    def test_oldhadoop(self):
        basepath = self.tempdir.name
        dict_data = [(1, {}),
                     (1, {"row1": 1.0}),
                     (2, {"row2": 2.0})]
        self.sc.parallelize(dict_data).saveAsHadoopFile(
            basepath + "/oldhadoop/",
            "org.apache.hadoop.mapred.SequenceFileOutputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.MapWritable")
        result = self.sc.hadoopFile(
            basepath + "/oldhadoop/",
            "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.MapWritable").collect()
        for v in result:
            self.assertTrue(v in dict_data)

        conf = {
            "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.MapWritable",
            "mapreduce.output.fileoutputformat.outputdir": basepath + "/olddataset/"
        }
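        # saveAsHadoopDataset takes everything, including the output path,
        # from the configuration dict.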
        self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
        input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/olddataset/"}
        result = self.sc.hadoopRDD(
            "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.MapWritable",
            conf=input_conf).collect()
        for v in result:
            self.assertTrue(v in dict_data)

    def test_newhadoop(self):
        basepath = self.tempdir.name
        data = [(1, ""),
                (1, "a"),
                (2, "bcdf")]
        self.sc.parallelize(data).saveAsNewAPIHadoopFile(
            basepath + "/newhadoop/",
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text")
        result = sorted(self.sc.newAPIHadoopFile(
            basepath + "/newhadoop/",
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text").collect())
        self.assertEqual(result, data)

        conf = {
            "mapreduce.job.outputformat.class":
                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
            "mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
        }
        self.sc.parallelize(data).saveAsNewAPIHadoopDataset(conf)
        input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
        new_dataset = sorted(self.sc.newAPIHadoopRDD(
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.Text",
            conf=input_conf).collect())
        self.assertEqual(new_dataset, data)

    @unittest.skipIf(sys.version >= "3", "serialize array of doubles")
    def test_newhadoop_with_array(self):
        basepath = self.tempdir.name

        array_data = [(1, array('d')),
                      (1, array('d', [1.0, 2.0, 3.0])),
                      (2, array('d', [3.0, 4.0, 5.0]))]
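        # Arrays need explicit converters in both directions:
        # DoubleArrayToWritableConverter on write and
        # WritableToDoubleArrayConverter on read.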
        self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
            basepath + "/newhadoop/",
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.spark.api.python.DoubleArrayWritable",
            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
        result = sorted(self.sc.newAPIHadoopFile(
            basepath + "/newhadoop/",
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.spark.api.python.DoubleArrayWritable",
            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
        self.assertEqual(result, array_data)

        conf = {
            "mapreduce.job.outputformat.class":
                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.job.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
            "mapreduce.output.fileoutputformat.outputdir": basepath + "/newdataset/"
        }
        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
            conf,
            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
        input_conf = {"mapreduce.input.fileinputformat.inputdir": basepath + "/newdataset/"}
        new_dataset = sorted(self.sc.newAPIHadoopRDD(
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.spark.api.python.DoubleArrayWritable",
            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
            conf=input_conf).collect())
        self.assertEqual(new_dataset, array_data)

    def test_newolderror(self):
        basepath = self.tempdir.name
        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
            basepath + "/newolderror/saveAsHadoopFile/",
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
            basepath + "/newolderror/saveAsNewAPIHadoopFile/",
            "org.apache.hadoop.mapred.SequenceFileOutputFormat"))

    def test_bad_inputs(self):
        basepath = self.tempdir.name
        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
            basepath + "/badinputs/saveAsHadoopFile/",
            "org.apache.hadoop.mapred.NotValidOutputFormat"))
        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
            basepath + "/badinputs/saveAsNewAPIHadoopFile/",
            "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))

    def test_converters(self):
        basepath = self.tempdir.name
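        # Output converters run on the JVM side before writing; judging from
        # `expected` below, keys become Text and each map value collapses to
        # its first key.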
        data = [(1, {3.0: u'bb'}),
                (2, {1.0: u'aa'}),
                (3, {2.0: u'dd'})]
        self.sc.parallelize(data).saveAsNewAPIHadoopFile(
            basepath + "/converters/",
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
            valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
        converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
        expected = [(u'1', 3.0),
                    (u'2', 1.0),
                    (u'3', 2.0)]
        self.assertEqual(converted, expected)

    def test_reserialization(self):
        basepath = self.tempdir.name
        x = range(1, 5)
        y = range(1001, 1005)
        data = list(zip(x, y))
        rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
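        # The same zipped RDD is written through all four save APIs; each
        # round trip should reproduce the original pairs.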
        rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
        result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
        self.assertEqual(result1, data)

        rdd.saveAsHadoopFile(
            basepath + "/reserialize/hadoop",
            "org.apache.hadoop.mapred.SequenceFileOutputFormat")
        result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
        self.assertEqual(result2, data)

        rdd.saveAsNewAPIHadoopFile(
            basepath + "/reserialize/newhadoop",
            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
        result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
        self.assertEqual(result3, data)

        conf4 = {
            "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/dataset"}
        rdd.saveAsHadoopDataset(conf4)
        result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
        self.assertEqual(result4, data)

        conf5 = {
            "mapreduce.job.outputformat.class":
                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
            "mapreduce.output.fileoutputformat.outputdir": basepath + "/reserialize/newdataset"
        }
        rdd.saveAsNewAPIHadoopDataset(conf5)
        result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
        self.assertEqual(result5, data)

    def test_malformed_RDD(self):
        basepath = self.tempdir.name

        data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
        rdd = self.sc.parallelize(data, len(data))
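        # Elements are lists of pairs rather than (key, value) pairs, so the
        # save should be rejected.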
        self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
            basepath + "/malformed/sequence"))


if __name__ == "__main__":
    from pyspark.tests.test_readwrite import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)