#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
>>> from pyspark.conf import SparkConf
>>> from pyspark.context import SparkContext
>>> conf = SparkConf()
>>> conf.setMaster("local").setAppName("My app")
<pyspark.conf.SparkConf object at ...>
>>> conf.get("spark.master")
u'local'
>>> conf.get("spark.app.name")
u'My app'
>>> sc = SparkContext(conf=conf)
>>> sc.master
u'local'
>>> sc.appName
u'My app'
>>> sc.sparkHome is None
True

>>> conf = SparkConf(loadDefaults=False)
>>> conf.setSparkHome("/path")
<pyspark.conf.SparkConf object at ...>
>>> conf.get("spark.home")
u'/path'
>>> conf.setExecutorEnv("VAR1", "value1")
<pyspark.conf.SparkConf object at ...>
>>> conf.setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
<pyspark.conf.SparkConf object at ...>
>>> conf.get("spark.executorEnv.VAR1")
u'value1'
>>> print(conf.toDebugString())
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
spark.home=/path
>>> sorted(conf.getAll(), key=lambda p: p[0])
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
>>> conf._jconf.setExecutorEnv("VAR5", "value5")
JavaObject id...
>>> print(conf.toDebugString())
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
spark.executorEnv.VAR5=value5
spark.home=/path
"""

__all__ = ['SparkConf']

import sys
import re

if sys.version_info >= (3,):
    unicode = str
    # Rewrite the module docstring to drop the u'' prefixes from the
    # expected doctest output, since Python 3 string reprs carry no prefix.
    __doc__ = re.sub(r"(\W|^)[uU](['])", r'\1\2', __doc__)


class SparkConf(object):

    """
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(False)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    .. note:: Once a SparkConf object is passed to Spark, it is cloned
        and can no longer be modified by the user.
    """

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """
        Create a new Spark configuration.

        :param loadDefaults: whether to load values from Java system
               properties (True by default)
        :param _jvm: internal parameter used to pass a handle to the
               Java VM; does not need to be set by users
        :param _jconf: Optionally pass in an existing SparkConf handle
               to use its parameters
        """
        if _jconf:
            self._jconf = _jconf
        else:
            from pyspark.context import SparkContext
            _jvm = _jvm or SparkContext._jvm

            if _jvm is not None:
                # JVM is created, so create self._jconf directly through JVM
                self._jconf = _jvm.SparkConf(loadDefaults)
                self._conf = None
            else:
                # JVM is not created, so store data in self._conf first
                self._jconf = None
                self._conf = {}

    def set(self, key, value):
        """Set a configuration property.
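
        A minimal usage sketch; ``spark.answer`` is an arbitrary example
        key, and values are stored as strings:

        >>> conf = SparkConf(loadDefaults=False)
        >>> conf.set("spark.answer", 42)
        <pyspark.conf.SparkConf object at ...>
        >>> print(conf.get("spark.answer"))
        42
        """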
        # Set self._jconf directly if the JVM already exists; otherwise
        # buffer the value in self._conf until the JVM is created.
        if self._jconf is not None:
            self._jconf.set(key, unicode(value))
        else:
            self._conf[key] = unicode(value)
        return self

    def setIfMissing(self, key, value):
        """Set a configuration property, if not already set.
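
        A short sketch; ``spark.answer`` is an arbitrary example key.
        The second call is a no-op because the key is already set:

        >>> conf = SparkConf(loadDefaults=False)
        >>> conf.setIfMissing("spark.answer", "42").setIfMissing("spark.answer", "43")
        <pyspark.conf.SparkConf object at ...>
        >>> print(conf.get("spark.answer"))
        42
        """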
        if self.get(key) is None:
            self.set(key, value)
        return self

    def setMaster(self, value):
        """Set master URL to connect to."""
        self.set("spark.master", value)
        return self

    def setAppName(self, value):
        """Set application name."""
        self.set("spark.app.name", value)
        return self

    def setSparkHome(self, value):
        """Set path where Spark is installed on worker nodes."""
        self.set("spark.home", value)
        return self

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set an environment variable to be passed to executors.
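
        Pass either a single ``key``/``value`` pair or a ``pairs`` list,
        but not both; ``VAR1`` below is an arbitrary example name:

        >>> conf = SparkConf(loadDefaults=False)
        >>> print(conf.setExecutorEnv("VAR1", "v1").get("spark.executorEnv.VAR1"))
        v1
        >>> conf.setExecutorEnv("VAR2", "v2", pairs=[("VAR3", "v3")])
        Traceback (most recent call last):
            ...
        Exception: Either pass one key-value pair or a list of pairs
        """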
        if (key is not None and pairs is not None) or (key is None and pairs is None):
            raise Exception("Either pass one key-value pair or a list of pairs")
        elif key is not None:
            self.set("spark.executorEnv." + key, value)
        elif pairs is not None:
            for (k, v) in pairs:
                self.set("spark.executorEnv." + k, v)
        return self

    def setAll(self, pairs):
        """
        Set multiple parameters, passed as a list of key-value pairs.

        :param pairs: list of key-value pairs to set
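
        A short sketch; the keys are arbitrary examples, and ``sorted``
        keeps the expected output stable across dict orderings:

        >>> conf = SparkConf(loadDefaults=False)
        >>> conf.setAll([("spark.a", "1"), ("spark.b", "2")])
        <pyspark.conf.SparkConf object at ...>
        >>> for k, v in sorted(conf.getAll()):
        ...     print("%s=%s" % (k, v))
        spark.a=1
        spark.b=2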
        """
        for (k, v) in pairs:
            self.set(k, v)
        return self

    def get(self, key, defaultValue=None):
        """Get the configured value for some key, or return a default otherwise.
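
        A minimal sketch; ``spark.missing`` is an arbitrary unset key:

        >>> conf = SparkConf(loadDefaults=False)
        >>> conf.get("spark.missing") is None
        True
        >>> print(conf.get("spark.missing", "fallback"))
        fallback
        """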
        if defaultValue is None:   # Py4J doesn't call the right get() if we pass None
            if self._jconf is not None:
                if not self._jconf.contains(key):
                    return None
                return self._jconf.get(key)
            else:
                if key not in self._conf:
                    return None
                return self._conf[key]
        else:
            if self._jconf is not None:
                return self._jconf.get(key, defaultValue)
            else:
                return self._conf.get(key, defaultValue)

    def getAll(self):
        """Get all values as a list of key-value pairs."""
        if self._jconf is not None:
            return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]
        else:
            # list() so Python 3 callers get a list, not a dict view
            return list(self._conf.items())

    def contains(self, key):
        """Does this configuration contain a given key?
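
        A short sketch using the standard ``spark.app.name`` key:

        >>> conf = SparkConf(loadDefaults=False)
        >>> conf.contains("spark.app.name")
        False
        >>> conf.set("spark.app.name", "My app").contains("spark.app.name")
        True
        """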
        if self._jconf is not None:
            return self._jconf.contains(key)
        else:
            return key in self._conf

    def toDebugString(self):
        """
        Returns a printable version of the configuration, as a list of
        key=value pairs, one per line.
        """
        if self._jconf is not None:
            return self._jconf.toDebugString()
        else:
            return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())


def _test():
    import doctest
    (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()