Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.util;
0019 
0020 import java.io.IOException;
0021 import java.util.HashMap;
0022 import java.util.Map;
0023 
0024 import com.codahale.metrics.Gauge;
0025 import com.codahale.metrics.Metric;
0026 import com.codahale.metrics.MetricRegistry;
0027 import com.codahale.metrics.MetricSet;
0028 import org.apache.spark.network.TestUtils;
0029 import org.apache.spark.network.client.TransportClient;
0030 import org.junit.After;
0031 import org.junit.Assert;
0032 import org.junit.Test;
0033 
0034 import org.apache.spark.network.TransportContext;
0035 import org.apache.spark.network.client.TransportClientFactory;
0036 import org.apache.spark.network.server.NoOpRpcHandler;
0037 import org.apache.spark.network.server.RpcHandler;
0038 import org.apache.spark.network.server.TransportServer;
0039 
0040 public class NettyMemoryMetricsSuite {
0041 
0042   private TransportConf conf;
0043   private TransportContext context;
0044   private TransportServer server;
0045   private TransportClientFactory clientFactory;
0046 
0047   private void setUp(boolean enableVerboseMetrics) {
0048     HashMap<String, String> configMap = new HashMap<>();
0049     configMap.put("spark.shuffle.io.enableVerboseMetrics", String.valueOf(enableVerboseMetrics));
0050     conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
0051     RpcHandler rpcHandler = new NoOpRpcHandler();
0052     context = new TransportContext(conf, rpcHandler);
0053     server = context.createServer();
0054     clientFactory = context.createClientFactory();
0055   }
0056 
0057   @After
0058   public void tearDown() {
0059     if (clientFactory != null) {
0060       JavaUtils.closeQuietly(clientFactory);
0061       clientFactory = null;
0062     }
0063     if (server != null) {
0064       JavaUtils.closeQuietly(server);
0065       server = null;
0066     }
0067     if (context != null) {
0068       JavaUtils.closeQuietly(context);
0069       context = null;
0070     }
0071   }
0072 
0073   @Test
0074   @SuppressWarnings("unchecked")
0075   public void testGeneralNettyMemoryMetrics() throws IOException, InterruptedException {
0076     setUp(false);
0077 
0078     MetricSet serverMetrics = server.getAllMetrics();
0079     Assert.assertNotNull(serverMetrics);
0080     Assert.assertNotNull(serverMetrics.getMetrics());
0081     Assert.assertNotEquals(serverMetrics.getMetrics().size(), 0);
0082 
0083     Map<String, Metric> serverMetricMap = serverMetrics.getMetrics();
0084     serverMetricMap.forEach((name, metric) ->
0085       Assert.assertTrue(name.startsWith("shuffle-server"))
0086     );
0087 
0088     MetricSet clientMetrics = clientFactory.getAllMetrics();
0089     Assert.assertNotNull(clientMetrics);
0090     Assert.assertNotNull(clientMetrics.getMetrics());
0091     Assert.assertNotEquals(clientMetrics.getMetrics().size(), 0);
0092 
0093     Map<String, Metric> clientMetricMap = clientMetrics.getMetrics();
0094     clientMetricMap.forEach((name, metrics) ->
0095       Assert.assertTrue(name.startsWith("shuffle-client"))
0096     );
0097 
0098     // Make sure general metrics existed.
0099     String heapMemoryMetric = "usedHeapMemory";
0100     String directMemoryMetric = "usedDirectMemory";
0101     Assert.assertNotNull(serverMetricMap.get(
0102       MetricRegistry.name("shuffle-server", heapMemoryMetric)));
0103     Assert.assertNotNull(serverMetricMap.get(
0104       MetricRegistry.name("shuffle-server", directMemoryMetric)));
0105 
0106     Assert.assertNotNull(clientMetricMap.get(
0107       MetricRegistry.name("shuffle-client", heapMemoryMetric)));
0108     Assert.assertNotNull(clientMetricMap.get(
0109       MetricRegistry.name("shuffle-client", directMemoryMetric)));
0110 
0111     TransportClient client = null;
0112     try {
0113       client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0114       Assert.assertTrue(client.isActive());
0115 
0116       Assert.assertTrue(((Gauge<Long>)serverMetricMap.get(
0117         MetricRegistry.name("shuffle-server", heapMemoryMetric))).getValue() >= 0L);
0118       Assert.assertTrue(((Gauge<Long>)serverMetricMap.get(
0119         MetricRegistry.name("shuffle-server", directMemoryMetric))).getValue() >= 0L);
0120 
0121       Assert.assertTrue(((Gauge<Long>)clientMetricMap.get(
0122         MetricRegistry.name("shuffle-client", heapMemoryMetric))).getValue() >= 0L);
0123       Assert.assertTrue(((Gauge<Long>)clientMetricMap.get(
0124         MetricRegistry.name("shuffle-client", directMemoryMetric))).getValue() >= 0L);
0125 
0126     } finally {
0127       if (client != null) {
0128         client.close();
0129       }
0130     }
0131   }
0132 
0133   @Test
0134   @SuppressWarnings("unchecked")
0135   public void testAdditionalMetrics() throws IOException, InterruptedException {
0136     setUp(true);
0137 
0138     // Make sure additional metrics are added.
0139     Map<String, Metric> serverMetricMap = server.getAllMetrics().getMetrics();
0140     serverMetricMap.forEach((name, metric) -> {
0141       Assert.assertTrue(name.startsWith("shuffle-server"));
0142       String metricName = name.substring(name.lastIndexOf(".") + 1);
0143       Assert.assertTrue(metricName.equals("usedDirectMemory")
0144         || metricName.equals("usedHeapMemory")
0145         || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
0146     });
0147 
0148     Map<String, Metric> clientMetricMap = clientFactory.getAllMetrics().getMetrics();
0149     clientMetricMap.forEach((name, metric) -> {
0150       Assert.assertTrue(name.startsWith("shuffle-client"));
0151       String metricName = name.substring(name.lastIndexOf(".") + 1);
0152       Assert.assertTrue(metricName.equals("usedDirectMemory")
0153         || metricName.equals("usedHeapMemory")
0154         || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
0155     });
0156 
0157     TransportClient client = null;
0158     try {
0159       client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0160       Assert.assertTrue(client.isActive());
0161 
0162       String activeBytesMetric = "numActiveBytes";
0163       Assert.assertTrue(((Gauge<Long>) serverMetricMap.get(MetricRegistry.name("shuffle-server",
0164         "directArena0", activeBytesMetric))).getValue() >= 0L);
0165 
0166       Assert.assertTrue(((Gauge<Long>) clientMetricMap.get(MetricRegistry.name("shuffle-client",
0167         "directArena0", activeBytesMetric))).getValue() >= 0L);
0168     } finally {
0169       if (client != null) {
0170         client.close();
0171       }
0172     }
0173   }
0174 }