Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.avro;
0019 
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.Syncable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
0035 
0036 // A variant of `AvroKeyOutputFormat`, which is used to inject the custom `RecordWriterFactory` so
0037 // that we can set avro file metadata.
0038 public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
0039   public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
0040     super(new SparkRecordWriterFactory(metadata));
0041   }
0042 
0043   static class SparkRecordWriterFactory extends RecordWriterFactory<GenericRecord> {
0044     private final Map<String, String> metadata;
0045     SparkRecordWriterFactory(Map<String, String> metadata) {
0046       this.metadata = metadata;
0047     }
0048 
0049     protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
0050         Schema writerSchema,
0051         GenericData dataModel,
0052         CodecFactory compressionCodec,
0053         OutputStream outputStream,
0054         int syncInterval) throws IOException {
0055       return new SparkAvroKeyRecordWriter(
0056         writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
0057     }
0058   }
0059 }
0060 
0061 // This a fork of org.apache.avro.mapreduce.AvroKeyRecordWriter, in order to set file metadata.
0062 class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
0063     implements Syncable {
0064 
0065   private final DataFileWriter<T> mAvroFileWriter;
0066 
0067   SparkAvroKeyRecordWriter(
0068       Schema writerSchema,
0069       GenericData dataModel,
0070       CodecFactory compressionCodec,
0071       OutputStream outputStream,
0072       int syncInterval,
0073       Map<String, String> metadata) throws IOException {
0074     this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
0075     for (Map.Entry<String, String> entry : metadata.entrySet()) {
0076       this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
0077     }
0078     this.mAvroFileWriter.setCodec(compressionCodec);
0079     this.mAvroFileWriter.setSyncInterval(syncInterval);
0080     this.mAvroFileWriter.create(writerSchema, outputStream);
0081   }
0082 
0083   public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
0084     this.mAvroFileWriter.append(record.datum());
0085   }
0086 
0087   public void close(TaskAttemptContext context) throws IOException {
0088     this.mAvroFileWriter.close();
0089   }
0090 
0091   public long sync() throws IOException {
0092     return this.mAvroFileWriter.sync();
0093   }
0094 }