0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.BufferedReader;
0021 import java.io.File;
0022 import java.io.FileInputStream;
0023 import java.io.InputStreamReader;
0024 import java.io.IOException;
0025 import java.nio.charset.StandardCharsets;
0026 import java.util.ArrayList;
0027 import java.util.Arrays;
0028 import java.util.HashMap;
0029 import java.util.LinkedHashSet;
0030 import java.util.List;
0031 import java.util.Map;
0032 import java.util.Properties;
0033 import java.util.Set;
0034 import java.util.regex.Pattern;
0035
0036 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0037
0038
0039
0040
/**
 * Abstract Spark command builder that defines common functionality used by concrete builders
 * (e.g. builders for spark-submit / spark-class style commands). It collects application
 * configuration (app resource, main class, args, jars, files, conf entries) and knows how to
 * assemble the java command line and classpath from the Spark installation layout and the
 * environment.
 */
abstract class AbstractCommandBuilder {

  boolean verbose;
  String appName;
  String appResource;
  String deployMode;
  String javaHome;
  String mainClass;
  String master;
  protected String propertiesFile;
  final List<String> appArgs;
  final List<String> jars;
  final List<String> files;
  final List<String> pyFiles;
  // Extra environment entries; these take precedence over the process environment in getenv().
  final Map<String, String> childEnv;
  // Explicit Spark configuration; takes precedence over the properties file in
  // getEffectiveConfig().
  final Map<String, String> conf;

  // Lazily-built merged view of `conf` plus the loaded properties file. Invalidated by
  // setPropertiesFile(). NOTE(review): mutations to `conf` after the first
  // getEffectiveConfig() call are not reflected until the cache is invalidated.
  private Map<String, String> effectiveConfig;

  AbstractCommandBuilder() {
    this.appArgs = new ArrayList<>();
    this.childEnv = new HashMap<>();
    this.conf = new HashMap<>();
    this.files = new ArrayList<>();
    this.jars = new ArrayList<>();
    this.pyFiles = new ArrayList<>();
  }

  /**
   * Builds the command to execute.
   *
   * @param env A map containing environment variables for the child process. It may already
   *            contain entries defined by the user (such as SPARK_HOME, or extra classpath
   *            entries), and may be modified by the implementation.
   * @return The full command line for the child process.
   * @throws IOException If the Spark installation or configuration cannot be read.
   * @throws IllegalArgumentException If the builder's state is invalid for building a command.
   */
  abstract List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException;

  /**
   * Builds a list of arguments to run java: the java executable, any options read from the
   * conf directory's "java-opts" file, and the "-cp" classpath argument.
   *
   * @param extraClassPath Extra entries to prepend to the classpath (may be null or empty).
   * @return Mutable list of command-line tokens; callers may append further options.
   * @throws IOException If the "java-opts" file exists but cannot be read.
   */
  List<String> buildJavaCommand(String extraClassPath) throws IOException {
    List<String> cmd = new ArrayList<>();

    // Pick the first available JAVA_HOME, in order of decreasing precedence: the builder's
    // explicit setting, the child environment, the process environment, and finally the
    // JVM's own "java.home" (always set, so the loop always finds a candidate).
    String[] candidateJavaHomes = new String[] {
      javaHome,
      childEnv.get("JAVA_HOME"),
      System.getenv("JAVA_HOME"),
      System.getProperty("java.home")
    };
    for (String javaHome : candidateJavaHomes) {
      if (javaHome != null) {
        cmd.add(join(File.separator, javaHome, "bin", "java"));
        break;
      }
    }

    // Load extra JAVA_OPTS from the conf dir's "java-opts" file, when it exists. Each line is
    // parsed into individual options via parseOptionString().
    File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
    if (javaOpts.isFile()) {
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          addOptionString(cmd, line);
        }
      }
    }

    cmd.add("-cp");
    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
    return cmd;
  }

  /**
   * Parses {@code options} into individual command-line tokens (via parseOptionString) and
   * appends them to {@code cmd}. A null/empty string is a no-op.
   */
  void addOptionString(List<String> cmd, String options) {
    if (!isEmpty(options)) {
      for (String opt : parseOptionString(options)) {
        cmd.add(opt);
      }
    }
  }

  /**
   * Builds the classpath for the application. Entries are added, in order: the given
   * application classpath, the Spark conf dir, locally compiled Spark classes (when
   * SPARK_PREPEND_CLASSES or SPARK_TESTING is set), the Spark jars directory, and the
   * Hadoop/YARN conf dirs plus SPARK_DIST_CLASSPATH. A LinkedHashSet preserves that order
   * while removing duplicates.
   *
   * @param appClassPath Extra entries for the application (may be null or empty).
   * @return The classpath entries, in precedence order.
   * @throws IOException If the installation layout cannot be resolved.
   */
  List<String> buildClassPath(String appClassPath) throws IOException {
    String sparkHome = getSparkHome();

    Set<String> cp = new LinkedHashSet<>();
    addToClassPath(cp, appClassPath);

    addToClassPath(cp, getConfDir());

    boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    if (prependClasses || isTesting) {
      String scala = getScalaVersion();
      // Build-tree module directories whose compiled classes are placed ahead of the
      // assembly jars (development / testing convenience).
      List<String> projects = Arrays.asList(
        "common/kvstore",
        "common/network-common",
        "common/network-shuffle",
        "common/network-yarn",
        "common/sketch",
        "common/tags",
        "common/unsafe",
        "core",
        "examples",
        "graphx",
        "launcher",
        "mllib",
        "repl",
        "resource-managers/mesos",
        "resource-managers/yarn",
        "sql/catalyst",
        "sql/core",
        "sql/hive",
        "sql/hive-thriftserver",
        "streaming"
      );
      if (prependClasses) {
        if (!isTesting) {
          System.err.println(
            "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
            "assembly.");
        }
        for (String project : projects) {
          addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
            scala));
        }
      }
      if (isTesting) {
        for (String project : projects) {
          addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
            project, scala));
        }
      }

      // NOTE(review): these wildcard entries pick up jars placed under the core and mllib
      // build trees — presumably dependencies needed when running from a build tree rather
      // than a packaged distribution; confirm against the build layout.
      addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
      addToClassPath(cp, String.format("%s/mllib/target/jars/*", sparkHome));
    }

    // Add the Spark jars directory as a wildcard entry. findJarsDir is told to fail only when
    // neither SPARK_TESTING nor SPARK_SQL_TESTING is set, so tests can run without a packaged
    // jars directory.
    boolean isTestingSql = "1".equals(getenv("SPARK_SQL_TESTING"));
    String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting && !isTestingSql);
    if (jarsDir != null) {
      addToClassPath(cp, join(File.separator, jarsDir, "*"));
    }

    addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
    addToClassPath(cp, getenv("YARN_CONF_DIR"));
    addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
    return new ArrayList<>(cp);
  }

  /**
   * Adds entries to the classpath.
   *
   * @param cp List to which the new entries are appended.
   * @param entries New classpath entries (separated by the platform path separator).
   *                Null/empty input is a no-op.
   */
  private void addToClassPath(Set<String> cp, String entries) {
    if (isEmpty(entries)) {
      return;
    }
    String[] split = entries.split(Pattern.quote(File.pathSeparator));
    for (String entry : split) {
      if (!isEmpty(entry)) {
        // Append a trailing separator to directories — presumably so downstream consumers can
        // distinguish directory entries from jar entries; TODO(review) confirm the consumer.
        if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
          entry += File.separator;
        }
        cp.add(entry);
      }
    }
  }

  /**
   * Returns the Scala version used by this Spark build: SPARK_SCALA_VERSION when set in the
   * environment, otherwise "2.12" after verifying the scala-2.12 build directory exists.
   * Fails (via checkState) when neither is available.
   */
  String getScalaVersion() {
    String scala = getenv("SPARK_SCALA_VERSION");
    if (scala != null) {
      return scala;
    }
    String sparkHome = getSparkHome();
    // Only a 2.12 build directory is probed here; other Scala versions must be selected
    // explicitly through SPARK_SCALA_VERSION.
    File scala212 = new File(sparkHome, "launcher/target/scala-2.12");
    checkState(scala212.isDirectory(), "Cannot find any build directories.");
    return "2.12";
  }

  /**
   * Returns the Spark home directory from SPARK_HOME (falling back to the "spark.test.home"
   * system property when SPARK_TESTING=1). Fails via checkState when neither is set.
   */
  String getSparkHome() {
    String path = getenv(ENV_SPARK_HOME);
    if (path == null && "1".equals(getenv("SPARK_TESTING"))) {
      path = System.getProperty("spark.test.home");
    }
    checkState(path != null,
      "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
    return path;
  }

  /**
   * Looks up {@code key} in the child environment first, then the process environment;
   * returns the first non-empty value (or null).
   */
  String getenv(String key) {
    return firstNonEmpty(childEnv.get(key), System.getenv(key));
  }

  /** Sets the path to the properties file and invalidates the cached effective config. */
  void setPropertiesFile(String path) {
    effectiveConfig = null;
    this.propertiesFile = path;
  }

  /**
   * Returns the merged configuration: explicit {@code conf} entries, supplemented (but never
   * overridden) by entries from the properties file. The result is cached until
   * setPropertiesFile() is called.
   *
   * @throws IOException If the properties file cannot be read.
   */
  Map<String, String> getEffectiveConfig() throws IOException {
    if (effectiveConfig == null) {
      effectiveConfig = new HashMap<>(conf);
      Properties p = loadPropertiesFile();
      for (String key : p.stringPropertyNames()) {
        if (!effectiveConfig.containsKey(key)) {
          effectiveConfig.put(key, p.getProperty(key));
        }
      }
    }
    return effectiveConfig;
  }

  /**
   * Loads the configuration file for the application, if it exists. When the user has
   * explicitly set a properties file it must exist (checkArgument); otherwise the default
   * properties file in the conf dir is loaded when present, and an empty Properties object is
   * returned when it is not.
   *
   * @throws IOException If the file exists but cannot be read.
   */
  private Properties loadPropertiesFile() throws IOException {
    Properties props = new Properties();
    File propsFile;
    if (propertiesFile != null) {
      propsFile = new File(propertiesFile);
      checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
    } else {
      propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
    }

    if (propsFile.isFile()) {
      // Properties files are read as UTF-8 (Properties.load(Reader) preserves non-Latin-1
      // characters), and values are trimmed of surrounding whitespace.
      try (InputStreamReader isr = new InputStreamReader(
          new FileInputStream(propsFile), StandardCharsets.UTF_8)) {
        props.load(isr);
        for (Map.Entry<Object, Object> e : props.entrySet()) {
          e.setValue(e.getValue().toString().trim());
        }
      }
    }
    return props;
  }

  /** Returns the Spark conf directory: SPARK_CONF_DIR when set, else {@code $SPARK_HOME/conf}. */
  private String getConfDir() {
    String confDir = getenv("SPARK_CONF_DIR");
    return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
  }

}