0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli;
0020
0021 import java.io.ByteArrayInputStream;
0022 import java.util.ArrayList;
0023 import java.util.Iterator;
0024 import java.util.List;
0025
0026 import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
0027 import org.apache.hive.service.rpc.thrift.TColumn;
0028 import org.apache.hive.service.rpc.thrift.TRow;
0029 import org.apache.hive.service.rpc.thrift.TRowSet;
0030 import org.apache.thrift.TException;
0031 import org.apache.thrift.protocol.TCompactProtocol;
0032 import org.apache.thrift.protocol.TProtocol;
0033 import org.apache.thrift.transport.TIOStreamTransport;
0034 import org.slf4j.Logger;
0035 import org.slf4j.LoggerFactory;
0036
0037
0038
0039
0040 public class ColumnBasedSet implements RowSet {
0041
0042 private long startOffset;
0043
0044 private final TypeDescriptor[] descriptors;
0045 private final List<ColumnBuffer> columns;
0046 private byte[] blob;
0047 private boolean isBlobBased = false;
0048 public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class);
0049
0050 public ColumnBasedSet(TableSchema schema) {
0051 descriptors = schema.toTypeDescriptors();
0052 columns = new ArrayList<ColumnBuffer>();
0053 for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
0054 columns.add(new ColumnBuffer(colDesc.getType()));
0055 }
0056 }
0057
0058 public ColumnBasedSet(TRowSet tRowSet) throws TException {
0059 descriptors = null;
0060 columns = new ArrayList<ColumnBuffer>();
0061
0062 if (tRowSet.isSetBinaryColumns()) {
0063 TProtocol protocol =
0064 new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream(
0065 tRowSet.getBinaryColumns())));
0066
0067 for (int i = 0; i < tRowSet.getColumnCount(); i++) {
0068 TColumn tvalue = new TColumn();
0069 try {
0070 tvalue.read(protocol);
0071 } catch (TException e) {
0072 LOG.error(e.getMessage(), e);
0073 throw new TException("Error reading column value from the row set blob", e);
0074 }
0075 columns.add(new ColumnBuffer(tvalue));
0076 }
0077 }
0078 else {
0079 if (tRowSet.getColumns() != null) {
0080 for (TColumn tvalue : tRowSet.getColumns()) {
0081 columns.add(new ColumnBuffer(tvalue));
0082 }
0083 }
0084 }
0085 startOffset = tRowSet.getStartRowOffset();
0086 }
0087
0088 private ColumnBasedSet(TypeDescriptor[] descriptors, List<ColumnBuffer> columns, long startOffset) {
0089 this.descriptors = descriptors;
0090 this.columns = columns;
0091 this.startOffset = startOffset;
0092 }
0093
0094 public ColumnBasedSet(TableSchema schema, boolean isBlobBased) {
0095 this(schema);
0096 this.isBlobBased = isBlobBased;
0097 }
0098
0099 @Override
0100 public ColumnBasedSet addRow(Object[] fields) {
0101 if (isBlobBased) {
0102 this.blob = (byte[]) fields[0];
0103 } else {
0104 for (int i = 0; i < fields.length; i++) {
0105 TypeDescriptor descriptor = descriptors[i];
0106 columns.get(i).addValue(descriptor.getType(), fields[i]);
0107 }
0108 }
0109 return this;
0110 }
0111
0112 public List<ColumnBuffer> getColumns() {
0113 return columns;
0114 }
0115
0116 @Override
0117 public int numColumns() {
0118 return columns.size();
0119 }
0120
0121 @Override
0122 public int numRows() {
0123 return columns.isEmpty() ? 0 : columns.get(0).size();
0124 }
0125
0126 @Override
0127 public ColumnBasedSet extractSubset(int maxRows) {
0128 int numRows = Math.min(numRows(), maxRows);
0129
0130 List<ColumnBuffer> subset = new ArrayList<ColumnBuffer>();
0131 for (int i = 0; i < columns.size(); i++) {
0132 subset.add(columns.get(i).extractSubset(numRows));
0133 }
0134 ColumnBasedSet result = new ColumnBasedSet(descriptors, subset, startOffset);
0135 startOffset += numRows;
0136 return result;
0137 }
0138
0139 @Override
0140 public long getStartOffset() {
0141 return startOffset;
0142 }
0143
0144 @Override
0145 public void setStartOffset(long startOffset) {
0146 this.startOffset = startOffset;
0147 }
0148
0149 public TRowSet toTRowSet() {
0150 TRowSet tRowSet = new TRowSet(startOffset, new ArrayList<TRow>());
0151 if (isBlobBased) {
0152 tRowSet.setColumns(null);
0153 tRowSet.setBinaryColumns(blob);
0154 tRowSet.setColumnCount(numColumns());
0155 } else {
0156 for (int i = 0; i < columns.size(); i++) {
0157 tRowSet.addToColumns(columns.get(i).toTColumn());
0158 }
0159 }
0160 return tRowSet;
0161 }
0162
0163 @Override
0164 public Iterator<Object[]> iterator() {
0165 return new Iterator<Object[]>() {
0166
0167 private int index;
0168 private final Object[] convey = new Object[numColumns()];
0169
0170 @Override
0171 public boolean hasNext() {
0172 return index < numRows();
0173 }
0174
0175 @Override
0176 public Object[] next() {
0177 for (int i = 0; i < columns.size(); i++) {
0178 convey[i] = columns.get(i).get(index);
0179 }
0180 index++;
0181 return convey;
0182 }
0183
0184 @Override
0185 public void remove() {
0186 throw new UnsupportedOperationException("remove");
0187 }
0188 };
0189 }
0190
0191 public Object[] fill(int index, Object[] convey) {
0192 for (int i = 0; i < columns.size(); i++) {
0193 convey[i] = columns.get(i).get(index);
0194 }
0195 return convey;
0196 }
0197 }