0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022
0023 import com.google.common.collect.ImmutableMap;
0024 import org.junit.After;
0025 import org.junit.Before;
0026 import org.junit.Test;
0027
0028 import static org.junit.Assert.*;
0029
0030 import org.apache.spark.network.TestUtils;
0031 import org.apache.spark.network.TransportContext;
0032 import org.apache.spark.network.sasl.SaslServerBootstrap;
0033 import org.apache.spark.network.sasl.SecretKeyHolder;
0034 import org.apache.spark.network.server.TransportServer;
0035 import org.apache.spark.network.server.TransportServerBootstrap;
0036 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0037 import org.apache.spark.network.util.MapConfigProvider;
0038 import org.apache.spark.network.util.TransportConf;
0039
0040 public class ExternalShuffleSecuritySuite {
0041
0042 TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
0043 TransportServer server;
0044 TransportContext transportContext;
0045
0046 @Before
0047 public void beforeEach() throws IOException {
0048 transportContext = new TransportContext(conf, new ExternalBlockHandler(conf, null));
0049 TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
0050 new TestSecretKeyHolder("my-app-id", "secret"));
0051 this.server = transportContext.createServer(Arrays.asList(bootstrap));
0052 }
0053
0054 @After
0055 public void afterEach() {
0056 if (server != null) {
0057 server.close();
0058 server = null;
0059 }
0060 if (transportContext != null) {
0061 transportContext.close();
0062 transportContext = null;
0063 }
0064 }
0065
0066 @Test
0067 public void testValid() throws IOException, InterruptedException {
0068 validate("my-app-id", "secret", false);
0069 }
0070
0071 @Test
0072 public void testBadAppId() {
0073 try {
0074 validate("wrong-app-id", "secret", false);
0075 } catch (Exception e) {
0076 assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
0077 }
0078 }
0079
0080 @Test
0081 public void testBadSecret() {
0082 try {
0083 validate("my-app-id", "bad-secret", false);
0084 } catch (Exception e) {
0085 assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
0086 }
0087 }
0088
0089 @Test
0090 public void testEncryption() throws IOException, InterruptedException {
0091 validate("my-app-id", "secret", true);
0092 }
0093
0094
0095 private void validate(String appId, String secretKey, boolean encrypt)
0096 throws IOException, InterruptedException {
0097 TransportConf testConf = conf;
0098 if (encrypt) {
0099 testConf = new TransportConf("shuffle", new MapConfigProvider(
0100 ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
0101 }
0102
0103 try (ExternalBlockStoreClient client =
0104 new ExternalBlockStoreClient(
0105 testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
0106 client.init(appId);
0107
0108 client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
0109 new ExecutorShuffleInfo(
0110 new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")
0111 );
0112 }
0113 }
0114
0115
0116 static class TestSecretKeyHolder implements SecretKeyHolder {
0117 private final String appId;
0118 private final String secretKey;
0119
0120 TestSecretKeyHolder(String appId, String secretKey) {
0121 this.appId = appId;
0122 this.secretKey = secretKey;
0123 }
0124
0125 @Override
0126 public String getSaslUser(String appId) {
0127 return "user";
0128 }
0129
0130 @Override
0131 public String getSecretKey(String appId) {
0132 if (!appId.equals(this.appId)) {
0133 throw new IllegalArgumentException("Wrong appId!");
0134 }
0135 return secretKey;
0136 }
0137 }
0138 }