# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import re
import sys
import traceback
import os
import warnings
import inspect
from py4j.protocol import Py4JJavaError

__all__ = []


def _exception_message(excp):
    """Return the message from an exception as either a str or unicode object. Supports both
    Python 2 and Python 3.

    >>> msg = "Exception message"
    >>> excp = Exception(msg)
    >>> msg == _exception_message(excp)
    True

    >>> msg = u"unicöde"
    >>> excp = Exception(msg)
    >>> msg == _exception_message(excp)
    True
    """
    if isinstance(excp, Py4JJavaError):
        # 'Py4JJavaError' keeps the Java-side error, including its stack trace, in its
        # string representation rather than in a 'message' attribute, so call
        # __str__() here to preserve the full Java error details.
        return excp.__str__()
    if hasattr(excp, "message"):
        return excp.message
    return str(excp)
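
# A hedged sketch of the Py4JJavaError branch above (illustrative only; it needs a
# live JVM behind an active SparkSession, so it is not written as a doctest):
#
#   try:
#       spark._jvm.java.lang.Integer.parseInt("not a number")
#   except Py4JJavaError as e:
#       # The returned message includes the Java-side stack trace.
#       print(_exception_message(e))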


def _get_argspec(f):
    """
    Get argspec of a function. Supports both Python 2 and Python 3.
    """
    if sys.version_info[0] < 3:
        argspec = inspect.getargspec(f)
    else:
        # `getargspec` is deprecated since Python 3.0 and cannot describe keyword-only
        # arguments or annotations, so use `getfullargspec` on Python 3.
        argspec = inspect.getfullargspec(f)
    return argspec
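
# A minimal usage sketch for _get_argspec (the names below are illustrative): the
# `.args` attribute exists on both Python 2's ArgSpec and Python 3's FullArgSpec,
# so callers can rely on it under either interpreter.
#
#   def add(a, b, c=0):
#       return a + b + c
#   assert _get_argspec(add).args == ['a', 'b', 'c']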


def print_exec(stream):
    """Print the exception currently being handled, with its traceback, to the given stream."""
    ei = sys.exc_info()
    traceback.print_exception(ei[0], ei[1], ei[2], None, stream)
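
# A small usage sketch (Python 3; `io.StringIO` is just an illustrative sink, and
# any writable stream such as sys.stderr works too):
#
#   import io
#   buf = io.StringIO()
#   try:
#       1 / 0
#   except ZeroDivisionError:
#       print_exec(buf)  # writes the full traceback into buf
#   assert "ZeroDivisionError" in buf.getvalue()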


class VersionUtils(object):
    """
    Provides utility methods for determining Spark versions from a given input string.
    """
    @staticmethod
    def majorMinorVersion(sparkVersion):
        """
        Given a Spark version string, return the (major version number, minor version number).
        E.g., for 2.0.1-SNAPSHOT, return (2, 0).

        >>> sparkVersion = "2.4.0"
        >>> VersionUtils.majorMinorVersion(sparkVersion)
        (2, 4)
        >>> sparkVersion = "2.3.0-SNAPSHOT"
        >>> VersionUtils.majorMinorVersion(sparkVersion)
        (2, 3)
        """
        m = re.search(r'^(\d+)\.(\d+)(\..*)?$', sparkVersion)
        if m is not None:
            return (int(m.group(1)), int(m.group(2)))
        else:
            raise ValueError("Spark tried to parse '%s' as a Spark" % sparkVersion +
                             " version string, but it could not find the major and minor"
                             " version numbers.")


def fail_on_stopiteration(f):
    """
    Wraps the input function to fail on 'StopIteration' by raising a 'RuntimeError';
    this prevents the silent loss of data when 'f' is used in a for loop in Spark code.
    """
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task",
                exc
            )

    return wrapper
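
# Why this wrapper matters, as a sketch (illustrative only): a StopIteration that
# escapes from user code is interpreted by any surrounding for loop or generator
# as normal exhaustion, silently truncating the data. Wrapping turns it into a
# loud task failure instead.
#
#   def buggy(x):
#       raise StopIteration("oops")
#
#   safe = fail_on_stopiteration(buggy)
#   safe(1)  # raises RuntimeError instead of silently ending iteration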


def _warn_pin_thread(name):
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        msg = (
            "The PYSPARK_PIN_THREAD feature is enabled. "
            "However, note that it cannot inherit the local properties from the parent thread "
            "although it isolates each thread on PVM and JVM with its own local properties. "
            "\n"
            "To work around this, you should manually copy and set the local properties from "
            "the parent thread to the child thread when you create another thread.")
    else:
        msg = (
            "Currently, '%s' (set to local properties) does not work properly with "
            "multiple threads. "
            "\n"
            "Internally, threads on the PVM and JVM are not synced, and a JVM thread can be "
            "reused for multiple threads on the PVM, which fails to isolate local properties "
            "for each thread on the PVM. "
            "\n"
            "To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
            "However, note that it cannot inherit the local properties from the parent thread "
            "although it isolates each thread on PVM and JVM with its own local properties. "
            "\n"
            "To work around this, you should manually copy and set the local properties from "
            "the parent thread to the child thread when you create another thread." % name)
    warnings.warn(msg, UserWarning)
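
# A hedged sketch of the workaround named in the warning (assumes a live
# SparkContext `sc`; "spark.scheduler.pool" is just an example property): copy
# the parent thread's local property into the child thread by hand.
#
#   import threading
#
#   pool = sc.getLocalProperty("spark.scheduler.pool")
#
#   def child():
#       sc.setLocalProperty("spark.scheduler.pool", pool)
#       ...  # run jobs in the child thread with the inherited property
#
#   threading.Thread(target=child).start()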


def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
    print("""
________________________________________________________________________________________________

  Spark %(lib_name)s libraries not found in class path. Try one of the following.

  1. Include the %(lib_name)s library and its dependencies in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...

________________________________________________________________________________________________

""" % {
        "lib_name": lib_name,
        "pkg_name": pkg_name,
        "jar_name": jar_name,
        "spark_version": spark_version
    })
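
# Illustrative call (the argument values are hypothetical, modeled on how a Kafka
# integration might report a missing assembly jar):
#
#   _print_missing_jar("Streaming's Kafka", "streaming-kafka-0-8",
#                      "streaming-kafka-0-8-assembly", "2.4.0")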


if __name__ == "__main__":
    import doctest
    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        sys.exit(-1)