0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.kvstore;
0019
0020 import java.util.ArrayList;
0021 import java.util.Collection;
0022 import java.util.Collections;
0023 import java.util.Iterator;
0024 import java.util.HashSet;
0025 import java.util.List;
0026 import java.util.NoSuchElementException;
0027 import java.util.concurrent.ConcurrentHashMap;
0028 import java.util.concurrent.ConcurrentMap;
0029 import java.util.function.BiConsumer;
0030 import java.util.function.Predicate;
0031 import java.util.stream.Collectors;
0032 import java.util.stream.Stream;
0033
0034 import com.google.common.base.Objects;
0035 import com.google.common.base.Preconditions;
0036
0037 import org.apache.spark.annotation.Private;
0038
0039
0040
0041
0042
0043
0044 @Private
0045 public class InMemoryStore implements KVStore {
0046
0047 private Object metadata;
0048 private InMemoryLists inMemoryLists = new InMemoryLists();
0049
0050 @Override
0051 public <T> T getMetadata(Class<T> klass) {
0052 return klass.cast(metadata);
0053 }
0054
0055 @Override
0056 public void setMetadata(Object value) {
0057 this.metadata = value;
0058 }
0059
0060 @Override
0061 public long count(Class<?> type) {
0062 InstanceList<?> list = inMemoryLists.get(type);
0063 return list != null ? list.size() : 0;
0064 }
0065
0066 @Override
0067 public long count(Class<?> type, String index, Object indexedValue) throws Exception {
0068 InstanceList<?> list = inMemoryLists.get(type);
0069 int count = 0;
0070 Object comparable = asKey(indexedValue);
0071 KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
0072 for (Object o : view(type)) {
0073 if (Objects.equal(comparable, asKey(accessor.get(o)))) {
0074 count++;
0075 }
0076 }
0077 return count;
0078 }
0079
0080 @Override
0081 public <T> T read(Class<T> klass, Object naturalKey) {
0082 InstanceList<T> list = inMemoryLists.get(klass);
0083 T value = list != null ? list.get(naturalKey) : null;
0084 if (value == null) {
0085 throw new NoSuchElementException();
0086 }
0087 return value;
0088 }
0089
0090 @Override
0091 public void write(Object value) throws Exception {
0092 inMemoryLists.write(value);
0093 }
0094
0095 @Override
0096 public void delete(Class<?> type, Object naturalKey) {
0097 InstanceList<?> list = inMemoryLists.get(type);
0098 if (list != null) {
0099 list.delete(naturalKey);
0100 }
0101 }
0102
0103 @Override
0104 public <T> KVStoreView<T> view(Class<T> type){
0105 InstanceList<T> list = inMemoryLists.get(type);
0106 return list != null ? list.view() : emptyView();
0107 }
0108
0109 @Override
0110 public void close() {
0111 metadata = null;
0112 inMemoryLists.clear();
0113 }
0114
0115 @Override
0116 public <T> boolean removeAllByIndexValues(
0117 Class<T> klass,
0118 String index,
0119 Collection<?> indexValues) {
0120 InstanceList<T> list = inMemoryLists.get(klass);
0121
0122 if (list != null) {
0123 return list.countingRemoveAllByIndexValues(index, indexValues) > 0;
0124 } else {
0125 return false;
0126 }
0127 }
0128
0129 @SuppressWarnings("unchecked")
0130 private static Comparable<Object> asKey(Object in) {
0131 if (in.getClass().isArray()) {
0132 in = ArrayWrappers.forArray(in);
0133 }
0134 return (Comparable<Object>) in;
0135 }
0136
0137 @SuppressWarnings("unchecked")
0138 private static <T> KVStoreView<T> emptyView() {
0139 return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
0140 }
0141
0142
0143
0144
0145
0146 private static class InMemoryLists {
0147 private final ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
0148
0149 @SuppressWarnings("unchecked")
0150 public <T> InstanceList<T> get(Class<T> type) {
0151 return (InstanceList<T>) data.get(type);
0152 }
0153
0154 @SuppressWarnings("unchecked")
0155 public <T> void write(T value) throws Exception {
0156 InstanceList<T> list =
0157 (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
0158 list.put(value);
0159 }
0160
0161 public void clear() {
0162 data.clear();
0163 }
0164 }
0165
0166
0167
0168
0169
0170 private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {}
0171
0172 private static class InstanceList<T> {
0173
0174
0175
0176
0177
0178
0179 private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
0180 private final InstanceList<T> instanceList;
0181 private final Predicate<? super T> filter;
0182
0183
0184
0185
0186
0187
0188
0189
0190 private int count = 0;
0191
0192 CountingRemoveIfForEach(InstanceList<T> instanceList, Predicate<? super T> filter) {
0193 this.instanceList = instanceList;
0194 this.filter = filter;
0195 }
0196
0197 @Override
0198 public void accept(Comparable<Object> key, T value) {
0199 if (filter.test(value)) {
0200 if (instanceList.delete(key, value)) {
0201 count++;
0202 }
0203 }
0204 }
0205
0206 public int count() { return count; }
0207 }
0208
0209 private final KVTypeInfo ti;
0210 private final KVTypeInfo.Accessor naturalKey;
0211 private final ConcurrentMap<Comparable<Object>, T> data;
0212 private final String naturalParentIndexName;
0213 private final Boolean hasNaturalParentIndex;
0214
0215
0216 private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
0217
0218 private InstanceList(Class<?> klass) {
0219 this.ti = new KVTypeInfo(klass);
0220 this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
0221 this.data = new ConcurrentHashMap<>();
0222 this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
0223 this.parentToChildrenMap = new ConcurrentHashMap<>();
0224 this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
0225 }
0226
0227 KVTypeInfo.Accessor getIndexAccessor(String indexName) {
0228 return ti.getAccessor(indexName);
0229 }
0230
0231 int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
0232 int count = 0;
0233 if (KVIndex.NATURAL_INDEX_NAME.equals(index)) {
0234 for (Object naturalKey : indexValues) {
0235 count += delete(asKey(naturalKey)) ? 1 : 0;
0236 }
0237 return count;
0238 } else if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
0239
0240
0241
0242 for (Object indexValue : indexValues) {
0243 Comparable<Object> parentKey = asKey(indexValue);
0244 NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
0245 for (Comparable<Object> naturalKey : children.keySet()) {
0246 data.remove(naturalKey);
0247 count ++;
0248 }
0249 parentToChildrenMap.remove(parentKey);
0250 }
0251 return count;
0252 } else {
0253 Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
0254 CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(this, filter);
0255
0256
0257
0258 data.forEach(callback);
0259 return callback.count();
0260 }
0261 }
0262
0263 public T get(Object key) {
0264 return data.get(asKey(key));
0265 }
0266
0267 public void put(T value) throws Exception {
0268 data.put(asKey(naturalKey.get(value)), value);
0269 if (hasNaturalParentIndex) {
0270 Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
0271 NaturalKeys children =
0272 parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
0273 children.put(asKey(naturalKey.get(value)), true);
0274 }
0275 }
0276
0277 public boolean delete(Object key) {
0278 boolean entryExists = data.remove(asKey(key)) != null;
0279 if (entryExists) {
0280 deleteParentIndex(key);
0281 }
0282 return entryExists;
0283 }
0284
0285 public boolean delete(Object key, T value) {
0286 boolean entryExists = data.remove(asKey(key), value);
0287 if (entryExists) {
0288 deleteParentIndex(key);
0289 }
0290 return entryExists;
0291 }
0292
0293 private void deleteParentIndex(Object key) {
0294 if (hasNaturalParentIndex) {
0295 for (NaturalKeys v : parentToChildrenMap.values()) {
0296 if (v.remove(asKey(key)) != null) {
0297
0298
0299
0300
0301 break;
0302 }
0303 }
0304 }
0305 }
0306
0307 public int size() {
0308 return data.size();
0309 }
0310
0311 public InMemoryView<T> view() {
0312 return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
0313 }
0314
0315 private static <T> Predicate<? super T> getPredicate(
0316 KVTypeInfo.Accessor getter,
0317 Collection<?> values) {
0318 if (Comparable.class.isAssignableFrom(getter.getType())) {
0319 HashSet<?> set = new HashSet<>(values);
0320
0321 return (value) -> set.contains(indexValueForEntity(getter, value));
0322 } else {
0323 HashSet<Comparable<?>> set = new HashSet<>(values.size());
0324 for (Object key : values) {
0325 set.add(asKey(key));
0326 }
0327 return (value) -> set.contains(asKey(indexValueForEntity(getter, value)));
0328 }
0329 }
0330
0331 private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object entity) {
0332 try {
0333 return getter.get(entity);
0334 } catch (ReflectiveOperationException e) {
0335 throw new RuntimeException(e);
0336 }
0337 }
0338 }
0339
0340 private static class InMemoryView<T> extends KVStoreView<T> {
0341 private static final InMemoryView<?> EMPTY_VIEW =
0342 new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());
0343
0344 private final ConcurrentMap<Comparable<Object>, T> data;
0345 private final KVTypeInfo ti;
0346 private final KVTypeInfo.Accessor natural;
0347 private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
0348 private final String naturalParentIndexName;
0349 private final Boolean hasNaturalParentIndex;
0350
0351 InMemoryView(
0352 ConcurrentMap<Comparable<Object>, T> data,
0353 KVTypeInfo ti,
0354 String naturalParentIndexName,
0355 ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
0356 this.data = data;
0357 this.ti = ti;
0358 this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
0359 this.naturalParentIndexName = naturalParentIndexName;
0360 this.parentToChildrenMap = parentToChildrenMap;
0361 this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
0362 }
0363
0364 @Override
0365 public Iterator<T> iterator() {
0366 if (data.isEmpty()) {
0367 return new InMemoryIterator<>(Collections.emptyIterator());
0368 }
0369
0370 KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
0371 int modifier = ascending ? 1 : -1;
0372
0373 final List<T> sorted = copyElements();
0374 sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
0375 Stream<T> stream = sorted.stream();
0376
0377 if (first != null) {
0378 Comparable<?> firstKey = asKey(first);
0379 stream = stream.filter(e -> modifier * compare(e, getter, firstKey) >= 0);
0380 }
0381
0382 if (last != null) {
0383 Comparable<?> lastKey = asKey(last);
0384 stream = stream.filter(e -> modifier * compare(e, getter, lastKey) <= 0);
0385 }
0386
0387 if (skip > 0) {
0388 stream = stream.skip(skip);
0389 }
0390
0391 if (max < sorted.size()) {
0392 stream = stream.limit((int) max);
0393 }
0394
0395 return new InMemoryIterator<>(stream.iterator());
0396 }
0397
0398
0399
0400
0401 private List<T> copyElements() {
0402 if (parent != null) {
0403 Comparable<Object> parentKey = asKey(parent);
0404 if (hasNaturalParentIndex && naturalParentIndexName.equals(ti.getParentIndexName(index))) {
0405
0406
0407
0408 NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
0409 ArrayList<T> elements = new ArrayList<>();
0410 for (Comparable<Object> naturalKey : children.keySet()) {
0411 data.computeIfPresent(naturalKey, (k, v) -> {
0412 elements.add(v);
0413 return v;
0414 });
0415 }
0416 return elements;
0417 } else {
0418
0419
0420 KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
0421 Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
0422 return data.values().stream()
0423 .filter(e -> compare(e, parentGetter, parentKey) == 0)
0424 .collect(Collectors.toList());
0425 }
0426 } else {
0427 return new ArrayList<>(data.values());
0428 }
0429 }
0430
0431 private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
0432 try {
0433 int diff = compare(e1, getter, asKey(getter.get(e2)));
0434 if (diff == 0 && getter != natural) {
0435 diff = compare(e1, natural, asKey(natural.get(e2)));
0436 }
0437 return diff;
0438 } catch (ReflectiveOperationException e) {
0439 throw new RuntimeException(e);
0440 }
0441 }
0442
0443 private int compare(T e1, KVTypeInfo.Accessor getter, Comparable<?> v2) {
0444 try {
0445 return asKey(getter.get(e1)).compareTo(v2);
0446 } catch (ReflectiveOperationException e) {
0447 throw new RuntimeException(e);
0448 }
0449 }
0450 }
0451
0452 private static class InMemoryIterator<T> implements KVStoreIterator<T> {
0453
0454 private final Iterator<T> iter;
0455
0456 InMemoryIterator(Iterator<T> iter) {
0457 this.iter = iter;
0458 }
0459
0460 @Override
0461 public boolean hasNext() {
0462 return iter.hasNext();
0463 }
0464
0465 @Override
0466 public T next() {
0467 return iter.next();
0468 }
0469
0470 @Override
0471 public void remove() {
0472 throw new UnsupportedOperationException();
0473 }
0474
0475 @Override
0476 public List<T> next(int max) {
0477 List<T> list = new ArrayList<>(max);
0478 while (hasNext() && list.size() < max) {
0479 list.add(next());
0480 }
0481 return list;
0482 }
0483
0484 @Override
0485 public boolean skip(long n) {
0486 long skipped = 0;
0487 while (skipped < n) {
0488 if (hasNext()) {
0489 next();
0490 skipped++;
0491 } else {
0492 return false;
0493 }
0494 }
0495
0496 return hasNext();
0497 }
0498
0499 @Override
0500 public void close() {
0501
0502 }
0503
0504 }
0505
0506 }