0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark;
0018
0019 import java.io.Serializable;
0020 import java.sql.Connection;
0021 import java.sql.DriverManager;
0022 import java.sql.PreparedStatement;
0023 import java.sql.SQLException;
0024 import java.sql.Statement;
0025
0026 import org.apache.spark.api.java.JavaRDD;
0027 import org.apache.spark.api.java.JavaSparkContext;
0028 import org.apache.spark.rdd.JdbcRDD;
0029 import org.junit.After;
0030 import org.junit.Assert;
0031 import org.junit.Before;
0032 import org.junit.Test;
0033
0034 public class JavaJdbcRDDSuite implements Serializable {
0035 private transient JavaSparkContext sc;
0036
0037 @Before
0038 public void setUp() throws ClassNotFoundException, SQLException {
0039 sc = new JavaSparkContext("local", "JavaAPISuite");
0040
0041 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
0042
0043 try (Connection connection = DriverManager.getConnection(
0044 "jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) {
0045
0046 try (Statement create = connection.createStatement()) {
0047 create.execute(
0048 "CREATE TABLE FOO(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY" +
0049 " (START WITH 1, INCREMENT BY 1), DATA INTEGER)");
0050 }
0051
0052 try (PreparedStatement insert = connection.prepareStatement(
0053 "INSERT INTO FOO(DATA) VALUES(?)")) {
0054 for (int i = 1; i <= 100; i++) {
0055 insert.setInt(1, i * 2);
0056 insert.executeUpdate();
0057 }
0058 }
0059 } catch (SQLException e) {
0060
0061 if (e.getSQLState().compareTo("X0Y32") != 0) {
0062 throw e;
0063 }
0064 }
0065 }
0066
0067 @After
0068 public void tearDown() throws SQLException {
0069 try {
0070 DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
0071 } catch(SQLException e) {
0072
0073
0074 if (e.getSQLState().compareTo("08006") != 0) {
0075 throw e;
0076 }
0077 }
0078
0079 sc.stop();
0080 sc = null;
0081 }
0082
0083 @Test
0084 public void testJavaJdbcRDD() throws Exception {
0085 JavaRDD<Integer> rdd = JdbcRDD.create(
0086 sc,
0087 () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
0088 "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
0089 1, 100, 1,
0090 r -> r.getInt(1)
0091 ).cache();
0092
0093 Assert.assertEquals(100, rdd.count());
0094 Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2));
0095 }
0096 }