0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022 import java.util.List;
0023
0024 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0025
0026
0027
0028
0029
0030
0031 public abstract class AbstractLauncher<T extends AbstractLauncher<T>> {
0032
0033 final SparkSubmitCommandBuilder builder;
0034
0035 AbstractLauncher() {
0036 this.builder = new SparkSubmitCommandBuilder();
0037 }
0038
0039
0040
0041
0042
0043
0044
0045 public T setPropertiesFile(String path) {
0046 checkNotNull(path, "path");
0047 builder.setPropertiesFile(path);
0048 return self();
0049 }
0050
0051
0052
0053
0054
0055
0056
0057
0058 public T setConf(String key, String value) {
0059 checkNotNull(key, "key");
0060 checkNotNull(value, "value");
0061 checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
0062 builder.conf.put(key, value);
0063 return self();
0064 }
0065
0066
0067
0068
0069
0070
0071
0072 public T setAppName(String appName) {
0073 checkNotNull(appName, "appName");
0074 builder.appName = appName;
0075 return self();
0076 }
0077
0078
0079
0080
0081
0082
0083
0084 public T setMaster(String master) {
0085 checkNotNull(master, "master");
0086 builder.master = master;
0087 return self();
0088 }
0089
0090
0091
0092
0093
0094
0095
0096 public T setDeployMode(String mode) {
0097 checkNotNull(mode, "mode");
0098 builder.deployMode = mode;
0099 return self();
0100 }
0101
0102
0103
0104
0105
0106
0107
0108
0109 public T setAppResource(String resource) {
0110 checkNotNull(resource, "resource");
0111 builder.appResource = resource;
0112 return self();
0113 }
0114
0115
0116
0117
0118
0119
0120
0121 public T setMainClass(String mainClass) {
0122 checkNotNull(mainClass, "mainClass");
0123 builder.mainClass = mainClass;
0124 return self();
0125 }
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139 public T addSparkArg(String arg) {
0140 SparkSubmitOptionParser validator = new ArgumentValidator(false);
0141 validator.parse(Arrays.asList(arg));
0142 builder.userArgs.add(arg);
0143 return self();
0144 }
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160
0161
0162 public T addSparkArg(String name, String value) {
0163 SparkSubmitOptionParser validator = new ArgumentValidator(true);
0164 if (validator.MASTER.equals(name)) {
0165 setMaster(value);
0166 } else if (validator.PROPERTIES_FILE.equals(name)) {
0167 setPropertiesFile(value);
0168 } else if (validator.CONF.equals(name)) {
0169 String[] vals = value.split("=", 2);
0170 setConf(vals[0], vals[1]);
0171 } else if (validator.CLASS.equals(name)) {
0172 setMainClass(value);
0173 } else if (validator.JARS.equals(name)) {
0174 builder.jars.clear();
0175 for (String jar : value.split(",")) {
0176 addJar(jar);
0177 }
0178 } else if (validator.FILES.equals(name)) {
0179 builder.files.clear();
0180 for (String file : value.split(",")) {
0181 addFile(file);
0182 }
0183 } else if (validator.PY_FILES.equals(name)) {
0184 builder.pyFiles.clear();
0185 for (String file : value.split(",")) {
0186 addPyFile(file);
0187 }
0188 } else {
0189 validator.parse(Arrays.asList(name, value));
0190 builder.userArgs.add(name);
0191 builder.userArgs.add(value);
0192 }
0193 return self();
0194 }
0195
0196
0197
0198
0199
0200
0201
0202 public T addAppArgs(String... args) {
0203 for (String arg : args) {
0204 checkNotNull(arg, "arg");
0205 builder.appArgs.add(arg);
0206 }
0207 return self();
0208 }
0209
0210
0211
0212
0213
0214
0215
0216 public T addJar(String jar) {
0217 checkNotNull(jar, "jar");
0218 builder.jars.add(jar);
0219 return self();
0220 }
0221
0222
0223
0224
0225
0226
0227
0228 public T addFile(String file) {
0229 checkNotNull(file, "file");
0230 builder.files.add(file);
0231 return self();
0232 }
0233
0234
0235
0236
0237
0238
0239
0240 public T addPyFile(String file) {
0241 checkNotNull(file, "file");
0242 builder.pyFiles.add(file);
0243 return self();
0244 }
0245
0246
0247
0248
0249
0250
0251
0252 public T setVerbose(boolean verbose) {
0253 builder.verbose = verbose;
0254 return self();
0255 }
0256
0257
0258
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274 public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
0275 throws IOException;
0276
0277 abstract T self();
0278
0279 private static class ArgumentValidator extends SparkSubmitOptionParser {
0280
0281 private final boolean hasValue;
0282
0283 ArgumentValidator(boolean hasValue) {
0284 this.hasValue = hasValue;
0285 }
0286
0287 @Override
0288 protected boolean handle(String opt, String value) {
0289 if (value == null && hasValue) {
0290 throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
0291 }
0292 return true;
0293 }
0294
0295 @Override
0296 protected boolean handleUnknown(String opt) {
0297
0298 return true;
0299 }
0300
0301 protected void handleExtraArgs(List<String> extra) {
0302
0303 }
0304
0305 }
0306
0307 }