/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package test.org.apache.spark.sql;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.util.*;

import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
import scala.Tuple5;

import com.google.common.base.Objects;
import org.junit.*;
import org.junit.rules.ExpectedException;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.OuterScopes;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.test.TestSparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.LongAccumulator;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.types.DataTypes.*;

public class JavaDatasetSuite implements Serializable {
  private transient TestSparkSession spark;
  private transient JavaSparkContext jsc;

  @Before
  public void setUp() {
    // Trigger static initializer of TestData
    spark = new TestSparkSession();
    jsc = new JavaSparkContext(spark.sparkContext());
    spark.loadTestData();
  }

  @After
  public void tearDown() {
    spark.stop();
    spark = null;
  }

  private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
    return new Tuple2<>(t1, t2);
  }

  @Test
  public void testCollect() {
    List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    List<String> collected = ds.collectAsList();
    Assert.assertEquals(Arrays.asList("hello", "world"), collected);
  }

  @Test
  public void testTake() {
    List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    List<String> collected = ds.takeAsList(1);
    Assert.assertEquals(Arrays.asList("hello"), collected);
  }

  @Test
  public void testToLocalIterator() {
    List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    Iterator<String> iter = ds.toLocalIterator();
    Assert.assertEquals("hello", iter.next());
    Assert.assertEquals("world", iter.next());
    Assert.assertFalse(iter.hasNext());
  }

  // SPARK-15632: typed filter should preserve the underlying logical schema
  @Test
  public void testTypedFilterPreservingSchema() {
    Dataset<Long> ds = spark.range(10);
    Dataset<Long> ds2 = ds.filter((FilterFunction<Long>) value -> value > 3);
    Assert.assertEquals(ds.schema(), ds2.schema());
  }

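  // Covers the core typed transformations (first, filter, map, mapPartitions, flatMap).
  // Each lambda is cast to the Java function interface explicitly so that the compiler
  // resolves the typed overload rather than an untyped or Column-based one.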
  @Test
  public void testCommonOperation() {
    List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    Assert.assertEquals("hello", ds.first());

    Dataset<String> filtered = ds.filter((FilterFunction<String>) v -> v.startsWith("h"));
    Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList());

    Dataset<Integer> mapped =
      ds.map((MapFunction<String, Integer>) String::length, Encoders.INT());
    Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());

    Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> {
      List<String> ls = new LinkedList<>();
      while (it.hasNext()) {
        ls.add(it.next().toUpperCase(Locale.ROOT));
      }
      return ls.iterator();
    }, Encoders.STRING());
    Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());

    Dataset<String> flatMapped = ds.flatMap((FlatMapFunction<String, String>) s -> {
      List<String> ls = new LinkedList<>();
      for (char c : s.toCharArray()) {
        ls.add(String.valueOf(c));
      }
      return ls.iterator();
    }, Encoders.STRING());
    Assert.assertEquals(
      Arrays.asList("h", "e", "l", "l", "o", "w", "o", "r", "l", "d"),
      flatMapped.collectAsList());
  }

  @Test
  public void testForeach() {
    LongAccumulator accum = jsc.sc().longAccumulator();
    List<String> data = Arrays.asList("a", "b", "c");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());

    ds.foreach((ForeachFunction<String>) s -> accum.add(1));
    Assert.assertEquals(3, accum.value().intValue());
  }

  @Test
  public void testReduce() {
    List<Integer> data = Arrays.asList(1, 2, 3);
    Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());

    int reduced = ds.reduce((ReduceFunction<Integer>) (v1, v2) -> v1 + v2);
    Assert.assertEquals(6, reduced);
  }

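  // groupByKey plus every typed per-group operation: mapGroups, flatMapGroups, the
  // *GroupsWithState variants (exercised here on a batch Dataset, so the state argument
  // is simply ignored), reduceGroups, and cogroup.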
  @Test
  public void testGroupBy() {
    List<String> data = Arrays.asList("a", "foo", "bar");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    KeyValueGroupedDataset<Integer, String> grouped =
      ds.groupByKey((MapFunction<String, Integer>) String::length, Encoders.INT());

    Dataset<String> mapped = grouped.mapGroups(
      (MapGroupsFunction<Integer, String, String>) (key, values) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return sb.toString();
      }, Encoders.STRING());

    Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));

    Dataset<String> flatMapped = grouped.flatMapGroups(
        (FlatMapGroupsFunction<Integer, String, String>) (key, values) -> {
          StringBuilder sb = new StringBuilder(key.toString());
          while (values.hasNext()) {
            sb.append(values.next());
          }
          return Collections.singletonList(sb.toString()).iterator();
        },
      Encoders.STRING());

    Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));

    Dataset<String> mapped2 = grouped.mapGroupsWithState(
        (MapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
          StringBuilder sb = new StringBuilder(key.toString());
          while (values.hasNext()) {
            sb.append(values.next());
          }
          return sb.toString();
        },
        Encoders.LONG(),
        Encoders.STRING());

    Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped2.collectAsList()));

    Dataset<String> flatMapped2 = grouped.flatMapGroupsWithState(
        (FlatMapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
          StringBuilder sb = new StringBuilder(key.toString());
          while (values.hasNext()) {
            sb.append(values.next());
          }
          return Collections.singletonList(sb.toString()).iterator();
        },
      OutputMode.Append(),
      Encoders.LONG(),
      Encoders.STRING(),
      GroupStateTimeout.NoTimeout());

    Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));

    Dataset<Tuple2<Integer, String>> reduced =
      grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);

    Assert.assertEquals(
      asSet(tuple2(1, "a"), tuple2(3, "foobar")),
      toSet(reduced.collectAsList()));

    List<Integer> data2 = Arrays.asList(2, 6, 10);
    Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT());
    KeyValueGroupedDataset<Integer, Integer> grouped2 = ds2.groupByKey(
        (MapFunction<Integer, Integer>) v -> v / 2,
      Encoders.INT());

    Dataset<String> cogrouped = grouped.cogroup(
      grouped2,
      (CoGroupFunction<Integer, String, Integer, String>) (key, left, right) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (left.hasNext()) {
          sb.append(left.next());
        }
        sb.append("#");
        while (right.hasNext()) {
          sb.append(right.next());
        }
        return Collections.singletonList(sb.toString()).iterator();
      },
      Encoders.STRING());

    Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
  }

  @Test
  public void testSelect() {
    List<Integer> data = Arrays.asList(2, 6);
    Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());

    Dataset<Tuple2<Integer, String>> selected = ds.select(
      expr("value + 1"),
      col("value").cast("string")).as(Encoders.tuple(Encoders.INT(), Encoders.STRING()));

    Assert.assertEquals(
      Arrays.asList(tuple2(3, "2"), tuple2(7, "6")),
      selected.collectAsList());
  }

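  // distinct, intersect and except follow SQL's DISTINCT semantics and drop duplicates,
  // while union behaves like SQL's UNION ALL and keeps them.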
  @Test
  public void testSetOperation() {
    List<String> data = Arrays.asList("abc", "abc", "xyz");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());

    Assert.assertEquals(asSet("abc", "xyz"), toSet(ds.distinct().collectAsList()));

    List<String> data2 = Arrays.asList("xyz", "foo", "foo");
    Dataset<String> ds2 = spark.createDataset(data2, Encoders.STRING());

    Dataset<String> intersected = ds.intersect(ds2);
    Assert.assertEquals(Arrays.asList("xyz"), intersected.collectAsList());

    Dataset<String> unioned = ds.union(ds2).union(ds);
    Assert.assertEquals(
      Arrays.asList("abc", "abc", "xyz", "xyz", "foo", "foo", "abc", "abc", "xyz"),
      unioned.collectAsList());

    Dataset<String> subtracted = ds.except(ds2);
    Assert.assertEquals(Arrays.asList("abc"), subtracted.collectAsList());
  }

  private static <T> Set<T> toSet(List<T> records) {
    return new HashSet<>(records);
  }

  @SafeVarargs
  @SuppressWarnings("varargs")
  private static <T> Set<T> asSet(T... records) {
    return toSet(Arrays.asList(records));
  }

  @Test
  public void testJoin() {
    List<Integer> data = Arrays.asList(1, 2, 3);
    Dataset<Integer> ds = spark.createDataset(data, Encoders.INT()).as("a");
    List<Integer> data2 = Arrays.asList(2, 3, 4);
    Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT()).as("b");

    Dataset<Tuple2<Integer, Integer>> joined =
      ds.joinWith(ds2, col("a.value").equalTo(col("b.value")));
    Assert.assertEquals(
      Arrays.asList(tuple2(2, 2), tuple2(3, 3)),
      joined.collectAsList());
  }

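  // Tuple encoders are composed from component encoders; Encoders.tuple is overloaded
  // for arities 2 through 5, and each arity below round-trips through
  // createDataset/collectAsList.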
  @Test
  public void testTupleEncoder() {
    Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
    List<Tuple2<Integer, String>> data2 = Arrays.asList(tuple2(1, "a"), tuple2(2, "b"));
    Dataset<Tuple2<Integer, String>> ds2 = spark.createDataset(data2, encoder2);
    Assert.assertEquals(data2, ds2.collectAsList());

    Encoder<Tuple3<Integer, Long, String>> encoder3 =
      Encoders.tuple(Encoders.INT(), Encoders.LONG(), Encoders.STRING());
    List<Tuple3<Integer, Long, String>> data3 =
      Arrays.asList(new Tuple3<>(1, 2L, "a"));
    Dataset<Tuple3<Integer, Long, String>> ds3 = spark.createDataset(data3, encoder3);
    Assert.assertEquals(data3, ds3.collectAsList());

    Encoder<Tuple4<Integer, String, Long, String>> encoder4 =
      Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING());
    List<Tuple4<Integer, String, Long, String>> data4 =
      Arrays.asList(new Tuple4<>(1, "b", 2L, "a"));
    Dataset<Tuple4<Integer, String, Long, String>> ds4 = spark.createDataset(data4, encoder4);
    Assert.assertEquals(data4, ds4.collectAsList());

    Encoder<Tuple5<Integer, String, Long, String, Boolean>> encoder5 =
      Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING(),
        Encoders.BOOLEAN());
    List<Tuple5<Integer, String, Long, String, Boolean>> data5 =
      Arrays.asList(new Tuple5<>(1, "b", 2L, "a", true));
    Dataset<Tuple5<Integer, String, Long, String, Boolean>> ds5 =
      spark.createDataset(data5, encoder5);
    Assert.assertEquals(data5, ds5.collectAsList());
  }

  @Test
  public void testTupleEncoderSchema() {
    Encoder<Tuple2<String, Tuple2<String, String>>> encoder =
      Encoders.tuple(Encoders.STRING(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
    List<Tuple2<String, Tuple2<String, String>>> data = Arrays.asList(tuple2("1", tuple2("a", "b")),
      tuple2("2", tuple2("c", "d")));
    Dataset<Row> ds1 = spark.createDataset(data, encoder).toDF("value1", "value2");

    JavaPairRDD<String, Tuple2<String, String>> pairRDD = jsc.parallelizePairs(data);
    Dataset<Row> ds2 = spark.createDataset(JavaPairRDD.toRDD(pairRDD), encoder)
      .toDF("value1", "value2");

    Assert.assertEquals(ds1.schema(), ds2.schema());
    Assert.assertEquals(ds1.select(expr("value2._1")).collectAsList(),
      ds2.select(expr("value2._1")).collectAsList());
  }

  @Test
  public void testNestedTupleEncoder() {
    // test ((int, string), string)
    Encoder<Tuple2<Tuple2<Integer, String>, String>> encoder =
      Encoders.tuple(Encoders.tuple(Encoders.INT(), Encoders.STRING()), Encoders.STRING());
    List<Tuple2<Tuple2<Integer, String>, String>> data =
      Arrays.asList(tuple2(tuple2(1, "a"), "a"), tuple2(tuple2(2, "b"), "b"));
    Dataset<Tuple2<Tuple2<Integer, String>, String>> ds = spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());

    // test (int, (string, string, long))
    Encoder<Tuple2<Integer, Tuple3<String, String, Long>>> encoder2 =
      Encoders.tuple(Encoders.INT(),
        Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.LONG()));
    List<Tuple2<Integer, Tuple3<String, String, Long>>> data2 =
      Arrays.asList(tuple2(1, new Tuple3<>("a", "b", 3L)));
    Dataset<Tuple2<Integer, Tuple3<String, String, Long>>> ds2 =
      spark.createDataset(data2, encoder2);
    Assert.assertEquals(data2, ds2.collectAsList());

    // test (int, ((string, long), string))
    Encoder<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> encoder3 =
      Encoders.tuple(Encoders.INT(),
        Encoders.tuple(Encoders.tuple(Encoders.STRING(), Encoders.LONG()), Encoders.STRING()));
    List<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> data3 =
      Arrays.asList(tuple2(1, tuple2(tuple2("a", 2L), "b")));
    Dataset<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> ds3 =
      spark.createDataset(data3, encoder3);
    Assert.assertEquals(data3, ds3.collectAsList());
  }

  @Test
  public void testPrimitiveEncoder() {
    Encoder<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> encoder =
      Encoders.tuple(Encoders.DOUBLE(), Encoders.DECIMAL(), Encoders.DATE(), Encoders.TIMESTAMP(),
        Encoders.FLOAT());
    List<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> data =
      Arrays.asList(new Tuple5<>(
        1.7976931348623157E308, new BigDecimal("0.922337203685477589"),
          Date.valueOf("1970-01-01"), new Timestamp(System.currentTimeMillis()), Float.MAX_VALUE));
    Dataset<Tuple5<Double, BigDecimal, Date, Timestamp, Float>> ds =
      spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());
  }

  @Test
  public void testLocalDateAndInstantEncoders() {
    Encoder<Tuple2<LocalDate, Instant>> encoder =
      Encoders.tuple(Encoders.LOCALDATE(), Encoders.INSTANT());
    List<Tuple2<LocalDate, Instant>> data =
      Arrays.asList(new Tuple2<>(LocalDate.ofEpochDay(0), Instant.ofEpochSecond(0)));
    Dataset<Tuple2<LocalDate, Instant>> ds = spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());
  }

  public static class KryoSerializable {
    String value;

    KryoSerializable(String value) {
      this.value = value;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) return true;
      if (other == null || getClass() != other.getClass()) return false;

      return this.value.equals(((KryoSerializable) other).value);
    }

    @Override
    public int hashCode() {
      return this.value.hashCode();
    }
  }

  public static class JavaSerializable implements Serializable {
    String value;

    JavaSerializable(String value) {
      this.value = value;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) return true;
      if (other == null || getClass() != other.getClass()) return false;

      return this.value.equals(((JavaSerializable) other).value);
    }

    @Override
    public int hashCode() {
      return this.value.hashCode();
    }
  }

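  // Encoders.kryo and Encoders.javaSerialization store each object as a single binary
  // column: the data round-trips, but it is opaque to Catalyst (no per-field schema).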
  @Test
  public void testKryoEncoder() {
    Encoder<KryoSerializable> encoder = Encoders.kryo(KryoSerializable.class);
    List<KryoSerializable> data = Arrays.asList(
      new KryoSerializable("hello"), new KryoSerializable("world"));
    Dataset<KryoSerializable> ds = spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());
  }

  @Test
  public void testJavaEncoder() {
    Encoder<JavaSerializable> encoder = Encoders.javaSerialization(JavaSerializable.class);
    List<JavaSerializable> data = Arrays.asList(
      new JavaSerializable("hello"), new JavaSerializable("world"));
    Dataset<JavaSerializable> ds = spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());
  }

  @Test
  public void testRandomSplit() {
    List<String> data = Arrays.asList("hello", "world", "from", "spark");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
    double[] arraySplit = {1, 2, 3};

    List<Dataset<String>> randomSplit = ds.randomSplitAsList(arraySplit, 1);
    Assert.assertEquals("wrong number of splits", 3, randomSplit.size());
  }

  /**
   * For testing error messages when creating an encoder on a private class. This is done
   * here since we cannot create truly private classes in Scala.
   */
  private static class PrivateClassTest { }

  @Test(expected = UnsupportedOperationException.class)
  public void testJavaEncoderErrorMessageForPrivateClass() {
    Encoders.javaSerialization(PrivateClassTest.class);
  }

  @Test(expected = UnsupportedOperationException.class)
  public void testKryoEncoderErrorMessageForPrivateClass() {
    Encoders.kryo(PrivateClassTest.class);
  }

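  // A bean covering most field shapes a bean encoder has to handle: primitives, a byte
  // array (binary), an object array, lists, and nested map types.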
  public static class SimpleJavaBean implements Serializable {
    private boolean a;
    private int b;
    private byte[] c;
    private String[] d;
    private List<String> e;
    private List<Long> f;
    private Map<Integer, String> g;
    private Map<List<Long>, Map<String, String>> h;

    public boolean isA() {
      return a;
    }

    public void setA(boolean a) {
      this.a = a;
    }

    public int getB() {
      return b;
    }

    public void setB(int b) {
      this.b = b;
    }

    public byte[] getC() {
      return c;
    }

    public void setC(byte[] c) {
      this.c = c;
    }

    public String[] getD() {
      return d;
    }

    public void setD(String[] d) {
      this.d = d;
    }

    public List<String> getE() {
      return e;
    }

    public void setE(List<String> e) {
      this.e = e;
    }

    public List<Long> getF() {
      return f;
    }

    public void setF(List<Long> f) {
      this.f = f;
    }

    public Map<Integer, String> getG() {
      return g;
    }

    public void setG(Map<Integer, String> g) {
      this.g = g;
    }

    public Map<List<Long>, Map<String, String>> getH() {
      return h;
    }

    public void setH(Map<List<Long>, Map<String, String>> h) {
      this.h = h;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      SimpleJavaBean that = (SimpleJavaBean) o;

      if (a != that.a) return false;
      if (b != that.b) return false;
      if (!Arrays.equals(c, that.c)) return false;
      if (!Arrays.equals(d, that.d)) return false;
      if (!e.equals(that.e)) return false;
      if (!f.equals(that.f)) return false;
      if (!g.equals(that.g)) return false;
      return h.equals(that.h);
    }

    @Override
    public int hashCode() {
      int result = (a ? 1 : 0);
      result = 31 * result + b;
      result = 31 * result + Arrays.hashCode(c);
      result = 31 * result + Arrays.hashCode(d);
      result = 31 * result + e.hashCode();
      result = 31 * result + f.hashCode();
      result = 31 * result + g.hashCode();
      result = 31 * result + h.hashCode();
      return result;
    }
  }

  public static class SimpleJavaBean2 implements Serializable {
    private Timestamp a;
    private Date b;
    private java.math.BigDecimal c;

    public Timestamp getA() { return a; }

    public void setA(Timestamp a) { this.a = a; }

    public Date getB() { return b; }

    public void setB(Date b) { this.b = b; }

    public java.math.BigDecimal getC() { return c; }

    public void setC(java.math.BigDecimal c) { this.c = c; }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      SimpleJavaBean2 that = (SimpleJavaBean2) o;

      if (!a.equals(that.a)) return false;
      if (!b.equals(that.b)) return false;
      return c.equals(that.c);
    }

    @Override
    public int hashCode() {
      int result = a.hashCode();
      result = 31 * result + b.hashCode();
      result = 31 * result + c.hashCode();
      return result;
    }
  }

  public static class NestedJavaBean implements Serializable {
    private SimpleJavaBean a;

    public SimpleJavaBean getA() {
      return a;
    }

    public void setA(SimpleJavaBean a) {
      this.a = a;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      NestedJavaBean that = (NestedJavaBean) o;

      return a.equals(that.a);
    }

    @Override
    public int hashCode() {
      return a.hashCode();
    }
  }

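  // Round-trips the beans above through Encoders.bean, then builds the equivalent rows by
  // hand and checks that createDataFrame(...).as(beanEncoder) decodes to equal objects.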
  @Test
  public void testJavaBeanEncoder() {
    OuterScopes.addOuterScope(this);
    SimpleJavaBean obj1 = new SimpleJavaBean();
    obj1.setA(true);
    obj1.setB(3);
    obj1.setC(new byte[]{1, 2});
    obj1.setD(new String[]{"hello", null});
    obj1.setE(Arrays.asList("a", "b"));
    obj1.setF(Arrays.asList(100L, null, 200L));
    Map<Integer, String> map1 = new HashMap<>();
    map1.put(1, "a");
    map1.put(2, "b");
    obj1.setG(map1);
    Map<String, String> nestedMap1 = new HashMap<>();
    nestedMap1.put("x", "1");
    nestedMap1.put("y", "2");
    Map<List<Long>, Map<String, String>> complexMap1 = new HashMap<>();
    complexMap1.put(Arrays.asList(1L, 2L), nestedMap1);
    obj1.setH(complexMap1);

    SimpleJavaBean obj2 = new SimpleJavaBean();
    obj2.setA(false);
    obj2.setB(30);
    obj2.setC(new byte[]{3, 4});
    obj2.setD(new String[]{null, "world"});
    obj2.setE(Arrays.asList("x", "y"));
    obj2.setF(Arrays.asList(300L, null, 400L));
    Map<Integer, String> map2 = new HashMap<>();
    map2.put(3, "c");
    map2.put(4, "d");
    obj2.setG(map2);
    Map<String, String> nestedMap2 = new HashMap<>();
    nestedMap2.put("q", "1");
    nestedMap2.put("w", "2");
    Map<List<Long>, Map<String, String>> complexMap2 = new HashMap<>();
    complexMap2.put(Arrays.asList(3L, 4L), nestedMap2);
    obj2.setH(complexMap2);

    List<SimpleJavaBean> data = Arrays.asList(obj1, obj2);
    Dataset<SimpleJavaBean> ds = spark.createDataset(data, Encoders.bean(SimpleJavaBean.class));
    Assert.assertEquals(data, ds.collectAsList());

    NestedJavaBean obj3 = new NestedJavaBean();
    obj3.setA(obj1);

    List<NestedJavaBean> data2 = Arrays.asList(obj3);
    Dataset<NestedJavaBean> ds2 = spark.createDataset(data2, Encoders.bean(NestedJavaBean.class));
    Assert.assertEquals(data2, ds2.collectAsList());

    Row row1 = new GenericRow(new Object[]{
      true,
      3,
      new byte[]{1, 2},
      new String[]{"hello", null},
      Arrays.asList("a", "b"),
      Arrays.asList(100L, null, 200L),
      map1,
      complexMap1});
    Row row2 = new GenericRow(new Object[]{
      false,
      30,
      new byte[]{3, 4},
      new String[]{null, "world"},
      Arrays.asList("x", "y"),
      Arrays.asList(300L, null, 400L),
      map2,
      complexMap2});
    StructType schema = new StructType()
      .add("a", BooleanType, false)
      .add("b", IntegerType, false)
      .add("c", BinaryType)
      .add("d", createArrayType(StringType))
      .add("e", createArrayType(StringType))
      .add("f", createArrayType(LongType))
      .add("g", createMapType(IntegerType, StringType))
      .add("h", createMapType(createArrayType(LongType), createMapType(StringType, StringType)));
    Dataset<SimpleJavaBean> ds3 = spark.createDataFrame(Arrays.asList(row1, row2), schema)
      .as(Encoders.bean(SimpleJavaBean.class));
    Assert.assertEquals(data, ds3.collectAsList());
  }

  @Test
  public void testJavaBeanEncoder2() {
    // This is a regression test of SPARK-12404
    OuterScopes.addOuterScope(this);
    SimpleJavaBean2 obj = new SimpleJavaBean2();
    obj.setA(new Timestamp(0));
    obj.setB(new Date(0));
    obj.setC(java.math.BigDecimal.valueOf(1));
    Dataset<SimpleJavaBean2> ds =
      spark.createDataset(Arrays.asList(obj), Encoders.bean(SimpleJavaBean2.class));
    ds.collect();
  }

  public static class SmallBean implements Serializable {
    private String a;

    private int b;

    public int getB() {
      return b;
    }

    public void setB(int b) {
      this.b = b;
    }

    public String getA() {
      return a;
    }

    public void setA(String a) {
      this.a = a;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      SmallBean smallBean = (SmallBean) o;
      return b == smallBean.b && Objects.equal(a, smallBean.a);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(a, b);
    }
  }

  public static class NestedSmallBean implements Serializable {
    private SmallBean f;

    public SmallBean getF() {
      return f;
    }

    public void setF(SmallBean f) {
      this.f = f;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      NestedSmallBean that = (NestedSmallBean) o;
      return Objects.equal(f, that.f);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(f);
    }
  }

  @Rule
  public transient ExpectedException nullabilityCheck = ExpectedException.none();

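  // Decoding rows into beans is nullability-checked at runtime: nulls are fine in nullable
  // fields and for an entirely null parent struct, but a null that lands in a primitive
  // (non-nullable) bean field must fail.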
  @Test
  public void testRuntimeNullabilityCheck() {
    OuterScopes.addOuterScope(this);

    StructType schema = new StructType()
      .add("f", new StructType()
        .add("a", StringType, true)
        .add("b", IntegerType, true), true);

    // Shouldn't throw runtime exception since it passes the nullability check.
    {
      Row row = new GenericRow(new Object[] {
          new GenericRow(new Object[] {
              "hello", 1
          })
      });

      Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
      Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

      SmallBean smallBean = new SmallBean();
      smallBean.setA("hello");
      smallBean.setB(1);

      NestedSmallBean nestedSmallBean = new NestedSmallBean();
      nestedSmallBean.setF(smallBean);

      Assert.assertEquals(Collections.singletonList(nestedSmallBean), ds.collectAsList());
    }

    // Shouldn't throw runtime exception when the parent object (`SmallBean`) is null
    {
      Row row = new GenericRow(new Object[] { null });

      Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
      Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

      NestedSmallBean nestedSmallBean = new NestedSmallBean();
      Assert.assertEquals(Collections.singletonList(nestedSmallBean), ds.collectAsList());
    }

    nullabilityCheck.expect(RuntimeException.class);
    nullabilityCheck.expectMessage("Null value appeared in non-nullable field");

    {
      Row row = new GenericRow(new Object[] {
          new GenericRow(new Object[] {
              "hello", null
          })
      });

      Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema);
      Dataset<NestedSmallBean> ds = df.as(Encoders.bean(NestedSmallBean.class));

      ds.collect();
    }
  }

  public static class Nesting3 implements Serializable {
    private Integer field3_1;
    private Double field3_2;
    private String field3_3;

    public Nesting3() {
    }

    public Nesting3(Integer field3_1, Double field3_2, String field3_3) {
      this.field3_1 = field3_1;
      this.field3_2 = field3_2;
      this.field3_3 = field3_3;
    }

    private Nesting3(Builder builder) {
      setField3_1(builder.field3_1);
      setField3_2(builder.field3_2);
      setField3_3(builder.field3_3);
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    public Integer getField3_1() {
      return field3_1;
    }

    public void setField3_1(Integer field3_1) {
      this.field3_1 = field3_1;
    }

    public Double getField3_2() {
      return field3_2;
    }

    public void setField3_2(Double field3_2) {
      this.field3_2 = field3_2;
    }

    public String getField3_3() {
      return field3_3;
    }

    public void setField3_3(String field3_3) {
      this.field3_3 = field3_3;
    }

    public static final class Builder {
      private Integer field3_1 = 0;
      private Double field3_2 = 0.0;
      private String field3_3 = "value";

      private Builder() {
      }

      public Builder field3_1(Integer field3_1) {
        this.field3_1 = field3_1;
        return this;
      }

      public Builder field3_2(Double field3_2) {
        this.field3_2 = field3_2;
        return this;
      }

      public Builder field3_3(String field3_3) {
        this.field3_3 = field3_3;
        return this;
      }

      public Nesting3 build() {
        return new Nesting3(this);
      }
    }
  }

  public static class Nesting2 implements Serializable {
    private Nesting3 field2_1;
    private Nesting3 field2_2;
    private Nesting3 field2_3;

    public Nesting2() {
    }

    public Nesting2(Nesting3 field2_1, Nesting3 field2_2, Nesting3 field2_3) {
      this.field2_1 = field2_1;
      this.field2_2 = field2_2;
      this.field2_3 = field2_3;
    }

    private Nesting2(Builder builder) {
      setField2_1(builder.field2_1);
      setField2_2(builder.field2_2);
      setField2_3(builder.field2_3);
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    public Nesting3 getField2_1() {
      return field2_1;
    }

    public void setField2_1(Nesting3 field2_1) {
      this.field2_1 = field2_1;
    }

    public Nesting3 getField2_2() {
      return field2_2;
    }

    public void setField2_2(Nesting3 field2_2) {
      this.field2_2 = field2_2;
    }

    public Nesting3 getField2_3() {
      return field2_3;
    }

    public void setField2_3(Nesting3 field2_3) {
      this.field2_3 = field2_3;
    }

    public static final class Builder {
      private Nesting3 field2_1 = Nesting3.newBuilder().build();
      private Nesting3 field2_2 = Nesting3.newBuilder().build();
      private Nesting3 field2_3 = Nesting3.newBuilder().build();

      private Builder() {
      }

      public Builder field2_1(Nesting3 field2_1) {
        this.field2_1 = field2_1;
        return this;
      }

      public Builder field2_2(Nesting3 field2_2) {
        this.field2_2 = field2_2;
        return this;
      }

      public Builder field2_3(Nesting3 field2_3) {
        this.field2_3 = field2_3;
        return this;
      }

      public Nesting2 build() {
        return new Nesting2(this);
      }
    }
  }

  public static class Nesting1 implements Serializable {
    private Nesting2 field1_1;
    private Nesting2 field1_2;
    private Nesting2 field1_3;

    public Nesting1() {
    }

    public Nesting1(Nesting2 field1_1, Nesting2 field1_2, Nesting2 field1_3) {
      this.field1_1 = field1_1;
      this.field1_2 = field1_2;
      this.field1_3 = field1_3;
    }

    private Nesting1(Builder builder) {
      setField1_1(builder.field1_1);
      setField1_2(builder.field1_2);
      setField1_3(builder.field1_3);
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    public Nesting2 getField1_1() {
      return field1_1;
    }

    public void setField1_1(Nesting2 field1_1) {
      this.field1_1 = field1_1;
    }

    public Nesting2 getField1_2() {
      return field1_2;
    }

    public void setField1_2(Nesting2 field1_2) {
      this.field1_2 = field1_2;
    }

    public Nesting2 getField1_3() {
      return field1_3;
    }

    public void setField1_3(Nesting2 field1_3) {
      this.field1_3 = field1_3;
    }

    public static final class Builder {
      private Nesting2 field1_1 = Nesting2.newBuilder().build();
      private Nesting2 field1_2 = Nesting2.newBuilder().build();
      private Nesting2 field1_3 = Nesting2.newBuilder().build();

      private Builder() {
      }

      public Builder field1_1(Nesting2 field1_1) {
        this.field1_1 = field1_1;
        return this;
      }

      public Builder field1_2(Nesting2 field1_2) {
        this.field1_2 = field1_2;
        return this;
      }

      public Builder field1_3(Nesting2 field1_3) {
        this.field1_3 = field1_3;
        return this;
      }

      public Nesting1 build() {
        return new Nesting1(this);
      }
    }
  }

  public static class NestedComplicatedJavaBean implements Serializable {
    private Nesting1 field1;
    private Nesting1 field2;
    private Nesting1 field3;
    private Nesting1 field4;
    private Nesting1 field5;
    private Nesting1 field6;
    private Nesting1 field7;
    private Nesting1 field8;
    private Nesting1 field9;
    private Nesting1 field10;

    public NestedComplicatedJavaBean() {
    }

    private NestedComplicatedJavaBean(Builder builder) {
      setField1(builder.field1);
      setField2(builder.field2);
      setField3(builder.field3);
      setField4(builder.field4);
      setField5(builder.field5);
      setField6(builder.field6);
      setField7(builder.field7);
      setField8(builder.field8);
      setField9(builder.field9);
      setField10(builder.field10);
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    public Nesting1 getField1() {
      return field1;
    }

    public void setField1(Nesting1 field1) {
      this.field1 = field1;
    }

    public Nesting1 getField2() {
      return field2;
    }

    public void setField2(Nesting1 field2) {
      this.field2 = field2;
    }

    public Nesting1 getField3() {
      return field3;
    }

    public void setField3(Nesting1 field3) {
      this.field3 = field3;
    }

    public Nesting1 getField4() {
      return field4;
    }

    public void setField4(Nesting1 field4) {
      this.field4 = field4;
    }

    public Nesting1 getField5() {
      return field5;
    }

    public void setField5(Nesting1 field5) {
      this.field5 = field5;
    }

    public Nesting1 getField6() {
      return field6;
    }

    public void setField6(Nesting1 field6) {
      this.field6 = field6;
    }

    public Nesting1 getField7() {
      return field7;
    }

    public void setField7(Nesting1 field7) {
      this.field7 = field7;
    }

    public Nesting1 getField8() {
      return field8;
    }

    public void setField8(Nesting1 field8) {
      this.field8 = field8;
    }

    public Nesting1 getField9() {
      return field9;
    }

    public void setField9(Nesting1 field9) {
      this.field9 = field9;
    }

    public Nesting1 getField10() {
      return field10;
    }

    public void setField10(Nesting1 field10) {
      this.field10 = field10;
    }

    public static final class Builder {
      private Nesting1 field1 = Nesting1.newBuilder().build();
      private Nesting1 field2 = Nesting1.newBuilder().build();
      private Nesting1 field3 = Nesting1.newBuilder().build();
      private Nesting1 field4 = Nesting1.newBuilder().build();
      private Nesting1 field5 = Nesting1.newBuilder().build();
      private Nesting1 field6 = Nesting1.newBuilder().build();
      private Nesting1 field7 = Nesting1.newBuilder().build();
      private Nesting1 field8 = Nesting1.newBuilder().build();
      private Nesting1 field9 = Nesting1.newBuilder().build();
      private Nesting1 field10 = Nesting1.newBuilder().build();

      private Builder() {
      }

      public Builder field1(Nesting1 field1) {
        this.field1 = field1;
        return this;
      }

      public Builder field2(Nesting1 field2) {
        this.field2 = field2;
        return this;
      }

      public Builder field3(Nesting1 field3) {
        this.field3 = field3;
        return this;
      }

      public Builder field4(Nesting1 field4) {
        this.field4 = field4;
        return this;
      }

      public Builder field5(Nesting1 field5) {
        this.field5 = field5;
        return this;
      }

      public Builder field6(Nesting1 field6) {
        this.field6 = field6;
        return this;
      }

      public Builder field7(Nesting1 field7) {
        this.field7 = field7;
        return this;
      }

      public Builder field8(Nesting1 field8) {
        this.field8 = field8;
        return this;
      }

      public Builder field9(Nesting1 field9) {
        this.field9 = field9;
        return this;
      }

      public Builder field10(Nesting1 field10) {
        this.field10 = field10;
        return this;
      }

      public NestedComplicatedJavaBean build() {
        return new NestedComplicatedJavaBean(this);
      }
    }
  }

  @Test
  public void testDeeplyNestedJavaBeanEncoder() {
    // SPARK-15285: large numbers of nested JavaBeans used to generate more than 64KB of
    // java bytecode, exceeding the JVM's per-method limit.
    List<NestedComplicatedJavaBean> data = new ArrayList<>();
    data.add(NestedComplicatedJavaBean.newBuilder().build());

    Dataset<NestedComplicatedJavaBean> ds =
      spark.createDataset(data, Encoders.bean(NestedComplicatedJavaBean.class));
    ds.collectAsList();
  }

  public enum MyEnum {
    A("www.elgoog.com"),
    B("www.google.com");

    private String url;

    MyEnum(String url) {
      this.url = url;
    }

    public String getUrl() {
      return url;
    }

    public void setUrl(String url) {
      this.url = url;
    }
  }

  public static class BeanWithEnum {
    MyEnum enumField;
    String regularField;

    public String getRegularField() {
      return regularField;
    }

    public void setRegularField(String regularField) {
      this.regularField = regularField;
    }

    public MyEnum getEnumField() {
      return enumField;
    }

    public void setEnumField(MyEnum field) {
      this.enumField = field;
    }

    public BeanWithEnum(MyEnum enumField, String regularField) {
      this.enumField = enumField;
      this.regularField = regularField;
    }

    public BeanWithEnum() {
    }

    public String toString() {
      return "BeanWithEnum(" + enumField + ", " + regularField + ")";
    }

    public int hashCode() {
      return Objects.hashCode(enumField, regularField);
    }

    public boolean equals(Object other) {
      if (other instanceof BeanWithEnum) {
        BeanWithEnum beanWithEnum = (BeanWithEnum) other;
        return beanWithEnum.regularField.equals(regularField)
          && beanWithEnum.enumField.equals(enumField);
      }
      return false;
    }
  }

  @Test
  public void testBeanWithEnum() {
    List<BeanWithEnum> data = Arrays.asList(new BeanWithEnum(MyEnum.A, "mira avenue"),
      new BeanWithEnum(MyEnum.B, "flower boulevard"));
    Encoder<BeanWithEnum> encoder = Encoders.bean(BeanWithEnum.class);
    Dataset<BeanWithEnum> ds = spark.createDataset(data, encoder);
    Assert.assertEquals(data, ds.collectAsList());
  }

  public static class EmptyBean implements Serializable {}

  @Test
  public void testEmptyBean() {
    EmptyBean bean = new EmptyBean();
    List<EmptyBean> data = Arrays.asList(bean);
    Dataset<EmptyBean> df = spark.createDataset(data, Encoders.bean(EmptyBean.class));
    Assert.assertEquals(0, df.schema().length());
    Assert.assertEquals(1, df.collectAsList().size());
  }

  public class CircularReference1Bean implements Serializable {
    private CircularReference2Bean child;

    public CircularReference2Bean getChild() {
      return child;
    }

    public void setChild(CircularReference2Bean child) {
      this.child = child;
    }
  }

  public class CircularReference2Bean implements Serializable {
    private CircularReference1Bean child;

    public CircularReference1Bean getChild() {
      return child;
    }

    public void setChild(CircularReference1Bean child) {
      this.child = child;
    }
  }

  public class CircularReference3Bean implements Serializable {
    private CircularReference3Bean[] child;

    public CircularReference3Bean[] getChild() {
      return child;
    }

    public void setChild(CircularReference3Bean[] child) {
      this.child = child;
    }
  }

  public class CircularReference4Bean implements Serializable {
    private Map<String, CircularReference5Bean> child;

    public Map<String, CircularReference5Bean> getChild() {
      return child;
    }

    public void setChild(Map<String, CircularReference5Bean> child) {
      this.child = child;
    }
  }

  public class CircularReference5Bean implements Serializable {
    private String id;
    private List<CircularReference4Bean> child;

    public String getId() {
      return id;
    }

    public List<CircularReference4Bean> getChild() {
      return child;
    }

    public void setId(String id) {
      this.id = id;
    }

    public void setChild(List<CircularReference4Bean> child) {
      this.child = child;
    }
  }

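  // Bean encoders cannot describe self-referential types, so each cycle above (direct,
  // through an array, and through a map/list) must be rejected up front.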
  @Test(expected = UnsupportedOperationException.class)
  public void testCircularReferenceBean1() {
    CircularReference1Bean bean = new CircularReference1Bean();
    spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference1Bean.class));
  }

  @Test(expected = UnsupportedOperationException.class)
  public void testCircularReferenceBean2() {
    CircularReference3Bean bean = new CircularReference3Bean();
    spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference3Bean.class));
  }

  @Test(expected = UnsupportedOperationException.class)
  public void testCircularReferenceBean3() {
    CircularReference4Bean bean = new CircularReference4Bean();
    spark.createDataset(Arrays.asList(bean), Encoders.bean(CircularReference4Bean.class));
  }

  @Test(expected = RuntimeException.class)
  public void testNullInTopLevelBean() {
    NestedSmallBean bean = new NestedSmallBean();
    // Null is not allowed as a top-level bean
    spark.createDataset(Arrays.asList(bean, null), Encoders.bean(NestedSmallBean.class));
  }

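  // By contrast, a null field inside a bean is legal: it survives both createDataset and
  // a typed map without being dropped or throwing.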
  @Test
  public void testSerializeNull() {
    NestedSmallBean bean = new NestedSmallBean();
    Encoder<NestedSmallBean> encoder = Encoders.bean(NestedSmallBean.class);
    List<NestedSmallBean> beans = Arrays.asList(bean);
    Dataset<NestedSmallBean> ds1 = spark.createDataset(beans, encoder);
    Assert.assertEquals(beans, ds1.collectAsList());
    Dataset<NestedSmallBean> ds2 =
      ds1.map((MapFunction<NestedSmallBean, NestedSmallBean>) b -> b, encoder);
    Assert.assertEquals(beans, ds2.collectAsList());
  }

  @Test
  public void testSpecificLists() {
    SpecificListsBean bean = new SpecificListsBean();
    ArrayList<Integer> arrayList = new ArrayList<>();
    arrayList.add(1);
    bean.setArrayList(arrayList);
    LinkedList<Integer> linkedList = new LinkedList<>();
    linkedList.add(1);
    bean.setLinkedList(linkedList);
    bean.setList(Collections.singletonList(1));
    List<SpecificListsBean> beans = Collections.singletonList(bean);
    Dataset<SpecificListsBean> dataset =
      spark.createDataset(beans, Encoders.bean(SpecificListsBean.class));
    Assert.assertEquals(beans, dataset.collectAsList());
  }

  public static class SpecificListsBean implements Serializable {
    private ArrayList<Integer> arrayList;
    private LinkedList<Integer> linkedList;
    private List<Integer> list;

    public ArrayList<Integer> getArrayList() {
      return arrayList;
    }

    public void setArrayList(ArrayList<Integer> arrayList) {
      this.arrayList = arrayList;
    }

    public LinkedList<Integer> getLinkedList() {
      return linkedList;
    }

    public void setLinkedList(LinkedList<Integer> linkedList) {
      this.linkedList = linkedList;
    }

    public List<Integer> getList() {
      return list;
    }

    public void setList(List<Integer> list) {
      this.list = list;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      SpecificListsBean that = (SpecificListsBean) o;
      return Objects.equal(arrayList, that.arrayList) &&
        Objects.equal(linkedList, that.linkedList) &&
        Objects.equal(list, that.list);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(arrayList, linkedList, list);
    }
  }
}