Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.kvstore;
0019 
0020 import java.util.ArrayList;
0021 import java.util.Collection;
0022 import java.util.Collections;
0023 import java.util.Iterator;
0024 import java.util.HashSet;
0025 import java.util.List;
0026 import java.util.NoSuchElementException;
0027 import java.util.concurrent.ConcurrentHashMap;
0028 import java.util.concurrent.ConcurrentMap;
0029 import java.util.function.BiConsumer;
0030 import java.util.function.Predicate;
0031 import java.util.stream.Collectors;
0032 import java.util.stream.Stream;
0033 
0034 import com.google.common.base.Objects;
0035 import com.google.common.base.Preconditions;
0036 
0037 import org.apache.spark.annotation.Private;
0038 
0039 /**
0040  * Implementation of KVStore that keeps data deserialized in memory. This store does not index
0041  * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
0042  * according to the index. This saves memory but makes iteration more expensive.
0043  */
0044 @Private
0045 public class InMemoryStore implements KVStore {
0046 
0047   private Object metadata;
0048   private InMemoryLists inMemoryLists = new InMemoryLists();
0049 
0050   @Override
0051   public <T> T getMetadata(Class<T> klass) {
0052     return klass.cast(metadata);
0053   }
0054 
0055   @Override
0056   public void setMetadata(Object value) {
0057     this.metadata = value;
0058   }
0059 
0060   @Override
0061   public long count(Class<?> type) {
0062     InstanceList<?> list = inMemoryLists.get(type);
0063     return list != null ? list.size() : 0;
0064   }
0065 
0066   @Override
0067   public long count(Class<?> type, String index, Object indexedValue) throws Exception {
0068     InstanceList<?> list = inMemoryLists.get(type);
0069     int count = 0;
0070     Object comparable = asKey(indexedValue);
0071     KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
0072     for (Object o : view(type)) {
0073       if (Objects.equal(comparable, asKey(accessor.get(o)))) {
0074         count++;
0075       }
0076     }
0077     return count;
0078   }
0079 
0080   @Override
0081   public <T> T read(Class<T> klass, Object naturalKey) {
0082     InstanceList<T> list = inMemoryLists.get(klass);
0083     T value = list != null ? list.get(naturalKey) : null;
0084     if (value == null) {
0085       throw new NoSuchElementException();
0086     }
0087     return value;
0088   }
0089 
0090   @Override
0091   public void write(Object value) throws Exception {
0092     inMemoryLists.write(value);
0093   }
0094 
0095   @Override
0096   public void delete(Class<?> type, Object naturalKey) {
0097     InstanceList<?> list = inMemoryLists.get(type);
0098     if (list != null) {
0099       list.delete(naturalKey);
0100     }
0101   }
0102 
0103   @Override
0104   public <T> KVStoreView<T> view(Class<T> type){
0105     InstanceList<T> list = inMemoryLists.get(type);
0106     return list != null ? list.view() : emptyView();
0107   }
0108 
0109   @Override
0110   public void close() {
0111     metadata = null;
0112     inMemoryLists.clear();
0113   }
0114 
0115   @Override
0116   public <T> boolean removeAllByIndexValues(
0117       Class<T> klass,
0118       String index,
0119       Collection<?> indexValues) {
0120     InstanceList<T> list = inMemoryLists.get(klass);
0121 
0122     if (list != null) {
0123       return list.countingRemoveAllByIndexValues(index, indexValues) > 0;
0124     } else {
0125       return false;
0126     }
0127   }
0128 
0129   @SuppressWarnings("unchecked")
0130   private static Comparable<Object> asKey(Object in) {
0131     if (in.getClass().isArray()) {
0132       in = ArrayWrappers.forArray(in);
0133     }
0134     return (Comparable<Object>) in;
0135   }
0136 
0137   @SuppressWarnings("unchecked")
0138   private static <T> KVStoreView<T> emptyView() {
0139     return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
0140   }
0141 
0142   /**
0143    * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
0144    * class of type T to an InstanceList of type T.
0145    */
0146   private static class InMemoryLists {
0147     private final ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
0148 
0149     @SuppressWarnings("unchecked")
0150     public <T> InstanceList<T> get(Class<T> type) {
0151       return (InstanceList<T>) data.get(type);
0152     }
0153 
0154     @SuppressWarnings("unchecked")
0155     public <T> void write(T value) throws Exception {
0156       InstanceList<T> list =
0157         (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
0158       list.put(value);
0159     }
0160 
0161     public void clear() {
0162       data.clear();
0163     }
0164   }
0165 
0166   /**
0167    * An alias class for the type "ConcurrentHashMap<Comparable<Object>, Boolean>", which is used
0168    * as a concurrent hashset for storing natural keys and the boolean value doesn't matter.
0169    */
0170   private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {}
0171 
0172   private static class InstanceList<T> {
0173 
0174     /**
0175      * A BiConsumer to control multi-entity removal.  We use this in a forEach rather than an
0176      * iterator because there is a bug in jdk8 which affects remove() on all concurrent map
0177      * iterators.  https://bugs.openjdk.java.net/browse/JDK-8078645
0178      */
0179     private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
0180       private final InstanceList<T> instanceList;
0181       private final Predicate<? super T> filter;
0182 
0183       /**
0184        * Keeps a count of the number of elements removed.  This count is not currently surfaced
0185        * to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
0186        * but I found it handy to have the count of elements removed while debugging; a count being
0187        * no more complicated than a boolean, I've retained that behavior here, even though there
0188        * is no current requirement.
0189        */
0190       private int count = 0;
0191 
0192       CountingRemoveIfForEach(InstanceList<T> instanceList, Predicate<? super T> filter) {
0193         this.instanceList = instanceList;
0194         this.filter = filter;
0195       }
0196 
0197       @Override
0198       public void accept(Comparable<Object> key, T value) {
0199         if (filter.test(value)) {
0200           if (instanceList.delete(key, value)) {
0201             count++;
0202           }
0203         }
0204       }
0205 
0206       public int count() { return count; }
0207     }
0208 
0209     private final KVTypeInfo ti;
0210     private final KVTypeInfo.Accessor naturalKey;
0211     private final ConcurrentMap<Comparable<Object>, T> data;
0212     private final String naturalParentIndexName;
0213     private final Boolean hasNaturalParentIndex;
0214     // A mapping from parent to the natural keys of its children.
0215     // For example, a mapping from a stage ID to all the task IDs in the stage.
0216     private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
0217 
0218     private InstanceList(Class<?> klass) {
0219       this.ti = new KVTypeInfo(klass);
0220       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
0221       this.data = new ConcurrentHashMap<>();
0222       this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
0223       this.parentToChildrenMap = new ConcurrentHashMap<>();
0224       this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
0225     }
0226 
0227     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
0228       return ti.getAccessor(indexName);
0229     }
0230 
0231     int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
0232       int count = 0;
0233       if (KVIndex.NATURAL_INDEX_NAME.equals(index)) {
0234         for (Object naturalKey : indexValues) {
0235           count += delete(asKey(naturalKey)) ? 1 : 0;
0236         }
0237         return count;
0238       } else if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
0239         // If there is a parent index for the natural index and `index` happens to be it,
0240         // Spark can use the `parentToChildrenMap` to get the related natural keys, and then
0241         // delete them from `data`.
0242         for (Object indexValue : indexValues) {
0243           Comparable<Object> parentKey = asKey(indexValue);
0244           NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
0245           for (Comparable<Object> naturalKey : children.keySet()) {
0246             data.remove(naturalKey);
0247             count ++;
0248           }
0249           parentToChildrenMap.remove(parentKey);
0250         }
0251         return count;
0252       } else {
0253         Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
0254         CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(this, filter);
0255 
0256         // Go through all the values in `data` and delete objects that meets the predicate `filter`.
0257         // This can be slow when there is a large number of entries in `data`.
0258         data.forEach(callback);
0259         return callback.count();
0260       }
0261     }
0262 
0263     public T get(Object key) {
0264       return data.get(asKey(key));
0265     }
0266 
0267     public void put(T value) throws Exception {
0268       data.put(asKey(naturalKey.get(value)), value);
0269       if (hasNaturalParentIndex) {
0270         Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
0271         NaturalKeys children =
0272           parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
0273         children.put(asKey(naturalKey.get(value)), true);
0274       }
0275     }
0276 
0277     public boolean delete(Object key) {
0278       boolean entryExists = data.remove(asKey(key)) != null;
0279       if (entryExists) {
0280         deleteParentIndex(key);
0281       }
0282       return entryExists;
0283     }
0284 
0285     public boolean delete(Object key, T value) {
0286       boolean entryExists = data.remove(asKey(key), value);
0287       if (entryExists) {
0288         deleteParentIndex(key);
0289       }
0290       return entryExists;
0291     }
0292 
0293     private void deleteParentIndex(Object key) {
0294       if (hasNaturalParentIndex) {
0295         for (NaturalKeys v : parentToChildrenMap.values()) {
0296           if (v.remove(asKey(key)) != null) {
0297             // `v` can be empty after removing the natural key and we can remove it from
0298             // `parentToChildrenMap`. However, `parentToChildrenMap` is a ConcurrentMap and such
0299             // checking and deleting can be slow.
0300             // This method is to delete one object with certain key, let's make it simple here.
0301             break;
0302           }
0303         }
0304       }
0305     }
0306 
0307     public int size() {
0308       return data.size();
0309     }
0310 
0311     public InMemoryView<T> view() {
0312       return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
0313     }
0314 
0315     private static <T> Predicate<? super T> getPredicate(
0316         KVTypeInfo.Accessor getter,
0317         Collection<?> values) {
0318       if (Comparable.class.isAssignableFrom(getter.getType())) {
0319         HashSet<?> set = new HashSet<>(values);
0320 
0321         return (value) -> set.contains(indexValueForEntity(getter, value));
0322       } else {
0323         HashSet<Comparable<?>> set = new HashSet<>(values.size());
0324         for (Object key : values) {
0325           set.add(asKey(key));
0326         }
0327         return (value) -> set.contains(asKey(indexValueForEntity(getter, value)));
0328       }
0329     }
0330 
0331     private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object entity) {
0332       try {
0333         return getter.get(entity);
0334       } catch (ReflectiveOperationException e) {
0335         throw new RuntimeException(e);
0336       }
0337     }
0338   }
0339 
0340   private static class InMemoryView<T> extends KVStoreView<T> {
0341     private static final InMemoryView<?> EMPTY_VIEW =
0342       new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());
0343 
0344     private final ConcurrentMap<Comparable<Object>, T> data;
0345     private final KVTypeInfo ti;
0346     private final KVTypeInfo.Accessor natural;
0347     private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
0348     private final String naturalParentIndexName;
0349     private final Boolean hasNaturalParentIndex;
0350 
0351     InMemoryView(
0352         ConcurrentMap<Comparable<Object>, T> data,
0353         KVTypeInfo ti,
0354         String naturalParentIndexName,
0355         ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
0356       this.data = data;
0357       this.ti = ti;
0358       this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
0359       this.naturalParentIndexName = naturalParentIndexName;
0360       this.parentToChildrenMap = parentToChildrenMap;
0361       this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
0362     }
0363 
0364     @Override
0365     public Iterator<T> iterator() {
0366       if (data.isEmpty()) {
0367         return new InMemoryIterator<>(Collections.emptyIterator());
0368       }
0369 
0370       KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
0371       int modifier = ascending ? 1 : -1;
0372 
0373       final List<T> sorted = copyElements();
0374       sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
0375       Stream<T> stream = sorted.stream();
0376 
0377       if (first != null) {
0378         Comparable<?> firstKey = asKey(first);
0379         stream = stream.filter(e -> modifier * compare(e, getter, firstKey) >= 0);
0380       }
0381 
0382       if (last != null) {
0383         Comparable<?> lastKey = asKey(last);
0384         stream = stream.filter(e -> modifier * compare(e, getter, lastKey) <= 0);
0385       }
0386 
0387       if (skip > 0) {
0388         stream = stream.skip(skip);
0389       }
0390 
0391       if (max < sorted.size()) {
0392         stream = stream.limit((int) max);
0393       }
0394 
0395       return new InMemoryIterator<>(stream.iterator());
0396     }
0397 
0398     /**
0399      * Create a copy of the input elements, filtering the values for child indices if needed.
0400      */
0401     private List<T> copyElements() {
0402       if (parent != null) {
0403         Comparable<Object> parentKey = asKey(parent);
0404         if (hasNaturalParentIndex && naturalParentIndexName.equals(ti.getParentIndexName(index))) {
0405           // If there is a parent index for the natural index and the parent of `index` happens to
0406           // be it, Spark can use the `parentToChildrenMap` to get the related natural keys, and
0407           // then copy them from `data`.
0408           NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
0409           ArrayList<T> elements = new ArrayList<>();
0410           for (Comparable<Object> naturalKey : children.keySet()) {
0411             data.computeIfPresent(naturalKey, (k, v) -> {
0412               elements.add(v);
0413               return v;
0414             });
0415           }
0416           return elements;
0417         } else {
0418           // Go through all the values in `data` and collect all the objects has certain parent
0419           // value. This can be slow when there is a large number of entries in `data`.
0420           KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
0421           Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
0422           return data.values().stream()
0423             .filter(e -> compare(e, parentGetter, parentKey) == 0)
0424             .collect(Collectors.toList());
0425         }
0426       } else {
0427         return new ArrayList<>(data.values());
0428       }
0429     }
0430 
0431     private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
0432       try {
0433         int diff = compare(e1, getter, asKey(getter.get(e2)));
0434         if (diff == 0 && getter != natural) {
0435           diff = compare(e1, natural, asKey(natural.get(e2)));
0436         }
0437         return diff;
0438       } catch (ReflectiveOperationException e) {
0439         throw new RuntimeException(e);
0440       }
0441     }
0442 
0443     private int compare(T e1, KVTypeInfo.Accessor getter, Comparable<?> v2) {
0444       try {
0445         return asKey(getter.get(e1)).compareTo(v2);
0446       } catch (ReflectiveOperationException e) {
0447         throw new RuntimeException(e);
0448       }
0449     }
0450   }
0451 
0452   private static class InMemoryIterator<T> implements KVStoreIterator<T> {
0453 
0454     private final Iterator<T> iter;
0455 
0456     InMemoryIterator(Iterator<T> iter) {
0457       this.iter = iter;
0458     }
0459 
0460     @Override
0461     public boolean hasNext() {
0462       return iter.hasNext();
0463     }
0464 
0465     @Override
0466     public T next() {
0467       return iter.next();
0468     }
0469 
0470     @Override
0471     public void remove() {
0472       throw new UnsupportedOperationException();
0473     }
0474 
0475     @Override
0476     public List<T> next(int max) {
0477       List<T> list = new ArrayList<>(max);
0478       while (hasNext() && list.size() < max) {
0479         list.add(next());
0480       }
0481       return list;
0482     }
0483 
0484     @Override
0485     public boolean skip(long n) {
0486       long skipped = 0;
0487       while (skipped < n) {
0488         if (hasNext()) {
0489           next();
0490           skipped++;
0491         } else {
0492           return false;
0493         }
0494       }
0495 
0496       return hasNext();
0497     }
0498 
0499     @Override
0500     public void close() {
0501       // no op.
0502     }
0503 
0504   }
0505 
0506 }