0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.io.IOException;
0021 import java.util.HashMap;
0022 import java.util.Map;
0023
0024 import com.codahale.metrics.Gauge;
0025 import com.codahale.metrics.Metric;
0026 import com.codahale.metrics.MetricRegistry;
0027 import com.codahale.metrics.MetricSet;
0028 import org.apache.spark.network.TestUtils;
0029 import org.apache.spark.network.client.TransportClient;
0030 import org.junit.After;
0031 import org.junit.Assert;
0032 import org.junit.Test;
0033
0034 import org.apache.spark.network.TransportContext;
0035 import org.apache.spark.network.client.TransportClientFactory;
0036 import org.apache.spark.network.server.NoOpRpcHandler;
0037 import org.apache.spark.network.server.RpcHandler;
0038 import org.apache.spark.network.server.TransportServer;
0039
0040 public class NettyMemoryMetricsSuite {
0041
0042 private TransportConf conf;
0043 private TransportContext context;
0044 private TransportServer server;
0045 private TransportClientFactory clientFactory;
0046
0047 private void setUp(boolean enableVerboseMetrics) {
0048 HashMap<String, String> configMap = new HashMap<>();
0049 configMap.put("spark.shuffle.io.enableVerboseMetrics", String.valueOf(enableVerboseMetrics));
0050 conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
0051 RpcHandler rpcHandler = new NoOpRpcHandler();
0052 context = new TransportContext(conf, rpcHandler);
0053 server = context.createServer();
0054 clientFactory = context.createClientFactory();
0055 }
0056
0057 @After
0058 public void tearDown() {
0059 if (clientFactory != null) {
0060 JavaUtils.closeQuietly(clientFactory);
0061 clientFactory = null;
0062 }
0063 if (server != null) {
0064 JavaUtils.closeQuietly(server);
0065 server = null;
0066 }
0067 if (context != null) {
0068 JavaUtils.closeQuietly(context);
0069 context = null;
0070 }
0071 }
0072
0073 @Test
0074 @SuppressWarnings("unchecked")
0075 public void testGeneralNettyMemoryMetrics() throws IOException, InterruptedException {
0076 setUp(false);
0077
0078 MetricSet serverMetrics = server.getAllMetrics();
0079 Assert.assertNotNull(serverMetrics);
0080 Assert.assertNotNull(serverMetrics.getMetrics());
0081 Assert.assertNotEquals(serverMetrics.getMetrics().size(), 0);
0082
0083 Map<String, Metric> serverMetricMap = serverMetrics.getMetrics();
0084 serverMetricMap.forEach((name, metric) ->
0085 Assert.assertTrue(name.startsWith("shuffle-server"))
0086 );
0087
0088 MetricSet clientMetrics = clientFactory.getAllMetrics();
0089 Assert.assertNotNull(clientMetrics);
0090 Assert.assertNotNull(clientMetrics.getMetrics());
0091 Assert.assertNotEquals(clientMetrics.getMetrics().size(), 0);
0092
0093 Map<String, Metric> clientMetricMap = clientMetrics.getMetrics();
0094 clientMetricMap.forEach((name, metrics) ->
0095 Assert.assertTrue(name.startsWith("shuffle-client"))
0096 );
0097
0098
0099 String heapMemoryMetric = "usedHeapMemory";
0100 String directMemoryMetric = "usedDirectMemory";
0101 Assert.assertNotNull(serverMetricMap.get(
0102 MetricRegistry.name("shuffle-server", heapMemoryMetric)));
0103 Assert.assertNotNull(serverMetricMap.get(
0104 MetricRegistry.name("shuffle-server", directMemoryMetric)));
0105
0106 Assert.assertNotNull(clientMetricMap.get(
0107 MetricRegistry.name("shuffle-client", heapMemoryMetric)));
0108 Assert.assertNotNull(clientMetricMap.get(
0109 MetricRegistry.name("shuffle-client", directMemoryMetric)));
0110
0111 TransportClient client = null;
0112 try {
0113 client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0114 Assert.assertTrue(client.isActive());
0115
0116 Assert.assertTrue(((Gauge<Long>)serverMetricMap.get(
0117 MetricRegistry.name("shuffle-server", heapMemoryMetric))).getValue() >= 0L);
0118 Assert.assertTrue(((Gauge<Long>)serverMetricMap.get(
0119 MetricRegistry.name("shuffle-server", directMemoryMetric))).getValue() >= 0L);
0120
0121 Assert.assertTrue(((Gauge<Long>)clientMetricMap.get(
0122 MetricRegistry.name("shuffle-client", heapMemoryMetric))).getValue() >= 0L);
0123 Assert.assertTrue(((Gauge<Long>)clientMetricMap.get(
0124 MetricRegistry.name("shuffle-client", directMemoryMetric))).getValue() >= 0L);
0125
0126 } finally {
0127 if (client != null) {
0128 client.close();
0129 }
0130 }
0131 }
0132
0133 @Test
0134 @SuppressWarnings("unchecked")
0135 public void testAdditionalMetrics() throws IOException, InterruptedException {
0136 setUp(true);
0137
0138
0139 Map<String, Metric> serverMetricMap = server.getAllMetrics().getMetrics();
0140 serverMetricMap.forEach((name, metric) -> {
0141 Assert.assertTrue(name.startsWith("shuffle-server"));
0142 String metricName = name.substring(name.lastIndexOf(".") + 1);
0143 Assert.assertTrue(metricName.equals("usedDirectMemory")
0144 || metricName.equals("usedHeapMemory")
0145 || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
0146 });
0147
0148 Map<String, Metric> clientMetricMap = clientFactory.getAllMetrics().getMetrics();
0149 clientMetricMap.forEach((name, metric) -> {
0150 Assert.assertTrue(name.startsWith("shuffle-client"));
0151 String metricName = name.substring(name.lastIndexOf(".") + 1);
0152 Assert.assertTrue(metricName.equals("usedDirectMemory")
0153 || metricName.equals("usedHeapMemory")
0154 || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
0155 });
0156
0157 TransportClient client = null;
0158 try {
0159 client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0160 Assert.assertTrue(client.isActive());
0161
0162 String activeBytesMetric = "numActiveBytes";
0163 Assert.assertTrue(((Gauge<Long>) serverMetricMap.get(MetricRegistry.name("shuffle-server",
0164 "directArena0", activeBytesMetric))).getValue() >= 0L);
0165
0166 Assert.assertTrue(((Gauge<Long>) clientMetricMap.get(MetricRegistry.name("shuffle-client",
0167 "directArena0", activeBytesMetric))).getValue() >= 0L);
0168 } finally {
0169 if (client != null) {
0170 client.close();
0171 }
0172 }
0173 }
0174 }