Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.yarn;
0019 
0020 import java.util.Map;
0021 
0022 import com.codahale.metrics.*;
0023 import org.apache.hadoop.metrics2.MetricsCollector;
0024 import org.apache.hadoop.metrics2.MetricsInfo;
0025 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
0026 import org.apache.hadoop.metrics2.MetricsSource;
0027 
0028 /**
0029  * Forward {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics}
0030  * to hadoop metrics system.
0031  * NodeManager by default exposes JMX endpoint where can be collected.
0032  */
0033 class YarnShuffleServiceMetrics implements MetricsSource {
0034 
0035   private final MetricSet metricSet;
0036 
0037   YarnShuffleServiceMetrics(MetricSet metricSet) {
0038     this.metricSet = metricSet;
0039   }
0040 
0041   /**
0042    * Get metrics from the source
0043    *
0044    * @param collector to contain the resulting metrics snapshot
0045    * @param all       if true, return all metrics even if unchanged.
0046    */
0047   @Override
0048   public void getMetrics(MetricsCollector collector, boolean all) {
0049     MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
0050 
0051     for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
0052       collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
0053     }
0054   }
0055 
0056   /**
0057    * The metric types used in
0058    * {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics}.
0059    * Visible for testing.
0060    */
0061   public static void collectMetric(
0062     MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
0063 
0064     if (metric instanceof Timer) {
0065       Timer t = (Timer) metric;
0066       metricsRecordBuilder
0067         .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
0068           t.getCount())
0069         .addGauge(
0070           new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
0071           t.getFifteenMinuteRate())
0072         .addGauge(
0073           new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
0074           t.getFiveMinuteRate())
0075         .addGauge(
0076           new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
0077           t.getOneMinuteRate())
0078         .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
0079           t.getMeanRate());
0080     } else if (metric instanceof Meter) {
0081       Meter m = (Meter) metric;
0082       metricsRecordBuilder
0083         .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
0084           m.getCount())
0085         .addGauge(
0086           new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
0087           m.getFifteenMinuteRate())
0088         .addGauge(
0089           new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
0090           m.getFiveMinuteRate())
0091         .addGauge(
0092           new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
0093           m.getOneMinuteRate())
0094         .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
0095           m.getMeanRate());
0096     } else if (metric instanceof Gauge) {
0097       final Object gaugeValue = ((Gauge) metric).getValue();
0098       if (gaugeValue instanceof Integer) {
0099         metricsRecordBuilder.addGauge(
0100           getShuffleServiceMetricsInfoForGauge(name), (Integer) gaugeValue);
0101       } else if (gaugeValue instanceof Long) {
0102         metricsRecordBuilder.addGauge(
0103           getShuffleServiceMetricsInfoForGauge(name), (Long) gaugeValue);
0104       } else if (gaugeValue instanceof Float) {
0105         metricsRecordBuilder.addGauge(
0106           getShuffleServiceMetricsInfoForGauge(name), (Float) gaugeValue);
0107       } else if (gaugeValue instanceof Double) {
0108         metricsRecordBuilder.addGauge(
0109           getShuffleServiceMetricsInfoForGauge(name), (Double) gaugeValue);
0110       } else {
0111         throw new IllegalStateException(
0112                 "Not supported class type of metric[" + name + "] for value " + gaugeValue);
0113       }
0114     } else if (metric instanceof Counter) {
0115       Counter c = (Counter) metric;
0116       long counterValue = c.getCount();
0117       metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForCounter(name), counterValue);
0118     }
0119   }
0120 
0121   private static MetricsInfo getShuffleServiceMetricsInfoForGauge(String name) {
0122     return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
0123   }
0124 
0125   private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForCounter(String name) {
0126     return new ShuffleServiceMetricsInfo(name, "Value of counter " + name);
0127   }
0128 
0129   private static class ShuffleServiceMetricsInfo implements MetricsInfo {
0130 
0131     private final String name;
0132     private final String description;
0133 
0134     ShuffleServiceMetricsInfo(String name, String description) {
0135       this.name = name;
0136       this.description = description;
0137     }
0138 
0139     @Override
0140     public String name() {
0141       return name;
0142     }
0143 
0144     @Override
0145     public String description() {
0146       return description;
0147     }
0148   }
0149 }