0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.vectorized;
0019
0020 import io.netty.buffer.ArrowBuf;
0021 import org.apache.arrow.vector.*;
0022 import org.apache.arrow.vector.complex.*;
0023 import org.apache.arrow.vector.holders.NullableVarCharHolder;
0024
0025 import org.apache.spark.sql.util.ArrowUtils;
0026 import org.apache.spark.sql.types.*;
0027 import org.apache.spark.unsafe.types.UTF8String;
0028
0029
0030
0031
0032
0033 public final class ArrowColumnVector extends ColumnVector {
0034
0035 private final ArrowVectorAccessor accessor;
0036 private ArrowColumnVector[] childColumns;
0037
0038 @Override
0039 public boolean hasNull() {
0040 return accessor.getNullCount() > 0;
0041 }
0042
0043 @Override
0044 public int numNulls() {
0045 return accessor.getNullCount();
0046 }
0047
0048 @Override
0049 public void close() {
0050 if (childColumns != null) {
0051 for (int i = 0; i < childColumns.length; i++) {
0052 childColumns[i].close();
0053 childColumns[i] = null;
0054 }
0055 childColumns = null;
0056 }
0057 accessor.close();
0058 }
0059
0060 @Override
0061 public boolean isNullAt(int rowId) {
0062 return accessor.isNullAt(rowId);
0063 }
0064
0065 @Override
0066 public boolean getBoolean(int rowId) {
0067 return accessor.getBoolean(rowId);
0068 }
0069
0070 @Override
0071 public byte getByte(int rowId) {
0072 return accessor.getByte(rowId);
0073 }
0074
0075 @Override
0076 public short getShort(int rowId) {
0077 return accessor.getShort(rowId);
0078 }
0079
0080 @Override
0081 public int getInt(int rowId) {
0082 return accessor.getInt(rowId);
0083 }
0084
0085 @Override
0086 public long getLong(int rowId) {
0087 return accessor.getLong(rowId);
0088 }
0089
0090 @Override
0091 public float getFloat(int rowId) {
0092 return accessor.getFloat(rowId);
0093 }
0094
0095 @Override
0096 public double getDouble(int rowId) {
0097 return accessor.getDouble(rowId);
0098 }
0099
0100 @Override
0101 public Decimal getDecimal(int rowId, int precision, int scale) {
0102 if (isNullAt(rowId)) return null;
0103 return accessor.getDecimal(rowId, precision, scale);
0104 }
0105
0106 @Override
0107 public UTF8String getUTF8String(int rowId) {
0108 if (isNullAt(rowId)) return null;
0109 return accessor.getUTF8String(rowId);
0110 }
0111
0112 @Override
0113 public byte[] getBinary(int rowId) {
0114 if (isNullAt(rowId)) return null;
0115 return accessor.getBinary(rowId);
0116 }
0117
0118 @Override
0119 public ColumnarArray getArray(int rowId) {
0120 if (isNullAt(rowId)) return null;
0121 return accessor.getArray(rowId);
0122 }
0123
0124 @Override
0125 public ColumnarMap getMap(int rowId) {
0126 if (isNullAt(rowId)) return null;
0127 return accessor.getMap(rowId);
0128 }
0129
0130 @Override
0131 public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
0132
0133 public ArrowColumnVector(ValueVector vector) {
0134 super(ArrowUtils.fromArrowField(vector.getField()));
0135
0136 if (vector instanceof BitVector) {
0137 accessor = new BooleanAccessor((BitVector) vector);
0138 } else if (vector instanceof TinyIntVector) {
0139 accessor = new ByteAccessor((TinyIntVector) vector);
0140 } else if (vector instanceof SmallIntVector) {
0141 accessor = new ShortAccessor((SmallIntVector) vector);
0142 } else if (vector instanceof IntVector) {
0143 accessor = new IntAccessor((IntVector) vector);
0144 } else if (vector instanceof BigIntVector) {
0145 accessor = new LongAccessor((BigIntVector) vector);
0146 } else if (vector instanceof Float4Vector) {
0147 accessor = new FloatAccessor((Float4Vector) vector);
0148 } else if (vector instanceof Float8Vector) {
0149 accessor = new DoubleAccessor((Float8Vector) vector);
0150 } else if (vector instanceof DecimalVector) {
0151 accessor = new DecimalAccessor((DecimalVector) vector);
0152 } else if (vector instanceof VarCharVector) {
0153 accessor = new StringAccessor((VarCharVector) vector);
0154 } else if (vector instanceof VarBinaryVector) {
0155 accessor = new BinaryAccessor((VarBinaryVector) vector);
0156 } else if (vector instanceof DateDayVector) {
0157 accessor = new DateAccessor((DateDayVector) vector);
0158 } else if (vector instanceof TimeStampMicroTZVector) {
0159 accessor = new TimestampAccessor((TimeStampMicroTZVector) vector);
0160 } else if (vector instanceof MapVector) {
0161 MapVector mapVector = (MapVector) vector;
0162 accessor = new MapAccessor(mapVector);
0163 } else if (vector instanceof ListVector) {
0164 ListVector listVector = (ListVector) vector;
0165 accessor = new ArrayAccessor(listVector);
0166 } else if (vector instanceof StructVector) {
0167 StructVector structVector = (StructVector) vector;
0168 accessor = new StructAccessor(structVector);
0169
0170 childColumns = new ArrowColumnVector[structVector.size()];
0171 for (int i = 0; i < childColumns.length; ++i) {
0172 childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
0173 }
0174 } else {
0175 throw new UnsupportedOperationException();
0176 }
0177 }
0178
0179 private abstract static class ArrowVectorAccessor {
0180
0181 private final ValueVector vector;
0182
0183 ArrowVectorAccessor(ValueVector vector) {
0184 this.vector = vector;
0185 }
0186
0187
0188 boolean isNullAt(int rowId) {
0189 return vector.isNull(rowId);
0190 }
0191
0192 final int getNullCount() {
0193 return vector.getNullCount();
0194 }
0195
0196 final void close() {
0197 vector.close();
0198 }
0199
0200 boolean getBoolean(int rowId) {
0201 throw new UnsupportedOperationException();
0202 }
0203
0204 byte getByte(int rowId) {
0205 throw new UnsupportedOperationException();
0206 }
0207
0208 short getShort(int rowId) {
0209 throw new UnsupportedOperationException();
0210 }
0211
0212 int getInt(int rowId) {
0213 throw new UnsupportedOperationException();
0214 }
0215
0216 long getLong(int rowId) {
0217 throw new UnsupportedOperationException();
0218 }
0219
0220 float getFloat(int rowId) {
0221 throw new UnsupportedOperationException();
0222 }
0223
0224 double getDouble(int rowId) {
0225 throw new UnsupportedOperationException();
0226 }
0227
0228 Decimal getDecimal(int rowId, int precision, int scale) {
0229 throw new UnsupportedOperationException();
0230 }
0231
0232 UTF8String getUTF8String(int rowId) {
0233 throw new UnsupportedOperationException();
0234 }
0235
0236 byte[] getBinary(int rowId) {
0237 throw new UnsupportedOperationException();
0238 }
0239
0240 ColumnarArray getArray(int rowId) {
0241 throw new UnsupportedOperationException();
0242 }
0243
0244 ColumnarMap getMap(int rowId) {
0245 throw new UnsupportedOperationException();
0246 }
0247 }
0248
0249 private static class BooleanAccessor extends ArrowVectorAccessor {
0250
0251 private final BitVector accessor;
0252
0253 BooleanAccessor(BitVector vector) {
0254 super(vector);
0255 this.accessor = vector;
0256 }
0257
0258 @Override
0259 final boolean getBoolean(int rowId) {
0260 return accessor.get(rowId) == 1;
0261 }
0262 }
0263
0264 private static class ByteAccessor extends ArrowVectorAccessor {
0265
0266 private final TinyIntVector accessor;
0267
0268 ByteAccessor(TinyIntVector vector) {
0269 super(vector);
0270 this.accessor = vector;
0271 }
0272
0273 @Override
0274 final byte getByte(int rowId) {
0275 return accessor.get(rowId);
0276 }
0277 }
0278
0279 private static class ShortAccessor extends ArrowVectorAccessor {
0280
0281 private final SmallIntVector accessor;
0282
0283 ShortAccessor(SmallIntVector vector) {
0284 super(vector);
0285 this.accessor = vector;
0286 }
0287
0288 @Override
0289 final short getShort(int rowId) {
0290 return accessor.get(rowId);
0291 }
0292 }
0293
0294 private static class IntAccessor extends ArrowVectorAccessor {
0295
0296 private final IntVector accessor;
0297
0298 IntAccessor(IntVector vector) {
0299 super(vector);
0300 this.accessor = vector;
0301 }
0302
0303 @Override
0304 final int getInt(int rowId) {
0305 return accessor.get(rowId);
0306 }
0307 }
0308
0309 private static class LongAccessor extends ArrowVectorAccessor {
0310
0311 private final BigIntVector accessor;
0312
0313 LongAccessor(BigIntVector vector) {
0314 super(vector);
0315 this.accessor = vector;
0316 }
0317
0318 @Override
0319 final long getLong(int rowId) {
0320 return accessor.get(rowId);
0321 }
0322 }
0323
0324 private static class FloatAccessor extends ArrowVectorAccessor {
0325
0326 private final Float4Vector accessor;
0327
0328 FloatAccessor(Float4Vector vector) {
0329 super(vector);
0330 this.accessor = vector;
0331 }
0332
0333 @Override
0334 final float getFloat(int rowId) {
0335 return accessor.get(rowId);
0336 }
0337 }
0338
0339 private static class DoubleAccessor extends ArrowVectorAccessor {
0340
0341 private final Float8Vector accessor;
0342
0343 DoubleAccessor(Float8Vector vector) {
0344 super(vector);
0345 this.accessor = vector;
0346 }
0347
0348 @Override
0349 final double getDouble(int rowId) {
0350 return accessor.get(rowId);
0351 }
0352 }
0353
0354 private static class DecimalAccessor extends ArrowVectorAccessor {
0355
0356 private final DecimalVector accessor;
0357
0358 DecimalAccessor(DecimalVector vector) {
0359 super(vector);
0360 this.accessor = vector;
0361 }
0362
0363 @Override
0364 final Decimal getDecimal(int rowId, int precision, int scale) {
0365 if (isNullAt(rowId)) return null;
0366 return Decimal.apply(accessor.getObject(rowId), precision, scale);
0367 }
0368 }
0369
0370 private static class StringAccessor extends ArrowVectorAccessor {
0371
0372 private final VarCharVector accessor;
0373 private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
0374
0375 StringAccessor(VarCharVector vector) {
0376 super(vector);
0377 this.accessor = vector;
0378 }
0379
0380 @Override
0381 final UTF8String getUTF8String(int rowId) {
0382 accessor.get(rowId, stringResult);
0383 if (stringResult.isSet == 0) {
0384 return null;
0385 } else {
0386 return UTF8String.fromAddress(null,
0387 stringResult.buffer.memoryAddress() + stringResult.start,
0388 stringResult.end - stringResult.start);
0389 }
0390 }
0391 }
0392
0393 private static class BinaryAccessor extends ArrowVectorAccessor {
0394
0395 private final VarBinaryVector accessor;
0396
0397 BinaryAccessor(VarBinaryVector vector) {
0398 super(vector);
0399 this.accessor = vector;
0400 }
0401
0402 @Override
0403 final byte[] getBinary(int rowId) {
0404 return accessor.getObject(rowId);
0405 }
0406 }
0407
0408 private static class DateAccessor extends ArrowVectorAccessor {
0409
0410 private final DateDayVector accessor;
0411
0412 DateAccessor(DateDayVector vector) {
0413 super(vector);
0414 this.accessor = vector;
0415 }
0416
0417 @Override
0418 final int getInt(int rowId) {
0419 return accessor.get(rowId);
0420 }
0421 }
0422
0423 private static class TimestampAccessor extends ArrowVectorAccessor {
0424
0425 private final TimeStampMicroTZVector accessor;
0426
0427 TimestampAccessor(TimeStampMicroTZVector vector) {
0428 super(vector);
0429 this.accessor = vector;
0430 }
0431
0432 @Override
0433 final long getLong(int rowId) {
0434 return accessor.get(rowId);
0435 }
0436 }
0437
0438 private static class ArrayAccessor extends ArrowVectorAccessor {
0439
0440 private final ListVector accessor;
0441 private final ArrowColumnVector arrayData;
0442
0443 ArrayAccessor(ListVector vector) {
0444 super(vector);
0445 this.accessor = vector;
0446 this.arrayData = new ArrowColumnVector(vector.getDataVector());
0447 }
0448
0449 @Override
0450 final boolean isNullAt(int rowId) {
0451
0452 if (accessor.getValueCount() > 0 && accessor.getValidityBuffer().capacity() == 0) {
0453 return false;
0454 } else {
0455 return super.isNullAt(rowId);
0456 }
0457 }
0458
0459 @Override
0460 final ColumnarArray getArray(int rowId) {
0461 ArrowBuf offsets = accessor.getOffsetBuffer();
0462 int index = rowId * ListVector.OFFSET_WIDTH;
0463 int start = offsets.getInt(index);
0464 int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
0465 return new ColumnarArray(arrayData, start, end - start);
0466 }
0467 }
0468
0469
0470
0471
0472
0473
0474
0475
0476
0477 private static class StructAccessor extends ArrowVectorAccessor {
0478
0479 StructAccessor(StructVector vector) {
0480 super(vector);
0481 }
0482 }
0483
0484 private static class MapAccessor extends ArrowVectorAccessor {
0485 private final MapVector accessor;
0486 private final ArrowColumnVector keys;
0487 private final ArrowColumnVector values;
0488
0489 MapAccessor(MapVector vector) {
0490 super(vector);
0491 this.accessor = vector;
0492 StructVector entries = (StructVector) vector.getDataVector();
0493 this.keys = new ArrowColumnVector(entries.getChild(MapVector.KEY_NAME));
0494 this.values = new ArrowColumnVector(entries.getChild(MapVector.VALUE_NAME));
0495 }
0496
0497 @Override
0498 final ColumnarMap getMap(int rowId) {
0499 int index = rowId * MapVector.OFFSET_WIDTH;
0500 int offset = accessor.getOffsetBuffer().getInt(index);
0501 int length = accessor.getInnerValueCountAt(rowId);
0502 return new ColumnarMap(keys, values, offset, length);
0503 }
0504 }
0505 }