0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.kvstore;
0019
0020 import java.lang.reflect.Array;
0021 import java.util.Collection;
0022 import java.util.HashMap;
0023 import java.util.Map;
0024 import static java.nio.charset.StandardCharsets.UTF_8;
0025
0026 import com.google.common.base.Preconditions;
0027 import org.iq80.leveldb.WriteBatch;
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107 class LevelDBTypeInfo {
0108
0109 static final byte[] END_MARKER = new byte[] { '-' };
0110 static final byte ENTRY_PREFIX = (byte) '+';
0111 static final byte KEY_SEPARATOR = 0x0;
0112 static byte TRUE = (byte) '1';
0113 static byte FALSE = (byte) '0';
0114
0115 private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
0116 private static final byte POSITIVE_MARKER = (byte) '=';
0117 private static final byte NEGATIVE_MARKER = (byte) '*';
0118 private static final byte[] HEX_BYTES = new byte[] {
0119 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
0120 };
0121
0122 private final LevelDB db;
0123 private final Class<?> type;
0124 private final Map<String, Index> indices;
0125 private final byte[] typePrefix;
0126
0127 LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
0128 this.db = db;
0129 this.type = type;
0130 this.indices = new HashMap<>();
0131
0132 KVTypeInfo ti = new KVTypeInfo(type);
0133
0134
0135 ti.indices().forEach(idx -> {
0136
0137 if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
0138 indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
0139 }
0140 });
0141 ti.indices().forEach(idx -> {
0142 if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
0143 indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
0144 indices.get(idx.parent())));
0145 }
0146 });
0147
0148 this.typePrefix = alias;
0149 }
0150
0151 Class<?> type() {
0152 return type;
0153 }
0154
0155 byte[] keyPrefix() {
0156 return typePrefix;
0157 }
0158
0159 Index naturalIndex() {
0160 return index(KVIndex.NATURAL_INDEX_NAME);
0161 }
0162
0163 Index index(String name) {
0164 Index i = indices.get(name);
0165 Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name,
0166 type.getName());
0167 return i;
0168 }
0169
0170 Collection<Index> indices() {
0171 return indices.values();
0172 }
0173
0174 byte[] buildKey(byte[]... components) {
0175 return buildKey(true, components);
0176 }
0177
0178 byte[] buildKey(boolean addTypePrefix, byte[]... components) {
0179 int len = 0;
0180 if (addTypePrefix) {
0181 len += typePrefix.length + 1;
0182 }
0183 for (byte[] comp : components) {
0184 len += comp.length;
0185 }
0186 len += components.length - 1;
0187
0188 byte[] dest = new byte[len];
0189 int written = 0;
0190
0191 if (addTypePrefix) {
0192 System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
0193 dest[typePrefix.length] = KEY_SEPARATOR;
0194 written += typePrefix.length + 1;
0195 }
0196
0197 for (byte[] comp : components) {
0198 System.arraycopy(comp, 0, dest, written, comp.length);
0199 written += comp.length;
0200 if (written < dest.length) {
0201 dest[written] = KEY_SEPARATOR;
0202 written++;
0203 }
0204 }
0205
0206 return dest;
0207 }
0208
0209
0210
0211
0212
0213 class Index {
0214
0215 private final boolean copy;
0216 private final boolean isNatural;
0217 private final byte[] name;
0218 private final KVTypeInfo.Accessor accessor;
0219 private final Index parent;
0220
0221 private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
0222 byte[] name = self.value().getBytes(UTF_8);
0223 if (parent != null) {
0224 byte[] child = new byte[name.length + 1];
0225 child[0] = SECONDARY_IDX_PREFIX;
0226 System.arraycopy(name, 0, child, 1, name.length);
0227 }
0228
0229 this.name = name;
0230 this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
0231 this.copy = isNatural || self.copy();
0232 this.accessor = accessor;
0233 this.parent = parent;
0234 }
0235
0236 boolean isCopy() {
0237 return copy;
0238 }
0239
0240 boolean isChild() {
0241 return parent != null;
0242 }
0243
0244 Index parent() {
0245 return parent;
0246 }
0247
0248
0249
0250
0251
0252
0253 byte[] childPrefix(Object value) {
0254 Preconditions.checkState(parent == null, "Not a parent index.");
0255 return buildKey(name, toParentKey(value));
0256 }
0257
0258
0259
0260
0261
0262
0263 Object getValue(Object entity) throws Exception {
0264 return accessor.get(entity);
0265 }
0266
0267 private void checkParent(byte[] prefix) {
0268 if (prefix != null) {
0269 Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
0270 } else {
0271 Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
0272 }
0273 }
0274
0275
0276 byte[] keyPrefix(byte[] prefix) {
0277 checkParent(prefix);
0278 return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
0279 }
0280
0281
0282
0283
0284
0285 byte[] start(byte[] prefix, Object value) {
0286 checkParent(prefix);
0287 return (parent != null) ? buildKey(false, prefix, name, toKey(value))
0288 : buildKey(name, toKey(value));
0289 }
0290
0291
0292 byte[] end(byte[] prefix) {
0293 checkParent(prefix);
0294 return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
0295 : buildKey(name, END_MARKER);
0296 }
0297
0298
0299 byte[] end(byte[] prefix, Object value) {
0300 checkParent(prefix);
0301 return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
0302 : buildKey(name, toKey(value), END_MARKER);
0303 }
0304
0305
0306 byte[] entityKey(byte[] prefix, Object entity) throws Exception {
0307 Object indexValue = getValue(entity);
0308 Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
0309 name, type.getName());
0310 byte[] entityKey = start(prefix, indexValue);
0311 if (!isNatural) {
0312 entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
0313 }
0314 return entityKey;
0315 }
0316
0317 private void updateCount(WriteBatch batch, byte[] key, long delta) {
0318 long updated = getCount(key) + delta;
0319 if (updated > 0) {
0320 batch.put(key, db.serializer.serialize(updated));
0321 } else {
0322 batch.delete(key);
0323 }
0324 }
0325
0326 private void addOrRemove(
0327 WriteBatch batch,
0328 Object entity,
0329 Object existing,
0330 byte[] data,
0331 byte[] naturalKey,
0332 byte[] prefix) throws Exception {
0333 Object indexValue = getValue(entity);
0334 Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
0335 name, type.getName());
0336
0337 byte[] entityKey = start(prefix, indexValue);
0338 if (!isNatural) {
0339 entityKey = buildKey(false, entityKey, naturalKey);
0340 }
0341
0342 boolean needCountUpdate = (existing == null);
0343
0344
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
0356
0357
0358 if (existing != null && !isNatural) {
0359 byte[] oldPrefix = null;
0360 Object oldIndexedValue = getValue(existing);
0361 boolean removeExisting = !indexValue.equals(oldIndexedValue);
0362 if (!removeExisting && isChild()) {
0363 oldPrefix = parent().childPrefix(parent().getValue(existing));
0364 removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
0365 }
0366
0367 if (removeExisting) {
0368 if (oldPrefix == null && isChild()) {
0369 oldPrefix = parent().childPrefix(parent().getValue(existing));
0370 }
0371
0372 byte[] oldKey = entityKey(oldPrefix, existing);
0373 batch.delete(oldKey);
0374
0375
0376
0377 if (!isChild()) {
0378 byte[] oldCountKey = end(null, oldIndexedValue);
0379 updateCount(batch, oldCountKey, -1L);
0380 needCountUpdate = true;
0381 }
0382 }
0383 }
0384
0385 if (data != null) {
0386 byte[] stored = copy ? data : naturalKey;
0387 batch.put(entityKey, stored);
0388 } else {
0389 batch.delete(entityKey);
0390 }
0391
0392 if (needCountUpdate && !isChild()) {
0393 long delta = data != null ? 1L : -1L;
0394 byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
0395 updateCount(batch, countKey, delta);
0396 }
0397 }
0398
0399
0400
0401
0402
0403
0404
0405
0406
0407
0408
0409 void add(
0410 WriteBatch batch,
0411 Object entity,
0412 Object existing,
0413 byte[] data,
0414 byte[] naturalKey,
0415 byte[] prefix) throws Exception {
0416 addOrRemove(batch, entity, existing, data, naturalKey, prefix);
0417 }
0418
0419
0420
0421
0422
0423
0424
0425
0426
0427 void remove(
0428 WriteBatch batch,
0429 Object entity,
0430 byte[] naturalKey,
0431 byte[] prefix) throws Exception {
0432 addOrRemove(batch, entity, null, null, naturalKey, prefix);
0433 }
0434
0435 long getCount(byte[] key) {
0436 byte[] data = db.db().get(key);
0437 return data != null ? db.serializer.deserializeLong(data) : 0;
0438 }
0439
0440 byte[] toParentKey(Object value) {
0441 return toKey(value, SECONDARY_IDX_PREFIX);
0442 }
0443
0444 byte[] toKey(Object value) {
0445 return toKey(value, ENTRY_PREFIX);
0446 }
0447
0448
0449
0450
0451
0452
0453
0454
0455
0456
0457
0458
0459 byte[] toKey(Object value, byte prefix) {
0460 final byte[] result;
0461
0462 if (value instanceof String) {
0463 byte[] str = ((String) value).getBytes(UTF_8);
0464 result = new byte[str.length + 1];
0465 result[0] = prefix;
0466 System.arraycopy(str, 0, result, 1, str.length);
0467 } else if (value instanceof Boolean) {
0468 result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
0469 } else if (value.getClass().isArray()) {
0470 int length = Array.getLength(value);
0471 byte[][] components = new byte[length][];
0472 for (int i = 0; i < length; i++) {
0473 components[i] = toKey(Array.get(value, i));
0474 }
0475 result = buildKey(false, components);
0476 } else {
0477 int bytes;
0478
0479 if (value instanceof Integer) {
0480 bytes = Integer.SIZE;
0481 } else if (value instanceof Long) {
0482 bytes = Long.SIZE;
0483 } else if (value instanceof Short) {
0484 bytes = Short.SIZE;
0485 } else if (value instanceof Byte) {
0486 bytes = Byte.SIZE;
0487 } else {
0488 throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
0489 value.getClass().getName()));
0490 }
0491
0492 bytes = bytes / Byte.SIZE;
0493
0494 byte[] key = new byte[bytes * 2 + 2];
0495 long longValue = ((Number) value).longValue();
0496 key[0] = prefix;
0497 key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
0498
0499 for (int i = 0; i < key.length - 2; i++) {
0500 int masked = (int) ((longValue >>> (4 * i)) & 0xF);
0501 key[key.length - i - 1] = HEX_BYTES[masked];
0502 }
0503
0504 result = key;
0505 }
0506
0507 return result;
0508 }
0509
0510 }
0511
0512 }