0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.connector.catalog;
0019
0020 import org.apache.spark.SparkException;
0021 import org.apache.spark.sql.internal.SQLConf;
0022 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
0023 import org.junit.Assert;
0024 import org.junit.Test;
0025
0026 import java.util.concurrent.Callable;
0027
0028 public class CatalogLoadingSuite {
0029 @Test
0030 public void testLoad() throws SparkException {
0031 SQLConf conf = new SQLConf();
0032 conf.setConfString("spark.sql.catalog.test-name", TestCatalogPlugin.class.getCanonicalName());
0033
0034 CatalogPlugin plugin = Catalogs.load("test-name", conf);
0035 Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
0036 Assert.assertEquals("Plugin should have correct implementation",
0037 TestCatalogPlugin.class, plugin.getClass());
0038
0039 TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
0040 Assert.assertEquals("Options should contain no keys", 0, testPlugin.options.size());
0041 Assert.assertEquals("Catalog should have correct name", "test-name", testPlugin.name());
0042 }
0043
0044 @Test
0045 public void testInitializationOptions() throws SparkException {
0046 SQLConf conf = new SQLConf();
0047 conf.setConfString("spark.sql.catalog.test-name", TestCatalogPlugin.class.getCanonicalName());
0048 conf.setConfString("spark.sql.catalog.test-name.name", "not-catalog-name");
0049 conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
0050
0051 CatalogPlugin plugin = Catalogs.load("test-name", conf);
0052 Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
0053 Assert.assertEquals("Plugin should have correct implementation",
0054 TestCatalogPlugin.class, plugin.getClass());
0055
0056 TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
0057
0058 Assert.assertEquals("Options should contain only two keys", 2, testPlugin.options.size());
0059 Assert.assertEquals("Options should contain correct value for name (not overwritten)",
0060 "not-catalog-name", testPlugin.options.get("name"));
0061 Assert.assertEquals("Options should contain correct value for key",
0062 "valUE", testPlugin.options.get("key"));
0063 }
0064
0065 @Test
0066 public void testLoadWithoutConfig() {
0067 SQLConf conf = new SQLConf();
0068
0069 SparkException exc = intercept(CatalogNotFoundException.class,
0070 () -> Catalogs.load("missing", conf));
0071
0072 Assert.assertTrue("Should complain that implementation is not configured",
0073 exc.getMessage()
0074 .contains("plugin class not found: spark.sql.catalog.missing is not defined"));
0075 Assert.assertTrue("Should identify the catalog by name",
0076 exc.getMessage().contains("missing"));
0077 }
0078
0079 @Test
0080 public void testLoadMissingClass() {
0081 SQLConf conf = new SQLConf();
0082 conf.setConfString("spark.sql.catalog.missing", "com.example.NoSuchCatalogPlugin");
0083
0084 SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
0085
0086 Assert.assertTrue("Should complain that the class is not found",
0087 exc.getMessage().contains("Cannot find catalog plugin class"));
0088 Assert.assertTrue("Should identify the catalog by name",
0089 exc.getMessage().contains("missing"));
0090 Assert.assertTrue("Should identify the missing class",
0091 exc.getMessage().contains("com.example.NoSuchCatalogPlugin"));
0092 }
0093
0094 @Test
0095 public void testLoadNonCatalogPlugin() {
0096 SQLConf conf = new SQLConf();
0097 String invalidClassName = InvalidCatalogPlugin.class.getCanonicalName();
0098 conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
0099
0100 SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
0101
0102 Assert.assertTrue("Should complain that class does not implement CatalogPlugin",
0103 exc.getMessage().contains("does not implement CatalogPlugin"));
0104 Assert.assertTrue("Should identify the catalog by name",
0105 exc.getMessage().contains("invalid"));
0106 Assert.assertTrue("Should identify the class",
0107 exc.getMessage().contains(invalidClassName));
0108 }
0109
0110 @Test
0111 public void testLoadConstructorFailureCatalogPlugin() {
0112 SQLConf conf = new SQLConf();
0113 String invalidClassName = ConstructorFailureCatalogPlugin.class.getCanonicalName();
0114 conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
0115
0116 SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
0117
0118 Assert.assertTrue("Should identify the constructor error",
0119 exc.getMessage().contains("Failed during instantiating constructor for catalog"));
0120 Assert.assertTrue("Should have expected error message",
0121 exc.getCause().getMessage().contains("Expected failure"));
0122 }
0123
0124 @Test
0125 public void testLoadAccessErrorCatalogPlugin() {
0126 SQLConf conf = new SQLConf();
0127 String invalidClassName = AccessErrorCatalogPlugin.class.getCanonicalName();
0128 conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
0129
0130 SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
0131
0132 Assert.assertTrue("Should complain that no public constructor is provided",
0133 exc.getMessage().contains("Failed to call public no-arg constructor for catalog"));
0134 Assert.assertTrue("Should identify the catalog by name",
0135 exc.getMessage().contains("invalid"));
0136 Assert.assertTrue("Should identify the class",
0137 exc.getMessage().contains(invalidClassName));
0138 }
0139
0140 @SuppressWarnings("unchecked")
0141 public static <E extends Exception> E intercept(Class<E> expected, Callable<?> callable) {
0142 try {
0143 callable.call();
0144 Assert.fail("No exception was thrown, expected: " +
0145 expected.getName());
0146 } catch (Exception actual) {
0147 try {
0148 Assert.assertEquals(expected, actual.getClass());
0149 return (E) actual;
0150 } catch (AssertionError e) {
0151 e.addSuppressed(actual);
0152 throw e;
0153 }
0154 }
0155
0156 throw new UnsupportedOperationException("[BUG] Should not reach this statement");
0157 }
0158 }
0159
0160 class TestCatalogPlugin implements CatalogPlugin {
0161 String name = null;
0162 CaseInsensitiveStringMap options = null;
0163
0164 TestCatalogPlugin() {
0165 }
0166
0167 @Override
0168 public void initialize(String name, CaseInsensitiveStringMap options) {
0169 this.name = name;
0170 this.options = options;
0171 }
0172
0173 @Override
0174 public String name() {
0175 return name;
0176 }
0177 }
0178
0179 class ConstructorFailureCatalogPlugin implements CatalogPlugin {
0180 ConstructorFailureCatalogPlugin() {
0181 throw new RuntimeException("Expected failure.");
0182 }
0183
0184 @Override
0185 public void initialize(String name, CaseInsensitiveStringMap options) {
0186 }
0187
0188 @Override
0189 public String name() {
0190 return null;
0191 }
0192 }
0193
0194 class AccessErrorCatalogPlugin implements CatalogPlugin {
0195 private AccessErrorCatalogPlugin() {
0196 }
0197
0198 @Override
0199 public void initialize(String name, CaseInsensitiveStringMap options) {
0200 }
0201
0202 @Override
0203 public String name() {
0204 return null;
0205 }
0206 }
0207
0208 class InvalidCatalogPlugin {
0209 public void initialize(CaseInsensitiveStringMap options) {
0210 }
0211 }