Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.kvstore;
0019 
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Preconditions;
import org.iq80.leveldb.WriteBatch;
0028 
0029 /**
0030  * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
0031  * via reflection, to make it cheaper to access it multiple times.
0032  *
0033  * <p>
0034  * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
0035  * that iteration over indices is easy, and that updating values in the store is not overly
0036  * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
0037  * lists of pointers, which would be more expensive to update at runtime.
0038  * </p>
0039  *
0040  * <p>
0041  * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
0042  * key would be the concatenation of everything up to that point in the hierarchy, with each
0043  * component separated by a NULL byte.
0044  * </p>
0045  *
0046  * <pre>
0047  * +TYPE_NAME
0048  *   NATURAL_INDEX
0049  *     +NATURAL_KEY
0050  *     -
0051  *   -NATURAL_INDEX
0052  *   INDEX_NAME
0053  *     +INDEX_VALUE
0054  *       +NATURAL_KEY
0055  *     -INDEX_VALUE
0056  *     .INDEX_VALUE
0057  *       CHILD_INDEX_NAME
0058  *         +CHILD_INDEX_VALUE
0059  *           NATURAL_KEY_OR_DATA
0060  *         -
0061  *   -INDEX_NAME
0062  * </pre>
0063  *
0064  * <p>
0065  * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
0066  * that end with "+<something>". A count of all objects that match a particular top-level index
0067  * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end
0068  * marker, to make it easy to retrieve the number of all elements of a particular type.
0069  * </p>
0070  *
0071  * <p>
0072  * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
0073  * have these keys and values in the store for two instances, one with natural key "key1" and the
0074  * other "key2", both with value "yes" for "bar":
0075  * </p>
0076  *
0077  * <pre>
0078  * Foo __main__ +key1   [data for instance 1]
0079  * Foo __main__ +key2   [data for instance 2]
0080  * Foo __main__ -       [count of all Foo]
0081  * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
0082  * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
0083  * Foo bar +yes -       [count of all Foo with "bar=yes" ]
0084  * </pre>
0085  *
0086  * <p>
0087  * Note that all indexed values are prepended with "+", even if the index itself does not have an
0088  * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
0089  * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
0090  * of the full LevelDB key is generally referred to as the "index value" of the entity.
0091  * </p>
0092  *
0093  * <p>
0094  * Child indices are stored after their parent index. In the example above, let's assume there is
0095  * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
0096  * the data in the store would look something like the following:
0097  * </p>
0098  *
0099  * <pre>
0100  * ...
0101  * Foo bar +yes -
0102  * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
0103  * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
0104  * ...
0105  * </pre>
0106  */
0107 class LevelDBTypeInfo {
0108 
0109   static final byte[] END_MARKER = new byte[] { '-' };
0110   static final byte ENTRY_PREFIX = (byte) '+';
0111   static final byte KEY_SEPARATOR = 0x0;
0112   static byte TRUE = (byte) '1';
0113   static byte FALSE = (byte) '0';
0114 
0115   private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
0116   private static final byte POSITIVE_MARKER = (byte) '=';
0117   private static final byte NEGATIVE_MARKER = (byte) '*';
0118   private static final byte[] HEX_BYTES = new byte[] {
0119     '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
0120   };
0121 
0122   private final LevelDB db;
0123   private final Class<?> type;
0124   private final Map<String, Index> indices;
0125   private final byte[] typePrefix;
0126 
0127   LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
0128     this.db = db;
0129     this.type = type;
0130     this.indices = new HashMap<>();
0131 
0132     KVTypeInfo ti = new KVTypeInfo(type);
0133 
0134     // First create the parent indices, then the child indices.
0135     ti.indices().forEach(idx -> {
0136       // In LevelDB, there is no parent index for the NUTURAL INDEX.
0137       if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
0138         indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
0139       }
0140     });
0141     ti.indices().forEach(idx -> {
0142       if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
0143         indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
0144           indices.get(idx.parent())));
0145       }
0146     });
0147 
0148     this.typePrefix = alias;
0149   }
0150 
0151   Class<?> type() {
0152     return type;
0153   }
0154 
0155   byte[] keyPrefix() {
0156     return typePrefix;
0157   }
0158 
0159   Index naturalIndex() {
0160     return index(KVIndex.NATURAL_INDEX_NAME);
0161   }
0162 
0163   Index index(String name) {
0164     Index i = indices.get(name);
0165     Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name,
0166       type.getName());
0167     return i;
0168   }
0169 
0170   Collection<Index> indices() {
0171     return indices.values();
0172   }
0173 
0174   byte[] buildKey(byte[]... components) {
0175     return buildKey(true, components);
0176   }
0177 
0178   byte[] buildKey(boolean addTypePrefix, byte[]... components) {
0179     int len = 0;
0180     if (addTypePrefix) {
0181       len += typePrefix.length + 1;
0182     }
0183     for (byte[] comp : components) {
0184       len += comp.length;
0185     }
0186     len += components.length - 1;
0187 
0188     byte[] dest = new byte[len];
0189     int written = 0;
0190 
0191     if (addTypePrefix) {
0192       System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
0193       dest[typePrefix.length] = KEY_SEPARATOR;
0194       written += typePrefix.length + 1;
0195     }
0196 
0197     for (byte[] comp : components) {
0198       System.arraycopy(comp, 0, dest, written, comp.length);
0199       written += comp.length;
0200       if (written < dest.length) {
0201         dest[written] = KEY_SEPARATOR;
0202         written++;
0203       }
0204     }
0205 
0206     return dest;
0207   }
0208 
0209   /**
0210    * Models a single index in LevelDB. See top-level class's javadoc for a description of how the
0211    * keys are generated.
0212    */
0213   class Index {
0214 
0215     private final boolean copy;
0216     private final boolean isNatural;
0217     private final byte[] name;
0218     private final KVTypeInfo.Accessor accessor;
0219     private final Index parent;
0220 
0221     private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
0222       byte[] name = self.value().getBytes(UTF_8);
0223       if (parent != null) {
0224         byte[] child = new byte[name.length + 1];
0225         child[0] = SECONDARY_IDX_PREFIX;
0226         System.arraycopy(name, 0, child, 1, name.length);
0227       }
0228 
0229       this.name = name;
0230       this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
0231       this.copy = isNatural || self.copy();
0232       this.accessor = accessor;
0233       this.parent = parent;
0234     }
0235 
0236     boolean isCopy() {
0237       return copy;
0238     }
0239 
0240     boolean isChild() {
0241       return parent != null;
0242     }
0243 
0244     Index parent() {
0245       return parent;
0246     }
0247 
0248     /**
0249      * Creates a key prefix for child indices of this index. This allows the prefix to be
0250      * calculated only once, avoiding redundant work when multiple child indices of the
0251      * same parent index exist.
0252      */
0253     byte[] childPrefix(Object value) {
0254       Preconditions.checkState(parent == null, "Not a parent index.");
0255       return buildKey(name, toParentKey(value));
0256     }
0257 
0258     /**
0259      * Gets the index value for a particular entity (which is the value of the field or method
0260      * tagged with the index annotation). This is used as part of the LevelDB key where the
0261      * entity (or its id) is stored.
0262      */
0263     Object getValue(Object entity) throws Exception {
0264       return accessor.get(entity);
0265     }
0266 
0267     private void checkParent(byte[] prefix) {
0268       if (prefix != null) {
0269         Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
0270       } else {
0271         Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
0272       }
0273     }
0274 
0275     /** The prefix for all keys that belong to this index. */
0276     byte[] keyPrefix(byte[] prefix) {
0277       checkParent(prefix);
0278       return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
0279     }
0280 
0281     /**
0282      * The key where to start ascending iteration for entities whose value for the indexed field
0283      * match the given value.
0284      */
0285     byte[] start(byte[] prefix, Object value) {
0286       checkParent(prefix);
0287       return (parent != null) ? buildKey(false, prefix, name, toKey(value))
0288         : buildKey(name, toKey(value));
0289     }
0290 
0291     /** The key for the index's end marker. */
0292     byte[] end(byte[] prefix) {
0293       checkParent(prefix);
0294       return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
0295         : buildKey(name, END_MARKER);
0296     }
0297 
0298     /** The key for the end marker for entries with the given value. */
0299     byte[] end(byte[] prefix, Object value) {
0300       checkParent(prefix);
0301       return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
0302         : buildKey(name, toKey(value), END_MARKER);
0303     }
0304 
0305     /** The full key in the index that identifies the given entity. */
0306     byte[] entityKey(byte[] prefix, Object entity) throws Exception {
0307       Object indexValue = getValue(entity);
0308       Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
0309         name, type.getName());
0310       byte[] entityKey = start(prefix, indexValue);
0311       if (!isNatural) {
0312         entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
0313       }
0314       return entityKey;
0315     }
0316 
0317     private void updateCount(WriteBatch batch, byte[] key, long delta) {
0318       long updated = getCount(key) + delta;
0319       if (updated > 0) {
0320         batch.put(key, db.serializer.serialize(updated));
0321       } else {
0322         batch.delete(key);
0323       }
0324     }
0325 
0326     private void addOrRemove(
0327         WriteBatch batch,
0328         Object entity,
0329         Object existing,
0330         byte[] data,
0331         byte[] naturalKey,
0332         byte[] prefix) throws Exception {
0333       Object indexValue = getValue(entity);
0334       Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
0335         name, type.getName());
0336 
0337       byte[] entityKey = start(prefix, indexValue);
0338       if (!isNatural) {
0339         entityKey = buildKey(false, entityKey, naturalKey);
0340       }
0341 
0342       boolean needCountUpdate = (existing == null);
0343 
0344       // Check whether there's a need to update the index. The index needs to be updated in two
0345       // cases:
0346       //
0347       // - There is no existing value for the entity, so a new index value will be added.
0348       // - If there is a previously stored value for the entity, and the index value for the
0349       //   current index does not match the new value, the old entry needs to be deleted and
0350       //   the new one added.
0351       //
0352       // Natural indices don't need to be checked, because by definition both old and new entities
0353       // will have the same key. The put() call is all that's needed in that case.
0354       //
0355       // Also check whether we need to update the counts. If the indexed value is changing, we
0356       // need to decrement the count at the old index value, and the new indexed value count needs
0357       // to be incremented.
0358       if (existing != null && !isNatural) {
0359         byte[] oldPrefix = null;
0360         Object oldIndexedValue = getValue(existing);
0361         boolean removeExisting = !indexValue.equals(oldIndexedValue);
0362         if (!removeExisting && isChild()) {
0363           oldPrefix = parent().childPrefix(parent().getValue(existing));
0364           removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
0365         }
0366 
0367         if (removeExisting) {
0368           if (oldPrefix == null && isChild()) {
0369             oldPrefix = parent().childPrefix(parent().getValue(existing));
0370           }
0371 
0372           byte[] oldKey = entityKey(oldPrefix, existing);
0373           batch.delete(oldKey);
0374 
0375           // If the indexed value has changed, we need to update the counts at the old and new
0376           // end markers for the indexed value.
0377           if (!isChild()) {
0378             byte[] oldCountKey = end(null, oldIndexedValue);
0379             updateCount(batch, oldCountKey, -1L);
0380             needCountUpdate = true;
0381           }
0382         }
0383       }
0384 
0385       if (data != null) {
0386         byte[] stored = copy ? data : naturalKey;
0387         batch.put(entityKey, stored);
0388       } else {
0389         batch.delete(entityKey);
0390       }
0391 
0392       if (needCountUpdate && !isChild()) {
0393         long delta = data != null ? 1L : -1L;
0394         byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
0395         updateCount(batch, countKey, delta);
0396       }
0397     }
0398 
0399     /**
0400      * Add an entry to the index.
0401      *
0402      * @param batch Write batch with other related changes.
0403      * @param entity The entity being added to the index.
0404      * @param existing The entity being replaced in the index, or null.
0405      * @param data Serialized entity to store (when storing the entity, not a reference).
0406      * @param naturalKey The value's natural key (to avoid re-computing it for every index).
0407      * @param prefix The parent index prefix, if this is a child index.
0408      */
0409     void add(
0410         WriteBatch batch,
0411         Object entity,
0412         Object existing,
0413         byte[] data,
0414         byte[] naturalKey,
0415         byte[] prefix) throws Exception {
0416       addOrRemove(batch, entity, existing, data, naturalKey, prefix);
0417     }
0418 
0419     /**
0420      * Remove a value from the index.
0421      *
0422      * @param batch Write batch with other related changes.
0423      * @param entity The entity being removed, to identify the index entry to modify.
0424      * @param naturalKey The value's natural key (to avoid re-computing it for every index).
0425      * @param prefix The parent index prefix, if this is a child index.
0426      */
0427     void remove(
0428         WriteBatch batch,
0429         Object entity,
0430         byte[] naturalKey,
0431         byte[] prefix) throws Exception {
0432       addOrRemove(batch, entity, null, null, naturalKey, prefix);
0433     }
0434 
0435     long getCount(byte[] key) {
0436       byte[] data = db.db().get(key);
0437       return data != null ? db.serializer.deserializeLong(data) : 0;
0438     }
0439 
0440     byte[] toParentKey(Object value) {
0441       return toKey(value, SECONDARY_IDX_PREFIX);
0442     }
0443 
0444     byte[] toKey(Object value) {
0445       return toKey(value, ENTRY_PREFIX);
0446     }
0447 
0448     /**
0449      * Translates a value to be used as part of the store key.
0450      *
0451      * Integral numbers are encoded as a string in a way that preserves lexicographical
0452      * ordering. The string is prepended with a marker telling whether the number is negative
0453      * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the
0454      * opposite of the desired order), and then the number is encoded into a hex string (so
0455      * it occupies twice the number of bytes as the original type).
0456      *
0457      * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
0458      */
0459     byte[] toKey(Object value, byte prefix) {
0460       final byte[] result;
0461 
0462       if (value instanceof String) {
0463         byte[] str = ((String) value).getBytes(UTF_8);
0464         result = new byte[str.length + 1];
0465         result[0] = prefix;
0466         System.arraycopy(str, 0, result, 1, str.length);
0467       } else if (value instanceof Boolean) {
0468         result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
0469       } else if (value.getClass().isArray()) {
0470         int length = Array.getLength(value);
0471         byte[][] components = new byte[length][];
0472         for (int i = 0; i < length; i++) {
0473           components[i] = toKey(Array.get(value, i));
0474         }
0475         result = buildKey(false, components);
0476       } else {
0477         int bytes;
0478 
0479         if (value instanceof Integer) {
0480           bytes = Integer.SIZE;
0481         } else if (value instanceof Long) {
0482           bytes = Long.SIZE;
0483         } else if (value instanceof Short) {
0484           bytes = Short.SIZE;
0485         } else if (value instanceof Byte) {
0486           bytes = Byte.SIZE;
0487         } else {
0488           throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
0489             value.getClass().getName()));
0490         }
0491 
0492         bytes = bytes / Byte.SIZE;
0493 
0494         byte[] key = new byte[bytes * 2 + 2];
0495         long longValue = ((Number) value).longValue();
0496         key[0] = prefix;
0497         key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
0498 
0499         for (int i = 0; i < key.length - 2; i++) {
0500           int masked = (int) ((longValue >>> (4 * i)) & 0xF);
0501           key[key.length - i - 1] = HEX_BYTES[masked];
0502         }
0503 
0504         result = key;
0505       }
0506 
0507       return result;
0508     }
0509 
0510   }
0511 
0512 }