Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.kvstore;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.*;
0023 import java.util.concurrent.ConcurrentHashMap;
0024 import java.util.concurrent.ConcurrentMap;
0025 import java.util.concurrent.atomic.AtomicReference;
0026 import static java.nio.charset.StandardCharsets.UTF_8;
0027 
0028 import com.google.common.annotations.VisibleForTesting;
0029 import com.google.common.base.Preconditions;
0030 import com.google.common.base.Throwables;
0031 import org.fusesource.leveldbjni.JniDBFactory;
0032 import org.iq80.leveldb.DB;
0033 import org.iq80.leveldb.Options;
0034 import org.iq80.leveldb.WriteBatch;
0035 
0036 import org.apache.spark.annotation.Private;
0037 
0038 /**
0039  * Implementation of KVStore that uses LevelDB as the underlying data store.
0040  */
0041 @Private
0042 public class LevelDB implements KVStore {
0043 
0044   @VisibleForTesting
0045   static final long STORE_VERSION = 1L;
0046 
0047   @VisibleForTesting
0048   static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
0049 
0050   /** DB key where app metadata is stored. */
0051   private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
0052 
0053   /** DB key where type aliases are stored. */
0054   private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
0055 
0056   final AtomicReference<DB> _db;
0057   final KVStoreSerializer serializer;
0058 
0059   /**
0060    * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
0061    * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
0062    * will often have a long, redundant prefix (think "org.apache.spark.").
0063    */
0064   private final ConcurrentMap<String, byte[]> typeAliases;
0065   private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
0066 
0067   public LevelDB(File path) throws Exception {
0068     this(path, new KVStoreSerializer());
0069   }
0070 
0071   public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
0072     this.serializer = serializer;
0073     this.types = new ConcurrentHashMap<>();
0074 
0075     Options options = new Options();
0076     options.createIfMissing(true);
0077     this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
0078 
0079     byte[] versionData = db().get(STORE_VERSION_KEY);
0080     if (versionData != null) {
0081       long version = serializer.deserializeLong(versionData);
0082       if (version != STORE_VERSION) {
0083         close();
0084         throw new UnsupportedStoreVersionException();
0085       }
0086     } else {
0087       db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
0088     }
0089 
0090     Map<String, byte[]> aliases;
0091     try {
0092       aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
0093     } catch (NoSuchElementException e) {
0094       aliases = new HashMap<>();
0095     }
0096     typeAliases = new ConcurrentHashMap<>(aliases);
0097   }
0098 
0099   @Override
0100   public <T> T getMetadata(Class<T> klass) throws Exception {
0101     try {
0102       return get(METADATA_KEY, klass);
0103     } catch (NoSuchElementException nsee) {
0104       return null;
0105     }
0106   }
0107 
0108   @Override
0109   public void setMetadata(Object value) throws Exception {
0110     if (value != null) {
0111       put(METADATA_KEY, value);
0112     } else {
0113       db().delete(METADATA_KEY);
0114     }
0115   }
0116 
0117   <T> T get(byte[] key, Class<T> klass) throws Exception {
0118     byte[] data = db().get(key);
0119     if (data == null) {
0120       throw new NoSuchElementException(new String(key, UTF_8));
0121     }
0122     return serializer.deserialize(data, klass);
0123   }
0124 
0125   private void put(byte[] key, Object value) throws Exception {
0126     Preconditions.checkArgument(value != null, "Null values are not allowed.");
0127     db().put(key, serializer.serialize(value));
0128   }
0129 
0130   @Override
0131   public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
0132     Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
0133     byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
0134     return get(key, klass);
0135   }
0136 
0137   @Override
0138   public void write(Object value) throws Exception {
0139     Preconditions.checkArgument(value != null, "Null values are not allowed.");
0140     LevelDBTypeInfo ti = getTypeInfo(value.getClass());
0141 
0142     try (WriteBatch batch = db().createWriteBatch()) {
0143       byte[] data = serializer.serialize(value);
0144       synchronized (ti) {
0145         Object existing;
0146         try {
0147           existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
0148         } catch (NoSuchElementException e) {
0149           existing = null;
0150         }
0151 
0152         PrefixCache cache = new PrefixCache(value);
0153         byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
0154         for (LevelDBTypeInfo.Index idx : ti.indices()) {
0155           byte[] prefix = cache.getPrefix(idx);
0156           idx.add(batch, value, existing, data, naturalKey, prefix);
0157         }
0158         db().write(batch);
0159       }
0160     }
0161   }
0162 
0163   @Override
0164   public void delete(Class<?> type, Object naturalKey) throws Exception {
0165     Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
0166     try (WriteBatch batch = db().createWriteBatch()) {
0167       LevelDBTypeInfo ti = getTypeInfo(type);
0168       byte[] key = ti.naturalIndex().start(null, naturalKey);
0169       synchronized (ti) {
0170         byte[] data = db().get(key);
0171         if (data != null) {
0172           Object existing = serializer.deserialize(data, type);
0173           PrefixCache cache = new PrefixCache(existing);
0174           byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
0175           for (LevelDBTypeInfo.Index idx : ti.indices()) {
0176             idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
0177           }
0178           db().write(batch);
0179         }
0180       }
0181     } catch (NoSuchElementException nse) {
0182       // Ignore.
0183     }
0184   }
0185 
0186   @Override
0187   public <T> KVStoreView<T> view(Class<T> type) throws Exception {
0188     return new KVStoreView<T>() {
0189       @Override
0190       public Iterator<T> iterator() {
0191         try {
0192           return new LevelDBIterator<>(type, LevelDB.this, this);
0193         } catch (Exception e) {
0194           throw Throwables.propagate(e);
0195         }
0196       }
0197     };
0198   }
0199 
0200   @Override
0201   public <T> boolean removeAllByIndexValues(
0202       Class<T> klass,
0203       String index,
0204       Collection<?> indexValues) throws Exception {
0205     LevelDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
0206     boolean removed = false;
0207     KVStoreView<T> view = view(klass).index(index);
0208 
0209     for (Object indexValue : indexValues) {
0210       for (T value: view.first(indexValue).last(indexValue)) {
0211         Object itemKey = naturalIndex.getValue(value);
0212         delete(klass, itemKey);
0213         removed = true;
0214       }
0215     }
0216 
0217     return removed;
0218   }
0219 
0220   @Override
0221   public long count(Class<?> type) throws Exception {
0222     LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
0223     return idx.getCount(idx.end(null));
0224   }
0225 
0226   @Override
0227   public long count(Class<?> type, String index, Object indexedValue) throws Exception {
0228     LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
0229     return idx.getCount(idx.end(null, indexedValue));
0230   }
0231 
0232   @Override
0233   public void close() throws IOException {
0234     synchronized (this._db) {
0235       DB _db = this._db.getAndSet(null);
0236       if (_db == null) {
0237         return;
0238       }
0239 
0240       try {
0241         _db.close();
0242       } catch (IOException ioe) {
0243         throw ioe;
0244       } catch (Exception e) {
0245         throw new IOException(e.getMessage(), e);
0246       }
0247     }
0248   }
0249 
0250   /**
0251    * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
0252    * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
0253    */
0254   void closeIterator(LevelDBIterator<?> it) throws IOException {
0255     synchronized (this._db) {
0256       DB _db = this._db.get();
0257       if (_db != null) {
0258         it.close();
0259       }
0260     }
0261   }
0262 
0263   /** Returns metadata about indices for the given type. */
0264   LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
0265     LevelDBTypeInfo ti = types.get(type);
0266     if (ti == null) {
0267       LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
0268       ti = types.putIfAbsent(type, tmp);
0269       if (ti == null) {
0270         ti = tmp;
0271       }
0272     }
0273     return ti;
0274   }
0275 
0276   /**
0277    * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
0278    * prevent methods that retrieved the instance from using it after close, but hopefully will
0279    * catch most cases; otherwise, we'll need some kind of locking.
0280    */
0281   DB db() {
0282     DB _db = this._db.get();
0283     if (_db == null) {
0284       throw new IllegalStateException("DB is closed.");
0285     }
0286     return _db;
0287   }
0288 
0289   private byte[] getTypeAlias(Class<?> klass) throws Exception {
0290     byte[] alias = typeAliases.get(klass.getName());
0291     if (alias == null) {
0292       synchronized (typeAliases) {
0293         byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
0294         alias = typeAliases.putIfAbsent(klass.getName(), tmp);
0295         if (alias == null) {
0296           alias = tmp;
0297           put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
0298         }
0299       }
0300     }
0301     return alias;
0302   }
0303 
0304   /** Needs to be public for Jackson. */
0305   public static class TypeAliases {
0306 
0307     public Map<String, byte[]> aliases;
0308 
0309     TypeAliases(Map<String, byte[]> aliases) {
0310       this.aliases = aliases;
0311     }
0312 
0313     TypeAliases() {
0314       this(null);
0315     }
0316 
0317   }
0318 
0319   private static class PrefixCache {
0320 
0321     private final Object entity;
0322     private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
0323 
0324     PrefixCache(Object entity) {
0325       this.entity = entity;
0326       this.prefixes = new HashMap<>();
0327     }
0328 
0329     byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
0330       byte[] prefix = null;
0331       if (idx.isChild()) {
0332         prefix = prefixes.get(idx.parent());
0333         if (prefix == null) {
0334           prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
0335           prefixes.put(idx.parent(), prefix);
0336         }
0337       }
0338       return prefix;
0339     }
0340 
0341   }
0342 
0343 }