0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort.io;
0019
0020 import java.util.Map;
0021 import java.util.Optional;
0022
0023 import com.google.common.annotations.VisibleForTesting;
0024
0025 import org.apache.spark.SparkConf;
0026 import org.apache.spark.SparkEnv;
0027 import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
0028 import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
0029 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
0030 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
0031 import org.apache.spark.storage.BlockManager;
0032
0033 public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorComponents {
0034
0035 private final SparkConf sparkConf;
0036 private BlockManager blockManager;
0037 private IndexShuffleBlockResolver blockResolver;
0038
0039 public LocalDiskShuffleExecutorComponents(SparkConf sparkConf) {
0040 this.sparkConf = sparkConf;
0041 }
0042
0043 @VisibleForTesting
0044 public LocalDiskShuffleExecutorComponents(
0045 SparkConf sparkConf,
0046 BlockManager blockManager,
0047 IndexShuffleBlockResolver blockResolver) {
0048 this.sparkConf = sparkConf;
0049 this.blockManager = blockManager;
0050 this.blockResolver = blockResolver;
0051 }
0052
0053 @Override
0054 public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
0055 blockManager = SparkEnv.get().blockManager();
0056 if (blockManager == null) {
0057 throw new IllegalStateException("No blockManager available from the SparkEnv.");
0058 }
0059 blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
0060 }
0061
0062 @Override
0063 public ShuffleMapOutputWriter createMapOutputWriter(
0064 int shuffleId,
0065 long mapTaskId,
0066 int numPartitions) {
0067 if (blockResolver == null) {
0068 throw new IllegalStateException(
0069 "Executor components must be initialized before getting writers.");
0070 }
0071 return new LocalDiskShuffleMapOutputWriter(
0072 shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
0073 }
0074
0075 @Override
0076 public Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
0077 int shuffleId,
0078 long mapId) {
0079 if (blockResolver == null) {
0080 throw new IllegalStateException(
0081 "Executor components must be initialized before getting writers.");
0082 }
0083 return Optional.of(new LocalDiskSingleSpillMapOutputWriter(shuffleId, mapId, blockResolver));
0084 }
0085 }