0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.sql;
0019
0020 import java.io.Serializable;
0021 import java.math.BigDecimal;
0022 import java.sql.Date;
0023 import java.sql.Timestamp;
0024 import java.time.Instant;
0025 import java.time.LocalDate;
0026 import java.util.*;
0027
0028 import org.apache.spark.sql.streaming.GroupStateTimeout;
0029 import org.apache.spark.sql.streaming.OutputMode;
0030 import scala.Tuple2;
0031 import scala.Tuple3;
0032 import scala.Tuple4;
0033 import scala.Tuple5;
0034
0035 import com.google.common.base.Objects;
0036 import org.junit.*;
0037 import org.junit.rules.ExpectedException;
0038
0039 import org.apache.spark.api.java.JavaPairRDD;
0040 import org.apache.spark.api.java.JavaSparkContext;
0041 import org.apache.spark.api.java.function.*;
0042 import org.apache.spark.sql.*;
0043 import org.apache.spark.sql.catalyst.encoders.OuterScopes;
0044 import org.apache.spark.sql.catalyst.expressions.GenericRow;
0045 import org.apache.spark.sql.test.TestSparkSession;
0046 import org.apache.spark.sql.types.StructType;
0047 import org.apache.spark.util.LongAccumulator;
0048 import static org.apache.spark.sql.functions.col;
0049 import static org.apache.spark.sql.functions.expr;
0050 import static org.apache.spark.sql.types.DataTypes.*;
0051
0052 public class JavaDatasetSuite implements Serializable {
0053 private transient TestSparkSession spark;
0054 private transient JavaSparkContext jsc;
0055
0056 @Before
0057 public void setUp() {
0058
0059 spark = new TestSparkSession();
0060 jsc = new JavaSparkContext(spark.sparkContext());
0061 spark.loadTestData();
0062 }
0063
0064 @After
0065 public void tearDown() {
0066 spark.stop();
0067 spark = null;
0068 }
0069
/** Convenience factory for a Scala {@link Tuple2}; keeps the test bodies terse. */
private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
  return new Tuple2<>(t1, t2);
}
0073
0074 @Test
0075 public void testCollect() {
0076 List<String> data = Arrays.asList("hello", "world");
0077 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0078 List<String> collected = ds.collectAsList();
0079 Assert.assertEquals(Arrays.asList("hello", "world"), collected);
0080 }
0081
0082 @Test
0083 public void testTake() {
0084 List<String> data = Arrays.asList("hello", "world");
0085 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0086 List<String> collected = ds.takeAsList(1);
0087 Assert.assertEquals(Arrays.asList("hello"), collected);
0088 }
0089
0090 @Test
0091 public void testToLocalIterator() {
0092 List<String> data = Arrays.asList("hello", "world");
0093 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0094 Iterator<String> iter = ds.toLocalIterator();
0095 Assert.assertEquals("hello", iter.next());
0096 Assert.assertEquals("world", iter.next());
0097 Assert.assertFalse(iter.hasNext());
0098 }
0099
0100
0101 @Test
0102 public void testTypedFilterPreservingSchema() {
0103 Dataset<Long> ds = spark.range(10);
0104 Dataset<Long> ds2 = ds.filter((FilterFunction<Long>) value -> value > 3);
0105 Assert.assertEquals(ds.schema(), ds2.schema());
0106 }
0107
0108 @Test
0109 public void testCommonOperation() {
0110 List<String> data = Arrays.asList("hello", "world");
0111 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0112 Assert.assertEquals("hello", ds.first());
0113
0114 Dataset<String> filtered = ds.filter((FilterFunction<String>) v -> v.startsWith("h"));
0115 Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList());
0116
0117
0118 Dataset<Integer> mapped =
0119 ds.map((MapFunction<String, Integer>) String::length, Encoders.INT());
0120 Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());
0121
0122 Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> {
0123 List<String> ls = new LinkedList<>();
0124 while (it.hasNext()) {
0125 ls.add(it.next().toUpperCase(Locale.ROOT));
0126 }
0127 return ls.iterator();
0128 }, Encoders.STRING());
0129 Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
0130
0131 Dataset<String> flatMapped = ds.flatMap((FlatMapFunction<String, String>) s -> {
0132 List<String> ls = new LinkedList<>();
0133 for (char c : s.toCharArray()) {
0134 ls.add(String.valueOf(c));
0135 }
0136 return ls.iterator();
0137 }, Encoders.STRING());
0138 Assert.assertEquals(
0139 Arrays.asList("h", "e", "l", "l", "o", "w", "o", "r", "l", "d"),
0140 flatMapped.collectAsList());
0141 }
0142
0143 @Test
0144 public void testForeach() {
0145 LongAccumulator accum = jsc.sc().longAccumulator();
0146 List<String> data = Arrays.asList("a", "b", "c");
0147 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0148
0149 ds.foreach((ForeachFunction<String>) s -> accum.add(1));
0150 Assert.assertEquals(3, accum.value().intValue());
0151 }
0152
0153 @Test
0154 public void testReduce() {
0155 List<Integer> data = Arrays.asList(1, 2, 3);
0156 Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());
0157
0158 int reduced = ds.reduce((ReduceFunction<Integer>) (v1, v2) -> v1 + v2);
0159 Assert.assertEquals(6, reduced);
0160 }
0161
@Test
public void testGroupBy() {
  // Groups strings by length (keys 1 and 3) and exercises every typed group
  // operation: mapGroups, flatMapGroups, (flat)mapGroupsWithState,
  // reduceGroups and cogroup. Results are compared as sets because group
  // output order is not guaranteed.
  List<String> data = Arrays.asList("a", "foo", "bar");
  Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
  KeyValueGroupedDataset<Integer, String> grouped =
      ds.groupByKey((MapFunction<String, Integer>) String::length, Encoders.INT());

  // mapGroups: exactly one output row per key, "<key><concatenated values>".
  Dataset<String> mapped = grouped.mapGroups(
      (MapGroupsFunction<Integer, String, String>) (key, values) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return sb.toString();
      }, Encoders.STRING());

  Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));

  // flatMapGroups: same output, but emitted through an iterator.
  Dataset<String> flatMapped = grouped.flatMapGroups(
      (FlatMapGroupsFunction<Integer, String, String>) (key, values) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return Collections.singletonList(sb.toString()).iterator();
      },
      Encoders.STRING());

  Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));

  // mapGroupsWithState: the Long state parameter is unused here; only the
  // returned values are checked.
  Dataset<String> mapped2 = grouped.mapGroupsWithState(
      (MapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return sb.toString();
      },
      Encoders.LONG(),
      Encoders.STRING());

  Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped2.collectAsList()));

  // flatMapGroupsWithState additionally takes an output mode and timeout conf.
  Dataset<String> flatMapped2 = grouped.flatMapGroupsWithState(
      (FlatMapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return Collections.singletonList(sb.toString()).iterator();
      },
      OutputMode.Append(),
      Encoders.LONG(),
      Encoders.STRING(),
      GroupStateTimeout.NoTimeout());

  Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));

  // reduceGroups: pairwise concatenation of values within each key.
  Dataset<Tuple2<Integer, String>> reduced =
      grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);

  Assert.assertEquals(
      asSet(tuple2(1, "a"), tuple2(3, "foobar")),
      toSet(reduced.collectAsList()));

  // cogroup joins groups of two datasets by key; key 5 exists only on the
  // right side (10 / 2), hence "5#10" with an empty left half.
  List<Integer> data2 = Arrays.asList(2, 6, 10);
  Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT());
  KeyValueGroupedDataset<Integer, Integer> grouped2 = ds2.groupByKey(
      (MapFunction<Integer, Integer>) v -> v / 2,
      Encoders.INT());

  Dataset<String> cogrouped = grouped.cogroup(
      grouped2,
      (CoGroupFunction<Integer, String, Integer, String>) (key, left, right) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (left.hasNext()) {
          sb.append(left.next());
        }
        sb.append("#");
        while (right.hasNext()) {
          sb.append(right.next());
        }
        return Collections.singletonList(sb.toString()).iterator();
      },
      Encoders.STRING());

  Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
}
0250
0251 @Test
0252 public void testSelect() {
0253 List<Integer> data = Arrays.asList(2, 6);
0254 Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());
0255
0256 Dataset<Tuple2<Integer, String>> selected = ds.select(
0257 expr("value + 1"),
0258 col("value").cast("string")).as(Encoders.tuple(Encoders.INT(), Encoders.STRING()));
0259
0260 Assert.assertEquals(
0261 Arrays.asList(tuple2(3, "2"), tuple2(7, "6")),
0262 selected.collectAsList());
0263 }
0264
0265 @Test
0266 public void testSetOperation() {
0267 List<String> data = Arrays.asList("abc", "abc", "xyz");
0268 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0269
0270 Assert.assertEquals(asSet("abc", "xyz"), toSet(ds.distinct().collectAsList()));
0271
0272 List<String> data2 = Arrays.asList("xyz", "foo", "foo");
0273 Dataset<String> ds2 = spark.createDataset(data2, Encoders.STRING());
0274
0275 Dataset<String> intersected = ds.intersect(ds2);
0276 Assert.assertEquals(Arrays.asList("xyz"), intersected.collectAsList());
0277
0278 Dataset<String> unioned = ds.union(ds2).union(ds);
0279 Assert.assertEquals(
0280 Arrays.asList("abc", "abc", "xyz", "xyz", "foo", "foo", "abc", "abc", "xyz"),
0281 unioned.collectAsList());
0282
0283 Dataset<String> subtracted = ds.except(ds2);
0284 Assert.assertEquals(Arrays.asList("abc"), subtracted.collectAsList());
0285 }
0286
0287 private static <T> Set<T> toSet(List<T> records) {
0288 return new HashSet<>(records);
0289 }
0290
0291 @SafeVarargs
0292 @SuppressWarnings("varargs")
0293 private static <T> Set<T> asSet(T... records) {
0294 return toSet(Arrays.asList(records));
0295 }
0296
0297 @Test
0298 public void testJoin() {
0299 List<Integer> data = Arrays.asList(1, 2, 3);
0300 Dataset<Integer> ds = spark.createDataset(data, Encoders.INT()).as("a");
0301 List<Integer> data2 = Arrays.asList(2, 3, 4);
0302 Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT()).as("b");
0303
0304 Dataset<Tuple2<Integer, Integer>> joined =
0305 ds.joinWith(ds2, col("a.value").equalTo(col("b.value")));
0306 Assert.assertEquals(
0307 Arrays.asList(tuple2(2, 2), tuple2(3, 3)),
0308 joined.collectAsList());
0309 }
0310
0311 @Test
0312 public void testTupleEncoder() {
0313 Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
0314 List<Tuple2<Integer, String>> data2 = Arrays.asList(tuple2(1, "a"), tuple2(2, "b"));
0315 Dataset<Tuple2<Integer, String>> ds2 = spark.createDataset(data2, encoder2);
0316 Assert.assertEquals(data2, ds2.collectAsList());
0317
0318 Encoder<Tuple3<Integer, Long, String>> encoder3 =
0319 Encoders.tuple(Encoders.INT(), Encoders.LONG(), Encoders.STRING());
0320 List<Tuple3<Integer, Long, String>> data3 =
0321 Arrays.asList(new Tuple3<>(1, 2L, "a"));
0322 Dataset<Tuple3<Integer, Long, String>> ds3 = spark.createDataset(data3, encoder3);
0323 Assert.assertEquals(data3, ds3.collectAsList());
0324
0325 Encoder<Tuple4<Integer, String, Long, String>> encoder4 =
0326 Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING());
0327 List<Tuple4<Integer, String, Long, String>> data4 =
0328 Arrays.asList(new Tuple4<>(1, "b", 2L, "a"));
0329 Dataset<Tuple4<Integer, String, Long, String>> ds4 = spark.createDataset(data4, encoder4);
0330 Assert.assertEquals(data4, ds4.collectAsList());
0331
0332 Encoder<Tuple5<Integer, String, Long, String, Boolean>> encoder5 =
0333 Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING(),
0334 Encoders.BOOLEAN());
0335 List<Tuple5<Integer, String, Long, String, Boolean>> data5 =
0336 Arrays.asList(new Tuple5<>(1, "b", 2L, "a", true));
0337 Dataset<Tuple5<Integer, String, Long, String, Boolean>> ds5 =
0338 spark.createDataset(data5, encoder5);
0339 Assert.assertEquals(data5, ds5.collectAsList());
0340 }
0341
0342 @Test
0343 public void testTupleEncoderSchema() {
0344 Encoder<Tuple2<String, Tuple2<String,String>>> encoder =
0345 Encoders.tuple(Encoders.STRING(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
0346 List<Tuple2<String, Tuple2<String, String>>> data = Arrays.asList(tuple2("1", tuple2("a", "b")),
0347 tuple2("2", tuple2("c", "d")));
0348 Dataset<Row> ds1 = spark.createDataset(data, encoder).toDF("value1", "value2");
0349
0350 JavaPairRDD<String, Tuple2<String, String>> pairRDD = jsc.parallelizePairs(data);
0351 Dataset<Row> ds2 = spark.createDataset(JavaPairRDD.toRDD(pairRDD), encoder)
0352 .toDF("value1", "value2");
0353
0354 Assert.assertEquals(ds1.schema(), ds2.schema());
0355 Assert.assertEquals(ds1.select(expr("value2._1")).collectAsList(),
0356 ds2.select(expr("value2._1")).collectAsList());
0357 }
0358
0359 @Test
0360 public void testNestedTupleEncoder() {
0361
0362 Encoder<Tuple2<Tuple2<Integer, String>, String>> encoder =
0363 Encoders.tuple(Encoders.tuple(Encoders.INT(), Encoders.STRING()), Encoders.STRING());
0364 List<Tuple2<Tuple2<Integer, String>, String>> data =
0365 Arrays.asList(tuple2(tuple2(1, "a"), "a"), tuple2(tuple2(2, "b"), "b"));
0366 Dataset<Tuple2<Tuple2<Integer, String>, String>> ds = spark.createDataset(data, encoder);
0367 Assert.assertEquals(data, ds.collectAsList());
0368
0369
0370 Encoder<Tuple2<Integer, Tuple3<String, String, Long>>> encoder2 =
0371 Encoders.tuple(Encoders.INT(),
0372 Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.LONG()));
0373 List<Tuple2<Integer, Tuple3<String, String, Long>>> data2 =
0374 Arrays.asList(tuple2(1, new Tuple3<>("a", "b", 3L)));
0375 Dataset<Tuple2<Integer, Tuple3<String, String, Long>>> ds2 =
0376 spark.createDataset(data2, encoder2);
0377 Assert.assertEquals(data2, ds2.collectAsList());
0378
0379
0380 Encoder<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> encoder3 =
0381 Encoders.tuple(Encoders.INT(),
0382 Encoders.tuple(Encoders.tuple(Encoders.STRING(), Encoders.LONG()), Encoders.STRING()));
0383 List<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> data3 =
0384 Arrays.asList(tuple2(1, tuple2(tuple2("a", 2L), "b")));
0385 Dataset<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> ds3 =
0386 spark.createDataset(data3, encoder3);
0387 Assert.assertEquals(data3, ds3.collectAsList());
0388 }
0389
0390 @Test
0391 public void testPrimitiveEncoder() {
0392 Encoder<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> encoder =
0393 Encoders.tuple(Encoders.DOUBLE(), Encoders.DECIMAL(), Encoders.DATE(), Encoders.TIMESTAMP(),
0394 Encoders.FLOAT());
0395 List<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> data =
0396 Arrays.asList(new Tuple5<>(
0397 1.7976931348623157E308, new BigDecimal("0.922337203685477589"),
0398 Date.valueOf("1970-01-01"), new Timestamp(System.currentTimeMillis()), Float.MAX_VALUE));
0399 Dataset<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> ds =
0400 spark.createDataset(data, encoder);
0401 Assert.assertEquals(data, ds.collectAsList());
0402 }
0403
0404 @Test
0405 public void testLocalDateAndInstantEncoders() {
0406 Encoder<Tuple2<LocalDate, Instant>> encoder =
0407 Encoders.tuple(Encoders.LOCALDATE(), Encoders.INSTANT());
0408 List<Tuple2<LocalDate, Instant>> data =
0409 Arrays.asList(new Tuple2<>(LocalDate.ofEpochDay(0), Instant.ofEpochSecond(0)));
0410 Dataset<Tuple2<LocalDate, Instant>> ds = spark.createDataset(data, encoder);
0411 Assert.assertEquals(data, ds.collectAsList());
0412 }
0413
0414 public static class KryoSerializable {
0415 String value;
0416
0417 KryoSerializable(String value) {
0418 this.value = value;
0419 }
0420
0421 @Override
0422 public boolean equals(Object other) {
0423 if (this == other) return true;
0424 if (other == null || getClass() != other.getClass()) return false;
0425
0426 return this.value.equals(((KryoSerializable) other).value);
0427 }
0428
0429 @Override
0430 public int hashCode() {
0431 return this.value.hashCode();
0432 }
0433 }
0434
0435 public static class JavaSerializable implements Serializable {
0436 String value;
0437
0438 JavaSerializable(String value) {
0439 this.value = value;
0440 }
0441
0442 @Override
0443 public boolean equals(Object other) {
0444 if (this == other) return true;
0445 if (other == null || getClass() != other.getClass()) return false;
0446
0447 return this.value.equals(((JavaSerializable) other).value);
0448 }
0449
0450 @Override
0451 public int hashCode() {
0452 return this.value.hashCode();
0453 }
0454 }
0455
0456 @Test
0457 public void testKryoEncoder() {
0458 Encoder<KryoSerializable> encoder = Encoders.kryo(KryoSerializable.class);
0459 List<KryoSerializable> data = Arrays.asList(
0460 new KryoSerializable("hello"), new KryoSerializable("world"));
0461 Dataset<KryoSerializable> ds = spark.createDataset(data, encoder);
0462 Assert.assertEquals(data, ds.collectAsList());
0463 }
0464
0465 @Test
0466 public void testJavaEncoder() {
0467 Encoder<JavaSerializable> encoder = Encoders.javaSerialization(JavaSerializable.class);
0468 List<JavaSerializable> data = Arrays.asList(
0469 new JavaSerializable("hello"), new JavaSerializable("world"));
0470 Dataset<JavaSerializable> ds = spark.createDataset(data, encoder);
0471 Assert.assertEquals(data, ds.collectAsList());
0472 }
0473
0474 @Test
0475 public void testRandomSplit() {
0476 List<String> data = Arrays.asList("hello", "world", "from", "spark");
0477 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
0478 double[] arraySplit = {1, 2, 3};
0479
0480 List<Dataset<String>> randomSplit = ds.randomSplitAsList(arraySplit, 1);
0481 Assert.assertEquals("wrong number of splits", randomSplit.size(), 3);
0482 }
0483
0484
0485
0486
0487
/** Private class: the serialization encoders must reject it with a clear error. */
private static class PrivateClassTest { }
0489
// A private class cannot be encoded via Java serialization: the factory itself
// must fail fast rather than erroring later at runtime.
@Test(expected = UnsupportedOperationException.class)
public void testJavaEncoderErrorMessageForPrivateClass() {
  Encoders.javaSerialization(PrivateClassTest.class);
}
0494
// Same as above, but for the Kryo encoder factory.
@Test(expected = UnsupportedOperationException.class)
public void testKryoEncoderErrorMessageForPrivateClass() {
  Encoders.kryo(PrivateClassTest.class);
}
0499
/**
 * Bean exercising the bean encoder over a wide range of property types:
 * primitives, byte/String arrays, lists, and maps — including a map keyed by
 * a list with map values. equals/hashCode compare the array fields by content
 * (via java.util.Arrays) so datasets of beans can be compared element-wise.
 */
public static class SimpleJavaBean implements Serializable {
  private boolean a;
  private int b;
  private byte[] c;
  private String[] d;
  private List<String> e;
  private List<Long> f;
  private Map<Integer, String> g;
  private Map<List<Long>, Map<String, String>> h;

  public boolean isA() {
    return a;
  }

  public void setA(boolean a) {
    this.a = a;
  }

  public int getB() {
    return b;
  }

  public void setB(int b) {
    this.b = b;
  }

  public byte[] getC() {
    return c;
  }

  public void setC(byte[] c) {
    this.c = c;
  }

  public String[] getD() {
    return d;
  }

  public void setD(String[] d) {
    this.d = d;
  }

  public List<String> getE() {
    return e;
  }

  public void setE(List<String> e) {
    this.e = e;
  }

  public List<Long> getF() {
    return f;
  }

  public void setF(List<Long> f) {
    this.f = f;
  }

  public Map<Integer, String> getG() {
    return g;
  }

  public void setG(Map<Integer, String> g) {
    this.g = g;
  }

  public Map<List<Long>, Map<String, String>> getH() {
    return h;
  }

  public void setH(Map<List<Long>, Map<String, String>> h) {
    this.h = h;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    SimpleJavaBean that = (SimpleJavaBean) o;

    if (a != that.a) return false;
    if (b != that.b) return false;
    // Array fields need Arrays.equals for content (not reference) comparison.
    if (!Arrays.equals(c, that.c)) return false;
    if (!Arrays.equals(d, that.d)) return false;
    if (!e.equals(that.e)) return false;
    if (!f.equals(that.f)) return false;
    if (!g.equals(that.g)) return false;
    return h.equals(that.h);

  }

  @Override
  public int hashCode() {
    // Standard 31-multiplier combination, mirroring the field order of equals.
    int result = (a ? 1 : 0);
    result = 31 * result + b;
    result = 31 * result + Arrays.hashCode(c);
    result = 31 * result + Arrays.hashCode(d);
    result = 31 * result + e.hashCode();
    result = 31 * result + f.hashCode();
    result = 31 * result + g.hashCode();
    result = 31 * result + h.hashCode();
    return result;
  }
}
0605
/**
 * Bean with Timestamp/Date/BigDecimal properties, used to exercise the bean
 * encoder over the temporal and decimal SQL types.
 * NOTE: equals assumes all three fields are non-null (set before comparison).
 */
public static class SimpleJavaBean2 implements Serializable {
  private Timestamp a;
  private Date b;
  private java.math.BigDecimal c;

  public Timestamp getA() { return a; }

  public void setA(Timestamp a) { this.a = a; }

  public Date getB() { return b; }

  public void setB(Date b) { this.b = b; }

  public java.math.BigDecimal getC() { return c; }

  public void setC(java.math.BigDecimal c) { this.c = c; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    SimpleJavaBean2 that = (SimpleJavaBean2) o;

    if (!a.equals(that.a)) return false;
    if (!b.equals(that.b)) return false;
    return c.equals(that.c);
  }

  @Override
  public int hashCode() {
    int result = a.hashCode();
    result = 31 * result + b.hashCode();
    result = 31 * result + c.hashCode();
    return result;
  }
}
0643
0644 public static class NestedJavaBean implements Serializable {
0645 private SimpleJavaBean a;
0646
0647 public SimpleJavaBean getA() {
0648 return a;
0649 }
0650
0651 public void setA(SimpleJavaBean a) {
0652 this.a = a;
0653 }
0654
0655 @Override
0656 public boolean equals(Object o) {
0657 if (this == o) return true;
0658 if (o == null || getClass() != o.getClass()) return false;
0659
0660 NestedJavaBean that = (NestedJavaBean) o;
0661
0662 return a.equals(that.a);
0663 }
0664
0665 @Override
0666 public int hashCode() {
0667 return a.hashCode();
0668 }
0669 }
0670
0671 @Test
0672 public void testJavaBeanEncoder() {
0673 OuterScopes.addOuterScope(this);
0674 SimpleJavaBean obj1 = new SimpleJavaBean();
0675 obj1.setA(true);
0676 obj1.setB(3);
0677 obj1.setC(new byte[]{1, 2});
0678 obj1.setD(new String[]{"hello", null});
0679 obj1.setE(Arrays.asList("a", "b"));
0680 obj1.setF(Arrays.asList(100L, null, 200L));
0681 Map<Integer, String> map1 = new HashMap<>();
0682 map1.put(1, "a");
0683 map1.put(2, "b");
0684 obj1.setG(map1);
0685 Map<String, String> nestedMap1 = new HashMap<>();
0686 nestedMap1.put("x", "1");
0687 nestedMap1.put("y", "2");
0688 Map<List<Long>, Map<String, String>> complexMap1 = new HashMap<>();
0689 complexMap1.put(Arrays.asList(1L, 2L), nestedMap1);
0690 obj1.setH(complexMap1);
0691
0692 SimpleJavaBean obj2 = new SimpleJavaBean();
0693 obj2.setA(false);
0694 obj2.setB(30);
0695 obj2.setC(new byte[]{3, 4});
0696 obj2.setD(new String[]{null, "world"});
0697 obj2.setE(Arrays.asList("x", "y"));
0698 obj2.setF(Arrays.asList(300L, null, 400L));
0699 Map<Integer, String> map2 = new HashMap<>();
0700 map2.put(3, "c");
0701 map2.put(4, "d");
0702 obj2.setG(map2);
0703 Map<String, String> nestedMap2 = new HashMap<>();
0704 nestedMap2.put("q", "1");
0705 nestedMap2.put("w", "2");
0706 Map<List<Long>, Map<String, String>> complexMap2 = new HashMap<>();
0707 complexMap2.put(Arrays.asList(3L, 4L), nestedMap2);
0708 obj2.setH(complexMap2);
0709
0710 List<SimpleJavaBean> data = Arrays.asList(obj1, obj2);
0711 Dataset<SimpleJavaBean> ds = spark.createDataset(data, Encoders.bean(SimpleJavaBean.class));
0712 Assert.assertEquals(data, ds.collectAsList());
0713
0714 NestedJavaBean obj3 = new NestedJavaBean();
0715 obj3.setA(obj1);
0716
0717 List<NestedJavaBean> data2 = Arrays.asList(obj3);
0718 Dataset<NestedJavaBean> ds2 = spark.createDataset(data2, Encoders.bean(NestedJavaBean.class));
0719 Assert.assertEquals(data2, ds2.collectAsList());
0720
0721 Row row1 = new GenericRow(new Object[]{
0722 true,
0723 3,
0724 new byte[]{1, 2},
0725 new String[]{"hello", null},
0726 Arrays.asList("a", "b"),
0727 Arrays.asList(100L, null, 200L),
0728 map1,
0729 complexMap1});
0730 Row row2 = new GenericRow(new Object[]{
0731 false,
0732 30,
0733 new byte[]{3, 4},
0734 new String[]{null, "world"},
0735 Arrays.asList("x", "y"),
0736 Arrays.asList(300L, null, 400L),
0737 map2,
0738 complexMap2});
0739 StructType schema = new StructType()
0740 .add("a", BooleanType, false)
0741 .add("b", IntegerType, false)
0742 .add("c", BinaryType)
0743 .add("d", createArrayType(StringType))
0744 .add("e", createArrayType(StringType))
0745 .add("f", createArrayType(LongType))
0746 .add("g", createMapType(IntegerType, StringType))
0747 .add("h",createMapType(createArrayType(LongType), createMapType(StringType, StringType)));
0748 Dataset<SimpleJavaBean> ds3 = spark.createDataFrame(Arrays.asList(row1, row2), schema)
0749 .as(Encoders.bean(SimpleJavaBean.class));
0750 Assert.assertEquals(data, ds3.collectAsList());
0751 }
0752
0753 @Test
0754 public void testJavaBeanEncoder2() {
0755
0756 OuterScopes.addOuterScope(this);
0757 SimpleJavaBean2 obj = new SimpleJavaBean2();
0758 obj.setA(new Timestamp(0));
0759 obj.setB(new Date(0));
0760 obj.setC(java.math.BigDecimal.valueOf(1));
0761 Dataset<SimpleJavaBean2> ds =
0762 spark.createDataset(Arrays.asList(obj), Encoders.bean(SimpleJavaBean2.class));
0763 ds.collect();
0764 }
0765
0766 public static class SmallBean implements Serializable {
0767 private String a;
0768
0769 private int b;
0770
0771 public int getB() {
0772 return b;
0773 }
0774
0775 public void setB(int b) {
0776 this.b = b;
0777 }
0778
0779 public String getA() {
0780 return a;
0781 }
0782
0783 public void setA(String a) {
0784 this.a = a;
0785 }
0786
0787 @Override
0788 public boolean equals(Object o) {
0789 if (this == o) return true;
0790 if (o == null || getClass() != o.getClass()) return false;
0791 SmallBean smallBean = (SmallBean) o;
0792 return b == smallBean.b && com.google.common.base.Objects.equal(a, smallBean.a);
0793 }
0794
0795 @Override
0796 public int hashCode() {
0797 return Objects.hashCode(a, b);
0798 }
0799 }
0800
0801 public static class NestedSmallBean implements Serializable {
0802 private SmallBean f;
0803
0804 public SmallBean getF() {
0805 return f;
0806 }
0807
0808 public void setF(SmallBean f) {
0809 this.f = f;
0810 }
0811
0812 @Override
0813 public boolean equals(Object o) {
0814 if (this == o) return true;
0815 if (o == null || getClass() != o.getClass()) return false;
0816 NestedSmallBean that = (NestedSmallBean) o;
0817 return Objects.equal(f, that.f);
0818 }
0819
0820 @Override
0821 public int hashCode() {
0822 return Objects.hashCode(f);
0823 }
0824 }
0825
// JUnit rule armed by testRuntimeNullabilityCheck to expect the decoding
// failure in its final scenario (null value in a non-nullable field).
@Rule
public transient ExpectedException nullabilityCheck = ExpectedException.none();
0828
@Test
public void testRuntimeNullabilityCheck() {
  // Three scenarios over the same nullable nested-struct schema:
  //   1. fully populated row decodes into a populated bean;
  //   2. a null struct decodes into a bean with a null field;
  //   3. a null inside the struct for field "b" (a primitive int on the bean,
  //      hence non-nullable) must fail at decode time.
  // NOTE: the ExpectedException rule is armed BEFORE scenario 3; statement
  // order here is load-bearing.
  OuterScopes.addOuterScope(this);

  StructType schema = new StructType()
      .add("f", new StructType()
          .add("a", StringType, true)
          .add("b", IntegerType, true), true);

  // Scenario 1: populated nested struct.
  {
    Row row = new GenericRow(new Object[] {
        new GenericRow(new Object[] {
            "hello", 1
        })
    });

    Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
    Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

    SmallBean smallBean = new SmallBean();
    smallBean.setA("hello");
    smallBean.setB(1);

    NestedSmallBean nestedSmallBean = new NestedSmallBean();
    nestedSmallBean.setF(smallBean);

    Assert.assertEquals(Collections.singletonList(nestedSmallBean), ds.collectAsList());
  }

  // Scenario 2: null struct -> bean field stays null.
  {
    Row row = new GenericRow(new Object[] { null });

    Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
    Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

    NestedSmallBean nestedSmallBean = new NestedSmallBean();
    Assert.assertEquals(Collections.singletonList(nestedSmallBean), ds.collectAsList());
  }

  nullabilityCheck.expect(RuntimeException.class);
  nullabilityCheck.expectMessage("Null value appeared in non-nullable field");

  // Scenario 3: null for the primitive-backed field "b" must raise.
  {
    Row row = new GenericRow(new Object[] {
        new GenericRow(new Object[] {
            "hello", null
        })
    });

    Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
    Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

    ds.collect();
  }
}
0886
/**
 * Leaf bean of the three-level nesting hierarchy (Nesting1 -> Nesting2 ->
 * Nesting3). Offers both a conventional constructor and a fluent Builder with
 * non-null defaults for every field.
 */
public static class Nesting3 implements Serializable {
  private Integer field3_1;
  private Double field3_2;
  private String field3_3;

  // No-arg constructor required by the bean encoder.
  public Nesting3() {
  }

  public Nesting3(Integer field3_1, Double field3_2, String field3_3) {
    this.field3_1 = field3_1;
    this.field3_2 = field3_2;
    this.field3_3 = field3_3;
  }

  private Nesting3(Builder builder) {
    setField3_1(builder.field3_1);
    setField3_2(builder.field3_2);
    setField3_3(builder.field3_3);
  }

  public static Builder newBuilder() {
    return new Builder();
  }

  public Integer getField3_1() {
    return field3_1;
  }

  public void setField3_1(Integer field3_1) {
    this.field3_1 = field3_1;
  }

  public Double getField3_2() {
    return field3_2;
  }

  public void setField3_2(Double field3_2) {
    this.field3_2 = field3_2;
  }

  public String getField3_3() {
    return field3_3;
  }

  public void setField3_3(String field3_3) {
    this.field3_3 = field3_3;
  }

  /** Fluent builder; defaults: 0 / 0.0 / "value". */
  public static final class Builder {
    private Integer field3_1 = 0;
    private Double field3_2 = 0.0;
    private String field3_3 = "value";

    private Builder() {
    }

    public Builder field3_1(Integer field3_1) {
      this.field3_1 = field3_1;
      return this;
    }

    public Builder field3_2(Double field3_2) {
      this.field3_2 = field3_2;
      return this;
    }

    public Builder field3_3(String field3_3) {
      this.field3_3 = field3_3;
      return this;
    }

    public Nesting3 build() {
      return new Nesting3(this);
    }
  }
}
0963
/**
 * Middle bean of the nesting hierarchy: three Nesting3 fields. Builder
 * defaults each field to a default-built Nesting3 (never null).
 */
public static class Nesting2 implements Serializable {
  private Nesting3 field2_1;
  private Nesting3 field2_2;
  private Nesting3 field2_3;

  // No-arg constructor required by the bean encoder.
  public Nesting2() {
  }

  public Nesting2(Nesting3 field2_1, Nesting3 field2_2, Nesting3 field2_3) {
    this.field2_1 = field2_1;
    this.field2_2 = field2_2;
    this.field2_3 = field2_3;
  }

  private Nesting2(Builder builder) {
    setField2_1(builder.field2_1);
    setField2_2(builder.field2_2);
    setField2_3(builder.field2_3);
  }

  public static Builder newBuilder() {
    return new Builder();
  }

  public Nesting3 getField2_1() {
    return field2_1;
  }

  public void setField2_1(Nesting3 field2_1) {
    this.field2_1 = field2_1;
  }

  public Nesting3 getField2_2() {
    return field2_2;
  }

  public void setField2_2(Nesting3 field2_2) {
    this.field2_2 = field2_2;
  }

  public Nesting3 getField2_3() {
    return field2_3;
  }

  public void setField2_3(Nesting3 field2_3) {
    this.field2_3 = field2_3;
  }

  /** Fluent builder; each field defaults to Nesting3.newBuilder().build(). */
  public static final class Builder {
    private Nesting3 field2_1 = Nesting3.newBuilder().build();
    private Nesting3 field2_2 = Nesting3.newBuilder().build();
    private Nesting3 field2_3 = Nesting3.newBuilder().build();

    private Builder() {
    }

    public Builder field2_1(Nesting3 field2_1) {
      this.field2_1 = field2_1;
      return this;
    }

    public Builder field2_2(Nesting3 field2_2) {
      this.field2_2 = field2_2;
      return this;
    }

    public Builder field2_3(Nesting3 field2_3) {
      this.field2_3 = field2_3;
      return this;
    }

    public Nesting2 build() {
      return new Nesting2(this);
    }
  }
}
1041
1042 public static class Nesting1 implements Serializable {
1043 private Nesting2 field1_1;
1044 private Nesting2 field1_2;
1045 private Nesting2 field1_3;
1046
1047 public Nesting1() {
1048 }
1049
1050 public Nesting1(Nesting2 field1_1, Nesting2 field1_2, Nesting2 field1_3) {
1051 this.field1_1 = field1_1;
1052 this.field1_2 = field1_2;
1053 this.field1_3 = field1_3;
1054 }
1055
1056 private Nesting1(Builder builder) {
1057 setField1_1(builder.field1_1);
1058 setField1_2(builder.field1_2);
1059 setField1_3(builder.field1_3);
1060 }
1061
1062 public static Builder newBuilder() {
1063 return new Builder();
1064 }
1065
1066 public Nesting2 getField1_1() {
1067 return field1_1;
1068 }
1069
1070 public void setField1_1(Nesting2 field1_1) {
1071 this.field1_1 = field1_1;
1072 }
1073
1074 public Nesting2 getField1_2() {
1075 return field1_2;
1076 }
1077
1078 public void setField1_2(Nesting2 field1_2) {
1079 this.field1_2 = field1_2;
1080 }
1081
1082 public Nesting2 getField1_3() {
1083 return field1_3;
1084 }
1085
1086 public void setField1_3(Nesting2 field1_3) {
1087 this.field1_3 = field1_3;
1088 }
1089
1090
1091 public static final class Builder {
1092 private Nesting2 field1_1 = Nesting2.newBuilder().build();
1093 private Nesting2 field1_2 = Nesting2.newBuilder().build();
1094 private Nesting2 field1_3 = Nesting2.newBuilder().build();
1095
1096 private Builder() {
1097 }
1098
1099 public Builder field1_1(Nesting2 field1_1) {
1100 this.field1_1 = field1_1;
1101 return this;
1102 }
1103
1104 public Builder field1_2(Nesting2 field1_2) {
1105 this.field1_2 = field1_2;
1106 return this;
1107 }
1108
1109 public Builder field1_3(Nesting2 field1_3) {
1110 this.field1_3 = field1_3;
1111 return this;
1112 }
1113
1114 public Nesting1 build() {
1115 return new Nesting1(this);
1116 }
1117 }
1118 }
1119
1120 public static class NestedComplicatedJavaBean implements Serializable {
1121 private Nesting1 field1;
1122 private Nesting1 field2;
1123 private Nesting1 field3;
1124 private Nesting1 field4;
1125 private Nesting1 field5;
1126 private Nesting1 field6;
1127 private Nesting1 field7;
1128 private Nesting1 field8;
1129 private Nesting1 field9;
1130 private Nesting1 field10;
1131
1132 public NestedComplicatedJavaBean() {
1133 }
1134
1135 private NestedComplicatedJavaBean(Builder builder) {
1136 setField1(builder.field1);
1137 setField2(builder.field2);
1138 setField3(builder.field3);
1139 setField4(builder.field4);
1140 setField5(builder.field5);
1141 setField6(builder.field6);
1142 setField7(builder.field7);
1143 setField8(builder.field8);
1144 setField9(builder.field9);
1145 setField10(builder.field10);
1146 }
1147
1148 public static Builder newBuilder() {
1149 return new Builder();
1150 }
1151
1152 public Nesting1 getField1() {
1153 return field1;
1154 }
1155
1156 public void setField1(Nesting1 field1) {
1157 this.field1 = field1;
1158 }
1159
1160 public Nesting1 getField2() {
1161 return field2;
1162 }
1163
1164 public void setField2(Nesting1 field2) {
1165 this.field2 = field2;
1166 }
1167
1168 public Nesting1 getField3() {
1169 return field3;
1170 }
1171
1172 public void setField3(Nesting1 field3) {
1173 this.field3 = field3;
1174 }
1175
1176 public Nesting1 getField4() {
1177 return field4;
1178 }
1179
1180 public void setField4(Nesting1 field4) {
1181 this.field4 = field4;
1182 }
1183
1184 public Nesting1 getField5() {
1185 return field5;
1186 }
1187
1188 public void setField5(Nesting1 field5) {
1189 this.field5 = field5;
1190 }
1191
1192 public Nesting1 getField6() {
1193 return field6;
1194 }
1195
1196 public void setField6(Nesting1 field6) {
1197 this.field6 = field6;
1198 }
1199
1200 public Nesting1 getField7() {
1201 return field7;
1202 }
1203
1204 public void setField7(Nesting1 field7) {
1205 this.field7 = field7;
1206 }
1207
1208 public Nesting1 getField8() {
1209 return field8;
1210 }
1211
1212 public void setField8(Nesting1 field8) {
1213 this.field8 = field8;
1214 }
1215
1216 public Nesting1 getField9() {
1217 return field9;
1218 }
1219
1220 public void setField9(Nesting1 field9) {
1221 this.field9 = field9;
1222 }
1223
1224 public Nesting1 getField10() {
1225 return field10;
1226 }
1227
1228 public void setField10(Nesting1 field10) {
1229 this.field10 = field10;
1230 }
1231
1232 public static final class Builder {
1233 private Nesting1 field1 = Nesting1.newBuilder().build();
1234 private Nesting1 field2 = Nesting1.newBuilder().build();
1235 private Nesting1 field3 = Nesting1.newBuilder().build();
1236 private Nesting1 field4 = Nesting1.newBuilder().build();
1237 private Nesting1 field5 = Nesting1.newBuilder().build();
1238 private Nesting1 field6 = Nesting1.newBuilder().build();
1239 private Nesting1 field7 = Nesting1.newBuilder().build();
1240 private Nesting1 field8 = Nesting1.newBuilder().build();
1241 private Nesting1 field9 = Nesting1.newBuilder().build();
1242 private Nesting1 field10 = Nesting1.newBuilder().build();
1243
1244 private Builder() {
1245 }
1246
1247 public Builder field1(Nesting1 field1) {
1248 this.field1 = field1;
1249 return this;
1250 }
1251
1252 public Builder field2(Nesting1 field2) {
1253 this.field2 = field2;
1254 return this;
1255 }
1256
1257 public Builder field3(Nesting1 field3) {
1258 this.field3 = field3;
1259 return this;
1260 }
1261
1262 public Builder field4(Nesting1 field4) {
1263 this.field4 = field4;
1264 return this;
1265 }
1266
1267 public Builder field5(Nesting1 field5) {
1268 this.field5 = field5;
1269 return this;
1270 }
1271
1272 public Builder field6(Nesting1 field6) {
1273 this.field6 = field6;
1274 return this;
1275 }
1276
1277 public Builder field7(Nesting1 field7) {
1278 this.field7 = field7;
1279 return this;
1280 }
1281
1282 public Builder field8(Nesting1 field8) {
1283 this.field8 = field8;
1284 return this;
1285 }
1286
1287 public Builder field9(Nesting1 field9) {
1288 this.field9 = field9;
1289 return this;
1290 }
1291
1292 public Builder field10(Nesting1 field10) {
1293 this.field10 = field10;
1294 return this;
1295 }
1296
1297 public NestedComplicatedJavaBean build() {
1298 return new NestedComplicatedJavaBean(this);
1299 }
1300 }
1301 }
1302
1303 @Test
1304 public void test() {
1305
1306 List<NestedComplicatedJavaBean> data = new ArrayList<>();
1307 data.add(NestedComplicatedJavaBean.newBuilder().build());
1308
1309 NestedComplicatedJavaBean obj3 = new NestedComplicatedJavaBean();
1310
1311 Dataset<NestedComplicatedJavaBean> ds =
1312 spark.createDataset(data, Encoders.bean(NestedComplicatedJavaBean.class));
1313 ds.collectAsList();
1314 }
1315
  // Enum fixture for testBeanWithEnum: an enum constant carrying a bean-style
  // property. NOTE(review): a mutable field with a setter on an enum is unusual
  // — presumably here to exercise how bean encoding treats enum properties;
  // confirm before relying on setUrl elsewhere.
  public enum MyEnum {
    A("www.elgoog.com"),
    B("www.google.com");

    private String url;

    MyEnum(String url) {
      this.url = url;
    }

    public String getUrl() {
      return url;
    }

    public void setUrl(String url) {
      this.url = url;
    }
  }
1334
1335 public static class BeanWithEnum {
1336 MyEnum enumField;
1337 String regularField;
1338
1339 public String getRegularField() {
1340 return regularField;
1341 }
1342
1343 public void setRegularField(String regularField) {
1344 this.regularField = regularField;
1345 }
1346
1347 public MyEnum getEnumField() {
1348 return enumField;
1349 }
1350
1351 public void setEnumField(MyEnum field) {
1352 this.enumField = field;
1353 }
1354
1355 public BeanWithEnum(MyEnum enumField, String regularField) {
1356 this.enumField = enumField;
1357 this.regularField = regularField;
1358 }
1359
1360 public BeanWithEnum() {
1361 }
1362
1363 public String toString() {
1364 return "BeanWithEnum(" + enumField + ", " + regularField + ")";
1365 }
1366
1367 public int hashCode() {
1368 return Objects.hashCode(enumField, regularField);
1369 }
1370
1371 public boolean equals(Object other) {
1372 if (other instanceof BeanWithEnum) {
1373 BeanWithEnum beanWithEnum = (BeanWithEnum) other;
1374 return beanWithEnum.regularField.equals(regularField)
1375 && beanWithEnum.enumField.equals(enumField);
1376 }
1377 return false;
1378 }
1379 }
1380
1381 @Test
1382 public void testBeanWithEnum() {
1383 List<BeanWithEnum> data = Arrays.asList(new BeanWithEnum(MyEnum.A, "mira avenue"),
1384 new BeanWithEnum(MyEnum.B, "flower boulevard"));
1385 Encoder<BeanWithEnum> encoder = Encoders.bean(BeanWithEnum.class);
1386 Dataset<BeanWithEnum> ds = spark.createDataset(data, encoder);
1387 Assert.assertEquals(data, ds.collectAsList());
1388 }
1389
  // Bean with no readable properties; exercises encoding of an empty schema.
  public static class EmptyBean implements Serializable {}
1391
1392 @Test
1393 public void testEmptyBean() {
1394 EmptyBean bean = new EmptyBean();
1395 List<EmptyBean> data = Arrays.asList(bean);
1396 Dataset<EmptyBean> df = spark.createDataset(data, Encoders.bean(EmptyBean.class));
1397 Assert.assertEquals(0, df.schema().length());
1398 Assert.assertEquals(1, df.collectAsList().size());
1399 }
1400
  // One half of a mutually recursive bean pair (1 -> 2 -> 1). The bean encoder
  // must detect the cycle and fail; see testCircularReferenceBean1.
  public class CircularReference1Bean implements Serializable {
    private CircularReference2Bean child;

    public CircularReference2Bean getChild() {
      return child;
    }

    public void setChild(CircularReference2Bean child) {
      this.child = child;
    }
  }
1412
  // The other half of the 1 <-> 2 cycle; reached through CircularReference1Bean.
  public class CircularReference2Bean implements Serializable {
    private CircularReference1Bean child;

    public CircularReference1Bean getChild() {
      return child;
    }

    public void setChild(CircularReference1Bean child) {
      this.child = child;
    }
  }
1424
  // Self-recursive bean through an array property; the encoder must reject it
  // (see testCircularReferenceBean2).
  public class CircularReference3Bean implements Serializable {
    private CircularReference3Bean[] child;

    public CircularReference3Bean[] getChild() {
      return child;
    }

    public void setChild(CircularReference3Bean[] child) {
      this.child = child;
    }
  }
1436
  // Cycle through a Map value type (4 -> 5 -> 4); the encoder must reject it
  // (see testCircularReferenceBean3).
  public class CircularReference4Bean implements Serializable {
    private Map<String, CircularReference5Bean> child;

    public Map<String, CircularReference5Bean> getChild() {
      return child;
    }

    public void setChild(Map<String, CircularReference5Bean> child) {
      this.child = child;
    }
  }
1448
  // Completes the 4 <-> 5 cycle through a List property; reached via
  // CircularReference4Bean.
  public class CircularReference5Bean implements Serializable {
    private String id;
    private List<CircularReference4Bean> child;

    public String getId() {
      return id;
    }

    public List<CircularReference4Bean> getChild() {
      return child;
    }

    public void setId(String id) {
      this.id = id;
    }

    public void setChild(List<CircularReference4Bean> child) {
      this.child = child;
    }
  }
1469
1470 @Test(expected = UnsupportedOperationException.class)
1471 public void testCircularReferenceBean1() {
1472 CircularReference1Bean bean = new CircularReference1Bean();
1473 spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference1Bean.class));
1474 }
1475
1476 @Test(expected = UnsupportedOperationException.class)
1477 public void testCircularReferenceBean2() {
1478 CircularReference3Bean bean = new CircularReference3Bean();
1479 spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference3Bean.class));
1480 }
1481
1482 @Test(expected = UnsupportedOperationException.class)
1483 public void testCircularReferenceBean3() {
1484 CircularReference4Bean bean = new CircularReference4Bean();
1485 spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference4Bean.class));
1486 }
1487
1488 @Test(expected = RuntimeException.class)
1489 public void testNullInTopLevelBean() {
1490 NestedSmallBean bean = new NestedSmallBean();
1491
1492 spark.createDataset(Arrays.asList(bean, null), Encoders.bean(NestedSmallBean.class));
1493 }
1494
1495 @Test
1496 public void testSerializeNull() {
1497 NestedSmallBean bean = new NestedSmallBean();
1498 Encoder<NestedSmallBean> encoder = Encoders.bean(NestedSmallBean.class);
1499 List<NestedSmallBean> beans = Arrays.asList(bean);
1500 Dataset<NestedSmallBean> ds1 = spark.createDataset(beans, encoder);
1501 Assert.assertEquals(beans, ds1.collectAsList());
1502 Dataset<NestedSmallBean> ds2 =
1503 ds1.map((MapFunction<NestedSmallBean, NestedSmallBean>) b -> b, encoder);
1504 Assert.assertEquals(beans, ds2.collectAsList());
1505 }
1506
1507 @Test
1508 public void testSpecificLists() {
1509 SpecificListsBean bean = new SpecificListsBean();
1510 ArrayList<Integer> arrayList = new ArrayList<>();
1511 arrayList.add(1);
1512 bean.setArrayList(arrayList);
1513 LinkedList<Integer> linkedList = new LinkedList<>();
1514 linkedList.add(1);
1515 bean.setLinkedList(linkedList);
1516 bean.setList(Collections.singletonList(1));
1517 List<SpecificListsBean> beans = Collections.singletonList(bean);
1518 Dataset<SpecificListsBean> dataset =
1519 spark.createDataset(beans, Encoders.bean(SpecificListsBean.class));
1520 Assert.assertEquals(beans, dataset.collectAsList());
1521 }
1522
  /**
   * Bean exposing the same element type through three list types (ArrayList,
   * LinkedList, List) so testSpecificLists can check each round-trips through
   * the bean encoder. equals/hashCode are value-based (Guava Objects) so
   * collected rows compare equal to the inputs.
   */
  public static class SpecificListsBean implements Serializable {
    private ArrayList<Integer> arrayList;
    private LinkedList<Integer> linkedList;
    private List<Integer> list;

    public ArrayList<Integer> getArrayList() {
      return arrayList;
    }

    public void setArrayList(ArrayList<Integer> arrayList) {
      this.arrayList = arrayList;
    }

    public LinkedList<Integer> getLinkedList() {
      return linkedList;
    }

    public void setLinkedList(LinkedList<Integer> linkedList) {
      this.linkedList = linkedList;
    }

    public List<Integer> getList() {
      return list;
    }

    public void setList(List<Integer> list) {
      this.list = list;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      SpecificListsBean that = (SpecificListsBean) o;
      // Objects.equal is null-safe; all three fields may legitimately be null.
      return Objects.equal(arrayList, that.arrayList) &&
        Objects.equal(linkedList, that.linkedList) &&
        Objects.equal(list, that.list);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(arrayList, linkedList, list);
    }
  }
1567 }