0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort.io;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.file.Files;
0023
0024 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
0025 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
0026 import org.apache.spark.util.Utils;
0027
0028 public class LocalDiskSingleSpillMapOutputWriter
0029 implements SingleSpillShuffleMapOutputWriter {
0030
0031 private final int shuffleId;
0032 private final long mapId;
0033 private final IndexShuffleBlockResolver blockResolver;
0034
0035 public LocalDiskSingleSpillMapOutputWriter(
0036 int shuffleId,
0037 long mapId,
0038 IndexShuffleBlockResolver blockResolver) {
0039 this.shuffleId = shuffleId;
0040 this.mapId = mapId;
0041 this.blockResolver = blockResolver;
0042 }
0043
0044 @Override
0045 public void transferMapSpillFile(
0046 File mapSpillFile,
0047 long[] partitionLengths) throws IOException {
0048
0049
0050 File outputFile = blockResolver.getDataFile(shuffleId, mapId);
0051 File tempFile = Utils.tempFileWith(outputFile);
0052 Files.move(mapSpillFile.toPath(), tempFile.toPath());
0053 blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempFile);
0054 }
0055 }