0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.kvstore;
0019
0020 import java.io.IOException;
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.Map;
0024 import java.util.NoSuchElementException;
0025
0026 import com.google.common.annotations.VisibleForTesting;
0027 import com.google.common.base.Preconditions;
0028 import com.google.common.base.Throwables;
0029 import org.iq80.leveldb.DBIterator;
0030
0031 class LevelDBIterator<T> implements KVStoreIterator<T> {
0032
0033 private final LevelDB db;
0034 private final boolean ascending;
0035 private final DBIterator it;
0036 private final Class<T> type;
0037 private final LevelDBTypeInfo ti;
0038 private final LevelDBTypeInfo.Index index;
0039 private final byte[] indexKeyPrefix;
0040 private final byte[] end;
0041 private final long max;
0042
0043 private boolean checkedNext;
0044 private byte[] next;
0045 private boolean closed;
0046 private long count;
0047
0048 LevelDBIterator(Class<T> type, LevelDB db, KVStoreView<T> params) throws Exception {
0049 this.db = db;
0050 this.ascending = params.ascending;
0051 this.it = db.db().iterator();
0052 this.type = type;
0053 this.ti = db.getTypeInfo(type);
0054 this.index = ti.index(params.index);
0055 this.max = params.max;
0056
0057 Preconditions.checkArgument(!index.isChild() || params.parent != null,
0058 "Cannot iterate over child index %s without parent value.", params.index);
0059 byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
0060
0061 this.indexKeyPrefix = index.keyPrefix(parent);
0062
0063 byte[] firstKey;
0064 if (params.first != null) {
0065 if (ascending) {
0066 firstKey = index.start(parent, params.first);
0067 } else {
0068 firstKey = index.end(parent, params.first);
0069 }
0070 } else if (ascending) {
0071 firstKey = index.keyPrefix(parent);
0072 } else {
0073 firstKey = index.end(parent);
0074 }
0075 it.seek(firstKey);
0076
0077 byte[] end = null;
0078 if (ascending) {
0079 if (params.last != null) {
0080 end = index.end(parent, params.last);
0081 } else {
0082 end = index.end(parent);
0083 }
0084 } else {
0085 if (params.last != null) {
0086 end = index.start(parent, params.last);
0087 }
0088 if (it.hasNext()) {
0089
0090
0091
0092
0093
0094
0095 byte[] nextKey = it.peekNext().getKey();
0096 if (compare(nextKey, indexKeyPrefix) <= 0) {
0097 it.next();
0098 }
0099 }
0100 }
0101 this.end = end;
0102
0103 if (params.skip > 0) {
0104 skip(params.skip);
0105 }
0106 }
0107
0108 @Override
0109 public boolean hasNext() {
0110 if (!checkedNext && !closed) {
0111 next = loadNext();
0112 checkedNext = true;
0113 }
0114 if (!closed && next == null) {
0115 try {
0116 close();
0117 } catch (IOException ioe) {
0118 throw Throwables.propagate(ioe);
0119 }
0120 }
0121 return next != null;
0122 }
0123
0124 @Override
0125 public T next() {
0126 if (!hasNext()) {
0127 throw new NoSuchElementException();
0128 }
0129 checkedNext = false;
0130
0131 try {
0132 T ret;
0133 if (index == null || index.isCopy()) {
0134 ret = db.serializer.deserialize(next, type);
0135 } else {
0136 byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
0137 ret = db.get(key, type);
0138 }
0139 next = null;
0140 return ret;
0141 } catch (Exception e) {
0142 throw Throwables.propagate(e);
0143 }
0144 }
0145
0146 @Override
0147 public void remove() {
0148 throw new UnsupportedOperationException();
0149 }
0150
0151 @Override
0152 public List<T> next(int max) {
0153 List<T> list = new ArrayList<>(max);
0154 while (hasNext() && list.size() < max) {
0155 list.add(next());
0156 }
0157 return list;
0158 }
0159
0160 @Override
0161 public boolean skip(long n) {
0162 long skipped = 0;
0163 while (skipped < n) {
0164 if (next != null) {
0165 checkedNext = false;
0166 next = null;
0167 skipped++;
0168 continue;
0169 }
0170
0171 boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
0172 if (!hasNext) {
0173 checkedNext = true;
0174 return false;
0175 }
0176
0177 Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
0178 if (!isEndMarker(e.getKey())) {
0179 skipped++;
0180 }
0181 }
0182
0183 return hasNext();
0184 }
0185
0186 @Override
0187 public synchronized void close() throws IOException {
0188 if (!closed) {
0189 it.close();
0190 closed = true;
0191 }
0192 }
0193
0194
0195
0196
0197
0198
0199 @SuppressWarnings("deprecation")
0200 @Override
0201 protected void finalize() throws Throwable {
0202 db.closeIterator(this);
0203 }
0204
0205 private byte[] loadNext() {
0206 if (count >= max) {
0207 return null;
0208 }
0209
0210 while (true) {
0211 boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
0212 if (!hasNext) {
0213 return null;
0214 }
0215
0216 Map.Entry<byte[], byte[]> nextEntry;
0217 try {
0218
0219 nextEntry = ascending ? it.next() : it.prev();
0220 } catch (NoSuchElementException e) {
0221 return null;
0222 }
0223
0224 byte[] nextKey = nextEntry.getKey();
0225
0226 if (!startsWith(nextKey, indexKeyPrefix)) {
0227 return null;
0228 }
0229
0230
0231 if (isEndMarker(nextKey)) {
0232 continue;
0233 }
0234
0235
0236 if (end != null) {
0237 int comp = compare(nextKey, end) * (ascending ? 1 : -1);
0238 if (comp > 0) {
0239 return null;
0240 }
0241 }
0242
0243 count++;
0244
0245
0246 return nextEntry.getValue();
0247 }
0248 }
0249
0250 @VisibleForTesting
0251 static boolean startsWith(byte[] key, byte[] prefix) {
0252 if (key.length < prefix.length) {
0253 return false;
0254 }
0255
0256 for (int i = 0; i < prefix.length; i++) {
0257 if (key[i] != prefix[i]) {
0258 return false;
0259 }
0260 }
0261
0262 return true;
0263 }
0264
0265 private boolean isEndMarker(byte[] key) {
0266 return (key.length > 2 &&
0267 key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
0268 key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
0269 }
0270
0271 static int compare(byte[] a, byte[] b) {
0272 int diff = 0;
0273 int minLen = Math.min(a.length, b.length);
0274 for (int i = 0; i < minLen; i++) {
0275 diff += (a[i] - b[i]);
0276 if (diff != 0) {
0277 return diff;
0278 }
0279 }
0280
0281 return a.length - b.length;
0282 }
0283
0284 }