Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark;
0018 
0019 import java.io.Serializable;
0020 import java.sql.Connection;
0021 import java.sql.DriverManager;
0022 import java.sql.PreparedStatement;
0023 import java.sql.SQLException;
0024 import java.sql.Statement;
0025 
0026 import org.apache.spark.api.java.JavaRDD;
0027 import org.apache.spark.api.java.JavaSparkContext;
0028 import org.apache.spark.rdd.JdbcRDD;
0029 import org.junit.After;
0030 import org.junit.Assert;
0031 import org.junit.Before;
0032 import org.junit.Test;
0033 
0034 public class JavaJdbcRDDSuite implements Serializable {
0035   private transient JavaSparkContext sc;
0036 
0037   @Before
0038   public void setUp() throws ClassNotFoundException, SQLException {
0039     sc = new JavaSparkContext("local", "JavaAPISuite");
0040 
0041     Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
0042 
0043     try (Connection connection = DriverManager.getConnection(
0044         "jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) {
0045 
0046       try (Statement create = connection.createStatement()) {
0047         create.execute(
0048           "CREATE TABLE FOO(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY" +
0049                   " (START WITH 1, INCREMENT BY 1), DATA INTEGER)");
0050       }
0051 
0052       try (PreparedStatement insert = connection.prepareStatement(
0053           "INSERT INTO FOO(DATA) VALUES(?)")) {
0054         for (int i = 1; i <= 100; i++) {
0055           insert.setInt(1, i * 2);
0056           insert.executeUpdate();
0057         }
0058       }
0059     } catch (SQLException e) {
0060       // If table doesn't exist...
0061       if (e.getSQLState().compareTo("X0Y32") != 0) {
0062         throw e;
0063       }
0064     }
0065   }
0066 
0067   @After
0068   public void tearDown() throws SQLException {
0069     try {
0070       DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
0071     } catch(SQLException e) {
0072       // Throw if not normal single database shutdown
0073       // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
0074       if (e.getSQLState().compareTo("08006") != 0) {
0075         throw e;
0076       }
0077     }
0078 
0079     sc.stop();
0080     sc = null;
0081   }
0082 
0083   @Test
0084   public void testJavaJdbcRDD() throws Exception {
0085     JavaRDD<Integer> rdd = JdbcRDD.create(
0086       sc,
0087       () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
0088       "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
0089       1, 100, 1,
0090       r -> r.getInt(1)
0091     ).cache();
0092 
0093     Assert.assertEquals(100, rdd.count());
0094     Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2));
0095   }
0096 }