Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli;
0020 
0021 import java.io.ByteArrayInputStream;
0022 import java.util.ArrayList;
0023 import java.util.Iterator;
0024 import java.util.List;
0025 
0026 import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
0027 import org.apache.hive.service.rpc.thrift.TColumn;
0028 import org.apache.hive.service.rpc.thrift.TRow;
0029 import org.apache.hive.service.rpc.thrift.TRowSet;
0030 import org.apache.thrift.TException;
0031 import org.apache.thrift.protocol.TCompactProtocol;
0032 import org.apache.thrift.protocol.TProtocol;
0033 import org.apache.thrift.transport.TIOStreamTransport;
0034 import org.slf4j.Logger;
0035 import org.slf4j.LoggerFactory;
0036 
0037 /**
0038  * ColumnBasedSet.
0039  */
0040 public class ColumnBasedSet implements RowSet {
0041 
0042   private long startOffset;
0043 
0044   private final TypeDescriptor[] descriptors; // non-null only for writing (server-side)
0045   private final List<ColumnBuffer> columns;
0046   private byte[] blob;
0047   private boolean isBlobBased = false;
0048   public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class);
0049 
0050   public ColumnBasedSet(TableSchema schema) {
0051     descriptors = schema.toTypeDescriptors();
0052     columns = new ArrayList<ColumnBuffer>();
0053     for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
0054       columns.add(new ColumnBuffer(colDesc.getType()));
0055     }
0056   }
0057 
0058   public ColumnBasedSet(TRowSet tRowSet) throws TException {
0059     descriptors = null;
0060     columns = new ArrayList<ColumnBuffer>();
0061     // Use TCompactProtocol to read serialized TColumns
0062     if (tRowSet.isSetBinaryColumns()) {
0063       TProtocol protocol =
0064           new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream(
0065               tRowSet.getBinaryColumns())));
0066       // Read from the stream using the protocol for each column in final schema
0067       for (int i = 0; i < tRowSet.getColumnCount(); i++) {
0068         TColumn tvalue = new TColumn();
0069         try {
0070           tvalue.read(protocol);
0071         } catch (TException e) {
0072           LOG.error(e.getMessage(), e);
0073           throw new TException("Error reading column value from the row set blob", e);
0074         }
0075         columns.add(new ColumnBuffer(tvalue));
0076       }
0077     }
0078     else {
0079       if (tRowSet.getColumns() != null) {
0080         for (TColumn tvalue : tRowSet.getColumns()) {
0081           columns.add(new ColumnBuffer(tvalue));
0082         }
0083       }
0084     }
0085     startOffset = tRowSet.getStartRowOffset();
0086   }
0087 
0088   private ColumnBasedSet(TypeDescriptor[] descriptors, List<ColumnBuffer> columns, long startOffset) {
0089     this.descriptors = descriptors;
0090     this.columns = columns;
0091     this.startOffset = startOffset;
0092   }
0093 
0094   public ColumnBasedSet(TableSchema schema, boolean isBlobBased) {
0095     this(schema);
0096     this.isBlobBased = isBlobBased;
0097   }
0098 
0099   @Override
0100   public ColumnBasedSet addRow(Object[] fields) {
0101     if (isBlobBased) {
0102       this.blob = (byte[]) fields[0];
0103     } else {
0104       for (int i = 0; i < fields.length; i++) {
0105         TypeDescriptor descriptor = descriptors[i];
0106         columns.get(i).addValue(descriptor.getType(), fields[i]);
0107       }
0108     }
0109     return this;
0110   }
0111 
0112   public List<ColumnBuffer> getColumns() {
0113     return columns;
0114   }
0115 
0116   @Override
0117   public int numColumns() {
0118     return columns.size();
0119   }
0120 
0121   @Override
0122   public int numRows() {
0123     return columns.isEmpty() ? 0 : columns.get(0).size();
0124   }
0125 
0126   @Override
0127   public ColumnBasedSet extractSubset(int maxRows) {
0128     int numRows = Math.min(numRows(), maxRows);
0129 
0130     List<ColumnBuffer> subset = new ArrayList<ColumnBuffer>();
0131     for (int i = 0; i < columns.size(); i++) {
0132       subset.add(columns.get(i).extractSubset(numRows));
0133     }
0134     ColumnBasedSet result = new ColumnBasedSet(descriptors, subset, startOffset);
0135     startOffset += numRows;
0136     return result;
0137   }
0138 
0139   @Override
0140   public long getStartOffset() {
0141     return startOffset;
0142   }
0143 
0144   @Override
0145   public void setStartOffset(long startOffset) {
0146     this.startOffset = startOffset;
0147   }
0148 
0149   public TRowSet toTRowSet() {
0150     TRowSet tRowSet = new TRowSet(startOffset, new ArrayList<TRow>());
0151     if (isBlobBased) {
0152       tRowSet.setColumns(null);
0153       tRowSet.setBinaryColumns(blob);
0154       tRowSet.setColumnCount(numColumns());
0155     } else {
0156       for (int i = 0; i < columns.size(); i++) {
0157         tRowSet.addToColumns(columns.get(i).toTColumn());
0158       }
0159     }
0160     return tRowSet;
0161   }
0162 
0163   @Override
0164   public Iterator<Object[]> iterator() {
0165     return new Iterator<Object[]>() {
0166 
0167       private int index;
0168       private final Object[] convey = new Object[numColumns()];
0169 
0170       @Override
0171       public boolean hasNext() {
0172         return index < numRows();
0173       }
0174 
0175       @Override
0176       public Object[] next() {
0177         for (int i = 0; i < columns.size(); i++) {
0178           convey[i] = columns.get(i).get(index);
0179         }
0180         index++;
0181         return convey;
0182       }
0183 
0184       @Override
0185       public void remove() {
0186         throw new UnsupportedOperationException("remove");
0187       }
0188     };
0189   }
0190 
0191   public Object[] fill(int index, Object[] convey) {
0192     for (int i = 0; i < columns.size(); i++) {
0193       convey[i] = columns.get(i).get(index);
0194     }
0195     return convey;
0196   }
0197 }