0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 >>> from pyspark.conf import SparkConf
0020 >>> from pyspark.context import SparkContext
0021 >>> conf = SparkConf()
0022 >>> conf.setMaster("local").setAppName("My app")
0023 <pyspark.conf.SparkConf object at ...>
0024 >>> conf.get("spark.master")
0025 u'local'
0026 >>> conf.get("spark.app.name")
0027 u'My app'
0028 >>> sc = SparkContext(conf=conf)
0029 >>> sc.master
0030 u'local'
0031 >>> sc.appName
0032 u'My app'
0033 >>> sc.sparkHome is None
0034 True
0035
0036 >>> conf = SparkConf(loadDefaults=False)
0037 >>> conf.setSparkHome("/path")
0038 <pyspark.conf.SparkConf object at ...>
0039 >>> conf.get("spark.home")
0040 u'/path'
0041 >>> conf.setExecutorEnv("VAR1", "value1")
0042 <pyspark.conf.SparkConf object at ...>
0043 >>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
0044 <pyspark.conf.SparkConf object at ...>
0045 >>> conf.get("spark.executorEnv.VAR1")
0046 u'value1'
0047 >>> print(conf.toDebugString())
0048 spark.executorEnv.VAR1=value1
0049 spark.executorEnv.VAR3=value3
0050 spark.executorEnv.VAR4=value4
0051 spark.home=/path
0052 >>> sorted(conf.getAll(), key=lambda p: p[0])
0053 [(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
0054 (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
0055 >>> conf._jconf.setExecutorEnv("VAR5", "value5")
0056 JavaObject id...
0057 >>> print(conf.toDebugString())
0058 spark.executorEnv.VAR1=value1
0059 spark.executorEnv.VAR3=value3
0060 spark.executorEnv.VAR4=value4
0061 spark.executorEnv.VAR5=value5
0062 spark.home=/path
0063 """
0064
0065 __all__ = ['SparkConf']
0066
0067 import sys
0068 import re
0069
0070 if sys.version > '3':
0071 unicode = str
0072 __doc__ = re.sub(r"(\W|^)[uU](['])", r'\1\2', __doc__)
0073
0074
class SparkConf(object):

    """
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(false)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    .. note:: Once a SparkConf object is passed to Spark, it is cloned
        and can no longer be modified by the user.
    """

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """
        Create a new Spark configuration.

        :param loadDefaults: whether to load values from Java system
               properties (True by default)
        :param _jvm: internal parameter used to pass a handle to the
               Java VM; does not need to be set by users
        :param _jconf: Optionally pass in an existing SparkConf handle
               to use its parameters
        """
        if _jconf:
            # Wrap an existing JVM-side SparkConf handle directly.
            self._jconf = _jconf
        else:
            from pyspark.context import SparkContext
            _jvm = _jvm or SparkContext._jvm

            if _jvm is not None:
                # JVM is already running: create the conf through the JVM
                # so it picks up `spark.*` system properties.
                self._jconf = _jvm.SparkConf(loadDefaults)
                self._conf = None
            else:
                # JVM not created yet: buffer key-value pairs locally in a
                # plain dict until a JVM-backed conf can take over.
                self._jconf = None
                self._conf = {}

    def set(self, key, value):
        """Set a configuration property.

        :param key: property name
        :param value: property value; converted to a string before storage
        :return: self, to support chaining
        """
        # Write to the JVM-side conf when it exists; otherwise stage the
        # value in the local dict.
        if self._jconf is not None:
            self._jconf.set(key, unicode(value))
        else:
            self._conf[key] = unicode(value)
        return self

    def setIfMissing(self, key, value):
        """Set a configuration property, if not already set.

        :return: self, to support chaining
        """
        if self.get(key) is None:
            self.set(key, value)
        return self

    def setMaster(self, value):
        """Set master URL to connect to.

        :return: self, to support chaining
        """
        self.set("spark.master", value)
        return self

    def setAppName(self, value):
        """Set application name.

        :return: self, to support chaining
        """
        self.set("spark.app.name", value)
        return self

    def setSparkHome(self, value):
        """Set path where Spark is installed on worker nodes.

        :return: self, to support chaining
        """
        self.set("spark.home", value)
        return self

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set an environment variable to be passed to executors.

        Either a single ``key``/``value`` pair or a list of ``pairs``
        must be given — exactly one of the two forms.

        :param key: name of a single environment variable
        :param value: value for ``key``
        :param pairs: list of (name, value) tuples to set together
        :return: self, to support chaining
        :raises ValueError: if both or neither of ``key`` and ``pairs``
                are supplied
        """
        if (key is not None and pairs is not None) or (key is None and pairs is None):
            # ValueError subclasses Exception, so existing callers that
            # caught the old generic Exception still work.
            raise ValueError("Either pass one key-value pair or a list of pairs")
        elif key is not None:
            self.set("spark.executorEnv." + key, value)
        elif pairs is not None:
            for (k, v) in pairs:
                self.set("spark.executorEnv." + k, v)
        return self

    def setAll(self, pairs):
        """
        Set multiple parameters, passed as a list of key-value pairs.

        :param pairs: list of key-value pairs to set
        :return: self, to support chaining
        """
        for (k, v) in pairs:
            self.set(k, v)
        return self

    def get(self, key, defaultValue=None):
        """Get the configured value for some key, or return a default otherwise."""
        if defaultValue is None:
            # py4j cannot pass None as the default, so probe for the key
            # explicitly before fetching it.
            if self._jconf is not None:
                if not self._jconf.contains(key):
                    return None
                return self._jconf.get(key)
            else:
                if key not in self._conf:
                    return None
                return self._conf[key]
        else:
            if self._jconf is not None:
                return self._jconf.get(key, defaultValue)
            else:
                return self._conf.get(key, defaultValue)

    def getAll(self):
        """Get all values as a list of key-value pairs."""
        if self._jconf is not None:
            # Scala returns Tuple2 objects; unpack them into Python tuples.
            return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]
        else:
            # dict.items() is a view on Python 3; materialize it so both
            # branches consistently return a list, as documented.
            return list(self._conf.items())

    def contains(self, key):
        """Does this configuration contain a given key?"""
        if self._jconf is not None:
            return self._jconf.contains(key)
        else:
            return key in self._conf

    def toDebugString(self):
        """
        Returns a printable version of the configuration, as a list of
        key=value pairs, one per line.
        """
        if self._jconf is not None:
            return self._jconf.toDebugString()
        else:
            return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
0214
0215
def _test():
    """Run this module's doctests, exiting with a nonzero status if any fail."""
    import doctest
    results = doctest.testmod(optionflags=doctest.ELLIPSIS)
    if results.failed:
        sys.exit(-1)


if __name__ == "__main__":
    _test()