0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.avro;
0019
0020 import java.io.IOException;
0021 import java.io.OutputStream;
0022 import java.util.Map;
0023
0024 import org.apache.avro.Schema;
0025 import org.apache.avro.file.CodecFactory;
0026 import org.apache.avro.file.DataFileWriter;
0027 import org.apache.avro.generic.GenericData;
0028 import org.apache.avro.generic.GenericRecord;
0029 import org.apache.avro.mapred.AvroKey;
0030 import org.apache.avro.mapreduce.AvroKeyOutputFormat;
0031 import org.apache.avro.mapreduce.Syncable;
0032 import org.apache.hadoop.io.NullWritable;
0033 import org.apache.hadoop.mapreduce.RecordWriter;
0034 import org.apache.hadoop.mapreduce.TaskAttemptContext;
0035
0036
0037
0038 public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
0039 public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
0040 super(new SparkRecordWriterFactory(metadata));
0041 }
0042
0043 static class SparkRecordWriterFactory extends RecordWriterFactory<GenericRecord> {
0044 private final Map<String, String> metadata;
0045 SparkRecordWriterFactory(Map<String, String> metadata) {
0046 this.metadata = metadata;
0047 }
0048
0049 protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
0050 Schema writerSchema,
0051 GenericData dataModel,
0052 CodecFactory compressionCodec,
0053 OutputStream outputStream,
0054 int syncInterval) throws IOException {
0055 return new SparkAvroKeyRecordWriter(
0056 writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
0057 }
0058 }
0059 }
0060
0061
0062 class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
0063 implements Syncable {
0064
0065 private final DataFileWriter<T> mAvroFileWriter;
0066
0067 SparkAvroKeyRecordWriter(
0068 Schema writerSchema,
0069 GenericData dataModel,
0070 CodecFactory compressionCodec,
0071 OutputStream outputStream,
0072 int syncInterval,
0073 Map<String, String> metadata) throws IOException {
0074 this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
0075 for (Map.Entry<String, String> entry : metadata.entrySet()) {
0076 this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
0077 }
0078 this.mAvroFileWriter.setCodec(compressionCodec);
0079 this.mAvroFileWriter.setSyncInterval(syncInterval);
0080 this.mAvroFileWriter.create(writerSchema, outputStream);
0081 }
0082
0083 public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
0084 this.mAvroFileWriter.append(record.datum());
0085 }
0086
0087 public void close(TaskAttemptContext context) throws IOException {
0088 this.mAvroFileWriter.close();
0089 }
0090
0091 public long sync() throws IOException {
0092 return this.mAvroFileWriter.sync();
0093 }
0094 }