0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.storage;
0019
0020 import java.io.IOException;
0021 import java.io.OutputStream;
0022
0023 import org.apache.spark.annotation.Private;
0024 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
0025
0026
0027
0028
0029
0030 @Private
0031 public final class TimeTrackingOutputStream extends OutputStream {
0032
0033 private final ShuffleWriteMetricsReporter writeMetrics;
0034 private final OutputStream outputStream;
0035
0036 public TimeTrackingOutputStream(
0037 ShuffleWriteMetricsReporter writeMetrics, OutputStream outputStream) {
0038 this.writeMetrics = writeMetrics;
0039 this.outputStream = outputStream;
0040 }
0041
0042 @Override
0043 public void write(int b) throws IOException {
0044 final long startTime = System.nanoTime();
0045 outputStream.write(b);
0046 writeMetrics.incWriteTime(System.nanoTime() - startTime);
0047 }
0048
0049 @Override
0050 public void write(byte[] b) throws IOException {
0051 final long startTime = System.nanoTime();
0052 outputStream.write(b);
0053 writeMetrics.incWriteTime(System.nanoTime() - startTime);
0054 }
0055
0056 @Override
0057 public void write(byte[] b, int off, int len) throws IOException {
0058 final long startTime = System.nanoTime();
0059 outputStream.write(b, off, len);
0060 writeMetrics.incWriteTime(System.nanoTime() - startTime);
0061 }
0062
0063 @Override
0064 public void flush() throws IOException {
0065 final long startTime = System.nanoTime();
0066 outputStream.flush();
0067 writeMetrics.incWriteTime(System.nanoTime() - startTime);
0068 }
0069
0070 @Override
0071 public void close() throws IOException {
0072 final long startTime = System.nanoTime();
0073 outputStream.close();
0074 writeMetrics.incWriteTime(System.nanoTime() - startTime);
0075 }
0076 }