Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.kvstore;
0019 
0020 import java.io.IOException;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.Map;
0024 import java.util.NoSuchElementException;
0025 
0026 import com.google.common.annotations.VisibleForTesting;
0027 import com.google.common.base.Preconditions;
0028 import com.google.common.base.Throwables;
0029 import org.iq80.leveldb.DBIterator;
0030 
0031 class LevelDBIterator<T> implements KVStoreIterator<T> {
0032 
0033   private final LevelDB db;
0034   private final boolean ascending;
0035   private final DBIterator it;
0036   private final Class<T> type;
0037   private final LevelDBTypeInfo ti;
0038   private final LevelDBTypeInfo.Index index;
0039   private final byte[] indexKeyPrefix;
0040   private final byte[] end;
0041   private final long max;
0042 
0043   private boolean checkedNext;
0044   private byte[] next;
0045   private boolean closed;
0046   private long count;
0047 
0048   LevelDBIterator(Class<T> type, LevelDB db, KVStoreView<T> params) throws Exception {
0049     this.db = db;
0050     this.ascending = params.ascending;
0051     this.it = db.db().iterator();
0052     this.type = type;
0053     this.ti = db.getTypeInfo(type);
0054     this.index = ti.index(params.index);
0055     this.max = params.max;
0056 
0057     Preconditions.checkArgument(!index.isChild() || params.parent != null,
0058       "Cannot iterate over child index %s without parent value.", params.index);
0059     byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
0060 
0061     this.indexKeyPrefix = index.keyPrefix(parent);
0062 
0063     byte[] firstKey;
0064     if (params.first != null) {
0065       if (ascending) {
0066         firstKey = index.start(parent, params.first);
0067       } else {
0068         firstKey = index.end(parent, params.first);
0069       }
0070     } else if (ascending) {
0071       firstKey = index.keyPrefix(parent);
0072     } else {
0073       firstKey = index.end(parent);
0074     }
0075     it.seek(firstKey);
0076 
0077     byte[] end = null;
0078     if (ascending) {
0079       if (params.last != null) {
0080         end = index.end(parent, params.last);
0081       } else {
0082         end = index.end(parent);
0083       }
0084     } else {
0085       if (params.last != null) {
0086         end = index.start(parent, params.last);
0087       }
0088       if (it.hasNext()) {
0089         // When descending, the caller may have set up the start of iteration at a non-existent
0090         // entry that is guaranteed to be after the desired entry. For example, if you have a
0091         // compound key (a, b) where b is a, integer, you may seek to the end of the elements that
0092         // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
0093         // exist in the database. So need to check here whether the next value actually belongs to
0094         // the set being returned by the iterator before advancing.
0095         byte[] nextKey = it.peekNext().getKey();
0096         if (compare(nextKey, indexKeyPrefix) <= 0) {
0097           it.next();
0098         }
0099       }
0100     }
0101     this.end = end;
0102 
0103     if (params.skip > 0) {
0104       skip(params.skip);
0105     }
0106   }
0107 
0108   @Override
0109   public boolean hasNext() {
0110     if (!checkedNext && !closed) {
0111       next = loadNext();
0112       checkedNext = true;
0113     }
0114     if (!closed && next == null) {
0115       try {
0116         close();
0117       } catch (IOException ioe) {
0118         throw Throwables.propagate(ioe);
0119       }
0120     }
0121     return next != null;
0122   }
0123 
0124   @Override
0125   public T next() {
0126     if (!hasNext()) {
0127       throw new NoSuchElementException();
0128     }
0129     checkedNext = false;
0130 
0131     try {
0132       T ret;
0133       if (index == null || index.isCopy()) {
0134         ret = db.serializer.deserialize(next, type);
0135       } else {
0136         byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
0137         ret = db.get(key, type);
0138       }
0139       next = null;
0140       return ret;
0141     } catch (Exception e) {
0142       throw Throwables.propagate(e);
0143     }
0144   }
0145 
0146   @Override
0147   public void remove() {
0148     throw new UnsupportedOperationException();
0149   }
0150 
0151   @Override
0152   public List<T> next(int max) {
0153     List<T> list = new ArrayList<>(max);
0154     while (hasNext() && list.size() < max) {
0155       list.add(next());
0156     }
0157     return list;
0158   }
0159 
0160   @Override
0161   public boolean skip(long n) {
0162     long skipped = 0;
0163     while (skipped < n) {
0164       if (next != null) {
0165         checkedNext = false;
0166         next = null;
0167         skipped++;
0168         continue;
0169       }
0170 
0171       boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
0172       if (!hasNext) {
0173         checkedNext = true;
0174         return false;
0175       }
0176 
0177       Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
0178       if (!isEndMarker(e.getKey())) {
0179         skipped++;
0180       }
0181     }
0182 
0183     return hasNext();
0184   }
0185 
0186   @Override
0187   public synchronized void close() throws IOException {
0188     if (!closed) {
0189       it.close();
0190       closed = true;
0191     }
0192   }
0193 
0194   /**
0195    * Because it's tricky to expose closeable iterators through many internal APIs, especially
0196    * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
0197    * the iterator will eventually be released.
0198    */
0199   @SuppressWarnings("deprecation")
0200   @Override
0201   protected void finalize() throws Throwable {
0202     db.closeIterator(this);
0203   }
0204 
0205   private byte[] loadNext() {
0206     if (count >= max) {
0207       return null;
0208     }
0209 
0210     while (true) {
0211       boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
0212       if (!hasNext) {
0213         return null;
0214       }
0215 
0216       Map.Entry<byte[], byte[]> nextEntry;
0217       try {
0218         // Avoid races if another thread is updating the DB.
0219         nextEntry = ascending ? it.next() : it.prev();
0220       } catch (NoSuchElementException e) {
0221         return null;
0222       }
0223 
0224       byte[] nextKey = nextEntry.getKey();
0225       // Next key is not part of the index, stop.
0226       if (!startsWith(nextKey, indexKeyPrefix)) {
0227         return null;
0228       }
0229 
0230       // If the next key is an end marker, then skip it.
0231       if (isEndMarker(nextKey)) {
0232         continue;
0233       }
0234 
0235       // If there's a known end key and iteration has gone past it, stop.
0236       if (end != null) {
0237         int comp = compare(nextKey, end) * (ascending ? 1 : -1);
0238         if (comp > 0) {
0239           return null;
0240         }
0241       }
0242 
0243       count++;
0244 
0245       // Next element is part of the iteration, return it.
0246       return nextEntry.getValue();
0247     }
0248   }
0249 
0250   @VisibleForTesting
0251   static boolean startsWith(byte[] key, byte[] prefix) {
0252     if (key.length < prefix.length) {
0253       return false;
0254     }
0255 
0256     for (int i = 0; i < prefix.length; i++) {
0257       if (key[i] != prefix[i]) {
0258         return false;
0259       }
0260     }
0261 
0262     return true;
0263   }
0264 
0265   private boolean isEndMarker(byte[] key) {
0266     return (key.length > 2 &&
0267         key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
0268         key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
0269   }
0270 
0271   static int compare(byte[] a, byte[] b) {
0272     int diff = 0;
0273     int minLen = Math.min(a.length, b.length);
0274     for (int i = 0; i < minLen; i++) {
0275       diff += (a[i] - b[i]);
0276       if (diff != 0) {
0277         return diff;
0278       }
0279     }
0280 
0281     return a.length - b.length;
0282   }
0283 
0284 }