0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.nio.charset.StandardCharsets;
0023
0024 import com.fasterxml.jackson.annotation.JsonCreator;
0025 import com.fasterxml.jackson.annotation.JsonProperty;
0026 import com.fasterxml.jackson.databind.ObjectMapper;
0027 import org.fusesource.leveldbjni.JniDBFactory;
0028 import org.fusesource.leveldbjni.internal.NativeDB;
0029 import org.iq80.leveldb.DB;
0030 import org.iq80.leveldb.Options;
0031 import org.slf4j.Logger;
0032 import org.slf4j.LoggerFactory;
0033
0034
0035
0036
0037 public class LevelDBProvider {
0038 private static final Logger logger = LoggerFactory.getLogger(LevelDBProvider.class);
0039
0040 public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
0041 IOException {
0042 DB tmpDb = null;
0043 if (dbFile != null) {
0044 Options options = new Options();
0045 options.createIfMissing(false);
0046 options.logger(new LevelDBLogger());
0047 try {
0048 tmpDb = JniDBFactory.factory.open(dbFile, options);
0049 } catch (NativeDB.DBException e) {
0050 if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
0051 logger.info("Creating state database at " + dbFile);
0052 options.createIfMissing(true);
0053 try {
0054 tmpDb = JniDBFactory.factory.open(dbFile, options);
0055 } catch (NativeDB.DBException dbExc) {
0056 throw new IOException("Unable to create state store", dbExc);
0057 }
0058 } else {
0059
0060
0061 logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
0062 "recover state for existing applications", dbFile, e);
0063 if (dbFile.isDirectory()) {
0064 for (File f : dbFile.listFiles()) {
0065 if (!f.delete()) {
0066 logger.warn("error deleting {}", f.getPath());
0067 }
0068 }
0069 }
0070 if (!dbFile.delete()) {
0071 logger.warn("error deleting {}", dbFile.getPath());
0072 }
0073 options.createIfMissing(true);
0074 try {
0075 tmpDb = JniDBFactory.factory.open(dbFile, options);
0076 } catch (NativeDB.DBException dbExc) {
0077 throw new IOException("Unable to create state store", dbExc);
0078 }
0079
0080 }
0081 }
0082
0083 checkVersion(tmpDb, version, mapper);
0084 }
0085 return tmpDb;
0086 }
0087
0088 private static class LevelDBLogger implements org.iq80.leveldb.Logger {
0089 private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
0090
0091 @Override
0092 public void log(String message) {
0093 LOG.info(message);
0094 }
0095 }
0096
0097
0098
0099
0100
0101
0102 public static void checkVersion(DB db, StoreVersion newversion, ObjectMapper mapper) throws
0103 IOException {
0104 byte[] bytes = db.get(StoreVersion.KEY);
0105 if (bytes == null) {
0106 storeVersion(db, newversion, mapper);
0107 } else {
0108 StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
0109 if (version.major != newversion.major) {
0110 throw new IOException("cannot read state DB with version " + version + ", incompatible " +
0111 "with current version " + newversion);
0112 }
0113 storeVersion(db, newversion, mapper);
0114 }
0115 }
0116
0117 public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper)
0118 throws IOException {
0119 db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version));
0120 }
0121
0122 public static class StoreVersion {
0123
0124 static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
0125
0126 public final int major;
0127 public final int minor;
0128
0129 @JsonCreator
0130 public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
0131 this.major = major;
0132 this.minor = minor;
0133 }
0134
0135 @Override
0136 public boolean equals(Object o) {
0137 if (this == o) return true;
0138 if (o == null || getClass() != o.getClass()) return false;
0139
0140 StoreVersion that = (StoreVersion) o;
0141
0142 return major == that.major && minor == that.minor;
0143 }
0144
0145 @Override
0146 public int hashCode() {
0147 int result = major;
0148 result = 31 * result + minor;
0149 return result;
0150 }
0151 }
0152 }