0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.lang.reflect.Method;
0021 import java.lang.reflect.Modifier;
0022 import java.util.*;
0023
0024 import com.codahale.metrics.Gauge;
0025 import com.codahale.metrics.Metric;
0026 import com.codahale.metrics.MetricRegistry;
0027 import com.codahale.metrics.MetricSet;
0028 import com.google.common.annotations.VisibleForTesting;
0029 import io.netty.buffer.PoolArenaMetric;
0030 import io.netty.buffer.PooledByteBufAllocator;
0031 import io.netty.buffer.PooledByteBufAllocatorMetric;
0032
0033
0034
0035
0036 public class NettyMemoryMetrics implements MetricSet {
0037
0038 private final PooledByteBufAllocator pooledAllocator;
0039
0040 private final boolean verboseMetricsEnabled;
0041
0042 private final Map<String, Metric> allMetrics;
0043
0044 private final String metricPrefix;
0045
0046 @VisibleForTesting
0047 static final Set<String> VERBOSE_METRICS = new HashSet<>();
0048 static {
0049 VERBOSE_METRICS.addAll(Arrays.asList(
0050 "numAllocations",
0051 "numTinyAllocations",
0052 "numSmallAllocations",
0053 "numNormalAllocations",
0054 "numHugeAllocations",
0055 "numDeallocations",
0056 "numTinyDeallocations",
0057 "numSmallDeallocations",
0058 "numNormalDeallocations",
0059 "numHugeDeallocations",
0060 "numActiveAllocations",
0061 "numActiveTinyAllocations",
0062 "numActiveSmallAllocations",
0063 "numActiveNormalAllocations",
0064 "numActiveHugeAllocations",
0065 "numActiveBytes"));
0066 }
0067
0068 public NettyMemoryMetrics(PooledByteBufAllocator pooledAllocator,
0069 String metricPrefix,
0070 TransportConf conf) {
0071 this.pooledAllocator = pooledAllocator;
0072 this.allMetrics = new HashMap<>();
0073 this.metricPrefix = metricPrefix;
0074 this.verboseMetricsEnabled = conf.verboseMetrics();
0075
0076 registerMetrics(this.pooledAllocator);
0077 }
0078
0079 private void registerMetrics(PooledByteBufAllocator allocator) {
0080 PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric();
0081
0082
0083 allMetrics.put(MetricRegistry.name(metricPrefix, "usedHeapMemory"),
0084 (Gauge<Long>) () -> pooledAllocatorMetric.usedHeapMemory());
0085 allMetrics.put(MetricRegistry.name(metricPrefix, "usedDirectMemory"),
0086 (Gauge<Long>) () -> pooledAllocatorMetric.usedDirectMemory());
0087
0088 if (verboseMetricsEnabled) {
0089 int directArenaIndex = 0;
0090 for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
0091 registerArenaMetric(metric, "directArena" + directArenaIndex);
0092 directArenaIndex++;
0093 }
0094
0095 int heapArenaIndex = 0;
0096 for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
0097 registerArenaMetric(metric, "heapArena" + heapArenaIndex);
0098 heapArenaIndex++;
0099 }
0100 }
0101 }
0102
0103 private void registerArenaMetric(PoolArenaMetric arenaMetric, String arenaName) {
0104 for (String methodName : VERBOSE_METRICS) {
0105 Method m;
0106 try {
0107 m = PoolArenaMetric.class.getMethod(methodName);
0108 } catch (Exception e) {
0109
0110 continue;
0111 }
0112
0113 if (!Modifier.isPublic(m.getModifiers())) {
0114
0115 continue;
0116 }
0117
0118 Class<?> returnType = m.getReturnType();
0119 String metricName = MetricRegistry.name(metricPrefix, arenaName, m.getName());
0120 if (returnType.equals(int.class)) {
0121 allMetrics.put(metricName, (Gauge<Integer>) () -> {
0122 try {
0123 return (Integer) m.invoke(arenaMetric);
0124 } catch (Exception e) {
0125 return -1;
0126 }
0127 });
0128
0129 } else if (returnType.equals(long.class)) {
0130 allMetrics.put(metricName, (Gauge<Long>) () -> {
0131 try {
0132 return (Long) m.invoke(arenaMetric);
0133 } catch (Exception e) {
0134 return -1L;
0135 }
0136 });
0137 }
0138 }
0139 }
0140
0141 @Override
0142 public Map<String, Metric> getMetrics() {
0143 return Collections.unmodifiableMap(allMetrics);
0144 }
0145 }