0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.yarn;
0019
0020 import java.util.Map;
0021
0022 import com.codahale.metrics.*;
0023 import org.apache.hadoop.metrics2.MetricsCollector;
0024 import org.apache.hadoop.metrics2.MetricsInfo;
0025 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
0026 import org.apache.hadoop.metrics2.MetricsSource;
0027
0028
0029
0030
0031
0032
0033 class YarnShuffleServiceMetrics implements MetricsSource {
0034
0035 private final MetricSet metricSet;
0036
0037 YarnShuffleServiceMetrics(MetricSet metricSet) {
0038 this.metricSet = metricSet;
0039 }
0040
0041
0042
0043
0044
0045
0046
0047 @Override
0048 public void getMetrics(MetricsCollector collector, boolean all) {
0049 MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
0050
0051 for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
0052 collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
0053 }
0054 }
0055
0056
0057
0058
0059
0060
0061 public static void collectMetric(
0062 MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
0063
0064 if (metric instanceof Timer) {
0065 Timer t = (Timer) metric;
0066 metricsRecordBuilder
0067 .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
0068 t.getCount())
0069 .addGauge(
0070 new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
0071 t.getFifteenMinuteRate())
0072 .addGauge(
0073 new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
0074 t.getFiveMinuteRate())
0075 .addGauge(
0076 new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
0077 t.getOneMinuteRate())
0078 .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
0079 t.getMeanRate());
0080 } else if (metric instanceof Meter) {
0081 Meter m = (Meter) metric;
0082 metricsRecordBuilder
0083 .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
0084 m.getCount())
0085 .addGauge(
0086 new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
0087 m.getFifteenMinuteRate())
0088 .addGauge(
0089 new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
0090 m.getFiveMinuteRate())
0091 .addGauge(
0092 new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
0093 m.getOneMinuteRate())
0094 .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
0095 m.getMeanRate());
0096 } else if (metric instanceof Gauge) {
0097 final Object gaugeValue = ((Gauge) metric).getValue();
0098 if (gaugeValue instanceof Integer) {
0099 metricsRecordBuilder.addGauge(
0100 getShuffleServiceMetricsInfoForGauge(name), (Integer) gaugeValue);
0101 } else if (gaugeValue instanceof Long) {
0102 metricsRecordBuilder.addGauge(
0103 getShuffleServiceMetricsInfoForGauge(name), (Long) gaugeValue);
0104 } else if (gaugeValue instanceof Float) {
0105 metricsRecordBuilder.addGauge(
0106 getShuffleServiceMetricsInfoForGauge(name), (Float) gaugeValue);
0107 } else if (gaugeValue instanceof Double) {
0108 metricsRecordBuilder.addGauge(
0109 getShuffleServiceMetricsInfoForGauge(name), (Double) gaugeValue);
0110 } else {
0111 throw new IllegalStateException(
0112 "Not supported class type of metric[" + name + "] for value " + gaugeValue);
0113 }
0114 } else if (metric instanceof Counter) {
0115 Counter c = (Counter) metric;
0116 long counterValue = c.getCount();
0117 metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForCounter(name), counterValue);
0118 }
0119 }
0120
0121 private static MetricsInfo getShuffleServiceMetricsInfoForGauge(String name) {
0122 return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
0123 }
0124
0125 private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForCounter(String name) {
0126 return new ShuffleServiceMetricsInfo(name, "Value of counter " + name);
0127 }
0128
0129 private static class ShuffleServiceMetricsInfo implements MetricsInfo {
0130
0131 private final String name;
0132 private final String description;
0133
0134 ShuffleServiceMetricsInfo(String name, String description) {
0135 this.name = name;
0136 this.description = description;
0137 }
0138
0139 @Override
0140 public String name() {
0141 return name;
0142 }
0143
0144 @Override
0145 public String description() {
0146 return description;
0147 }
0148 }
0149 }