Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022 
0023 import com.google.common.collect.ImmutableMap;
0024 import org.junit.After;
0025 import org.junit.Before;
0026 import org.junit.Test;
0027 
0028 import static org.junit.Assert.*;
0029 
0030 import org.apache.spark.network.TestUtils;
0031 import org.apache.spark.network.TransportContext;
0032 import org.apache.spark.network.sasl.SaslServerBootstrap;
0033 import org.apache.spark.network.sasl.SecretKeyHolder;
0034 import org.apache.spark.network.server.TransportServer;
0035 import org.apache.spark.network.server.TransportServerBootstrap;
0036 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0037 import org.apache.spark.network.util.MapConfigProvider;
0038 import org.apache.spark.network.util.TransportConf;
0039 
0040 public class ExternalShuffleSecuritySuite {
0041 
0042   TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
0043   TransportServer server;
0044   TransportContext transportContext;
0045 
0046   @Before
0047   public void beforeEach() throws IOException {
0048     transportContext = new TransportContext(conf, new ExternalBlockHandler(conf, null));
0049     TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
0050         new TestSecretKeyHolder("my-app-id", "secret"));
0051     this.server = transportContext.createServer(Arrays.asList(bootstrap));
0052   }
0053 
0054   @After
0055   public void afterEach() {
0056     if (server != null) {
0057       server.close();
0058       server = null;
0059     }
0060     if (transportContext != null) {
0061       transportContext.close();
0062       transportContext = null;
0063     }
0064   }
0065 
0066   @Test
0067   public void testValid() throws IOException, InterruptedException {
0068     validate("my-app-id", "secret", false);
0069   }
0070 
0071   @Test
0072   public void testBadAppId() {
0073     try {
0074       validate("wrong-app-id", "secret", false);
0075     } catch (Exception e) {
0076       assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
0077     }
0078   }
0079 
0080   @Test
0081   public void testBadSecret() {
0082     try {
0083       validate("my-app-id", "bad-secret", false);
0084     } catch (Exception e) {
0085       assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
0086     }
0087   }
0088 
0089   @Test
0090   public void testEncryption() throws IOException, InterruptedException {
0091     validate("my-app-id", "secret", true);
0092   }
0093 
0094   /** Creates an ExternalBlockStoreClient and attempts to register with the server. */
0095   private void validate(String appId, String secretKey, boolean encrypt)
0096         throws IOException, InterruptedException {
0097     TransportConf testConf = conf;
0098     if (encrypt) {
0099       testConf = new TransportConf("shuffle", new MapConfigProvider(
0100         ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
0101     }
0102 
0103     try (ExternalBlockStoreClient client =
0104         new ExternalBlockStoreClient(
0105           testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
0106       client.init(appId);
0107       // Registration either succeeds or throws an exception.
0108       client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
0109         new ExecutorShuffleInfo(
0110           new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")
0111       );
0112     }
0113   }
0114 
0115   /** Provides a secret key holder which always returns the given secret key, for a single appId. */
0116   static class TestSecretKeyHolder implements SecretKeyHolder {
0117     private final String appId;
0118     private final String secretKey;
0119 
0120     TestSecretKeyHolder(String appId, String secretKey) {
0121       this.appId = appId;
0122       this.secretKey = secretKey;
0123     }
0124 
0125     @Override
0126     public String getSaslUser(String appId) {
0127       return "user";
0128     }
0129 
0130     @Override
0131     public String getSecretKey(String appId) {
0132       if (!appId.equals(this.appId)) {
0133         throw new IllegalArgumentException("Wrong appId!");
0134       }
0135       return secretKey;
0136     }
0137   }
0138 }