0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.kvstore;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.*;
0023 import java.util.concurrent.ConcurrentHashMap;
0024 import java.util.concurrent.ConcurrentMap;
0025 import java.util.concurrent.atomic.AtomicReference;
0026 import static java.nio.charset.StandardCharsets.UTF_8;
0027
0028 import com.google.common.annotations.VisibleForTesting;
0029 import com.google.common.base.Preconditions;
0030 import com.google.common.base.Throwables;
0031 import org.fusesource.leveldbjni.JniDBFactory;
0032 import org.iq80.leveldb.DB;
0033 import org.iq80.leveldb.Options;
0034 import org.iq80.leveldb.WriteBatch;
0035
0036 import org.apache.spark.annotation.Private;
0037
0038
0039
0040
0041 @Private
0042 public class LevelDB implements KVStore {
0043
0044 @VisibleForTesting
0045 static final long STORE_VERSION = 1L;
0046
0047 @VisibleForTesting
0048 static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
0049
0050
0051 private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
0052
0053
0054 private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
0055
0056 final AtomicReference<DB> _db;
0057 final KVStoreSerializer serializer;
0058
0059
0060
0061
0062
0063
0064 private final ConcurrentMap<String, byte[]> typeAliases;
0065 private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
0066
0067 public LevelDB(File path) throws Exception {
0068 this(path, new KVStoreSerializer());
0069 }
0070
0071 public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
0072 this.serializer = serializer;
0073 this.types = new ConcurrentHashMap<>();
0074
0075 Options options = new Options();
0076 options.createIfMissing(true);
0077 this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
0078
0079 byte[] versionData = db().get(STORE_VERSION_KEY);
0080 if (versionData != null) {
0081 long version = serializer.deserializeLong(versionData);
0082 if (version != STORE_VERSION) {
0083 close();
0084 throw new UnsupportedStoreVersionException();
0085 }
0086 } else {
0087 db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
0088 }
0089
0090 Map<String, byte[]> aliases;
0091 try {
0092 aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
0093 } catch (NoSuchElementException e) {
0094 aliases = new HashMap<>();
0095 }
0096 typeAliases = new ConcurrentHashMap<>(aliases);
0097 }
0098
0099 @Override
0100 public <T> T getMetadata(Class<T> klass) throws Exception {
0101 try {
0102 return get(METADATA_KEY, klass);
0103 } catch (NoSuchElementException nsee) {
0104 return null;
0105 }
0106 }
0107
0108 @Override
0109 public void setMetadata(Object value) throws Exception {
0110 if (value != null) {
0111 put(METADATA_KEY, value);
0112 } else {
0113 db().delete(METADATA_KEY);
0114 }
0115 }
0116
0117 <T> T get(byte[] key, Class<T> klass) throws Exception {
0118 byte[] data = db().get(key);
0119 if (data == null) {
0120 throw new NoSuchElementException(new String(key, UTF_8));
0121 }
0122 return serializer.deserialize(data, klass);
0123 }
0124
0125 private void put(byte[] key, Object value) throws Exception {
0126 Preconditions.checkArgument(value != null, "Null values are not allowed.");
0127 db().put(key, serializer.serialize(value));
0128 }
0129
0130 @Override
0131 public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
0132 Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
0133 byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
0134 return get(key, klass);
0135 }
0136
0137 @Override
0138 public void write(Object value) throws Exception {
0139 Preconditions.checkArgument(value != null, "Null values are not allowed.");
0140 LevelDBTypeInfo ti = getTypeInfo(value.getClass());
0141
0142 try (WriteBatch batch = db().createWriteBatch()) {
0143 byte[] data = serializer.serialize(value);
0144 synchronized (ti) {
0145 Object existing;
0146 try {
0147 existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
0148 } catch (NoSuchElementException e) {
0149 existing = null;
0150 }
0151
0152 PrefixCache cache = new PrefixCache(value);
0153 byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
0154 for (LevelDBTypeInfo.Index idx : ti.indices()) {
0155 byte[] prefix = cache.getPrefix(idx);
0156 idx.add(batch, value, existing, data, naturalKey, prefix);
0157 }
0158 db().write(batch);
0159 }
0160 }
0161 }
0162
0163 @Override
0164 public void delete(Class<?> type, Object naturalKey) throws Exception {
0165 Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
0166 try (WriteBatch batch = db().createWriteBatch()) {
0167 LevelDBTypeInfo ti = getTypeInfo(type);
0168 byte[] key = ti.naturalIndex().start(null, naturalKey);
0169 synchronized (ti) {
0170 byte[] data = db().get(key);
0171 if (data != null) {
0172 Object existing = serializer.deserialize(data, type);
0173 PrefixCache cache = new PrefixCache(existing);
0174 byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
0175 for (LevelDBTypeInfo.Index idx : ti.indices()) {
0176 idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
0177 }
0178 db().write(batch);
0179 }
0180 }
0181 } catch (NoSuchElementException nse) {
0182
0183 }
0184 }
0185
0186 @Override
0187 public <T> KVStoreView<T> view(Class<T> type) throws Exception {
0188 return new KVStoreView<T>() {
0189 @Override
0190 public Iterator<T> iterator() {
0191 try {
0192 return new LevelDBIterator<>(type, LevelDB.this, this);
0193 } catch (Exception e) {
0194 throw Throwables.propagate(e);
0195 }
0196 }
0197 };
0198 }
0199
0200 @Override
0201 public <T> boolean removeAllByIndexValues(
0202 Class<T> klass,
0203 String index,
0204 Collection<?> indexValues) throws Exception {
0205 LevelDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
0206 boolean removed = false;
0207 KVStoreView<T> view = view(klass).index(index);
0208
0209 for (Object indexValue : indexValues) {
0210 for (T value: view.first(indexValue).last(indexValue)) {
0211 Object itemKey = naturalIndex.getValue(value);
0212 delete(klass, itemKey);
0213 removed = true;
0214 }
0215 }
0216
0217 return removed;
0218 }
0219
0220 @Override
0221 public long count(Class<?> type) throws Exception {
0222 LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
0223 return idx.getCount(idx.end(null));
0224 }
0225
0226 @Override
0227 public long count(Class<?> type, String index, Object indexedValue) throws Exception {
0228 LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
0229 return idx.getCount(idx.end(null, indexedValue));
0230 }
0231
0232 @Override
0233 public void close() throws IOException {
0234 synchronized (this._db) {
0235 DB _db = this._db.getAndSet(null);
0236 if (_db == null) {
0237 return;
0238 }
0239
0240 try {
0241 _db.close();
0242 } catch (IOException ioe) {
0243 throw ioe;
0244 } catch (Exception e) {
0245 throw new IOException(e.getMessage(), e);
0246 }
0247 }
0248 }
0249
0250
0251
0252
0253
0254 void closeIterator(LevelDBIterator<?> it) throws IOException {
0255 synchronized (this._db) {
0256 DB _db = this._db.get();
0257 if (_db != null) {
0258 it.close();
0259 }
0260 }
0261 }
0262
0263
0264 LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
0265 LevelDBTypeInfo ti = types.get(type);
0266 if (ti == null) {
0267 LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
0268 ti = types.putIfAbsent(type, tmp);
0269 if (ti == null) {
0270 ti = tmp;
0271 }
0272 }
0273 return ti;
0274 }
0275
0276
0277
0278
0279
0280
0281 DB db() {
0282 DB _db = this._db.get();
0283 if (_db == null) {
0284 throw new IllegalStateException("DB is closed.");
0285 }
0286 return _db;
0287 }
0288
0289 private byte[] getTypeAlias(Class<?> klass) throws Exception {
0290 byte[] alias = typeAliases.get(klass.getName());
0291 if (alias == null) {
0292 synchronized (typeAliases) {
0293 byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
0294 alias = typeAliases.putIfAbsent(klass.getName(), tmp);
0295 if (alias == null) {
0296 alias = tmp;
0297 put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
0298 }
0299 }
0300 }
0301 return alias;
0302 }
0303
0304
0305 public static class TypeAliases {
0306
0307 public Map<String, byte[]> aliases;
0308
0309 TypeAliases(Map<String, byte[]> aliases) {
0310 this.aliases = aliases;
0311 }
0312
0313 TypeAliases() {
0314 this(null);
0315 }
0316
0317 }
0318
0319 private static class PrefixCache {
0320
0321 private final Object entity;
0322 private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
0323
0324 PrefixCache(Object entity) {
0325 this.entity = entity;
0326 this.prefixes = new HashMap<>();
0327 }
0328
0329 byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
0330 byte[] prefix = null;
0331 if (idx.isChild()) {
0332 prefix = prefixes.get(idx.parent());
0333 if (prefix == null) {
0334 prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
0335 prefixes.put(idx.parent(), prefix);
0336 }
0337 }
0338 return prefix;
0339 }
0340
0341 }
0342
0343 }