# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import re
import sys
import traceback
import os
import warnings
import inspect
from py4j.protocol import Py4JJavaError

__all__ = []


def _exception_message(excp):
    """Return the message from an exception as either a str or unicode object.  Supports both
    Python 2 and Python 3.

    >>> msg = "Exception message"
    >>> excp = Exception(msg)
    >>> msg == _exception_message(excp)
    True

    >>> msg = u"unicöde"
    >>> excp = Exception(msg)
    >>> msg == _exception_message(excp)
    True
    """
    if isinstance(excp, Py4JJavaError):
        # 'Py4JJavaError' doesn't contain the stack trace available on the Java side in 'message'
        # attribute in Python 2. We should call 'str' function on this exception in general but
        # 'Py4JJavaError' has an issue about addressing non-ascii strings. So, here we work
        # around by the direct call, '__str__()'. Please see SPARK-23517.
        return excp.__str__()
    if hasattr(excp, "message"):
        return excp.message
    return str(excp)


def _get_argspec(f):
    """
    Get the argspec of a function. Supports both Python 2 and Python 3.
    """
    if sys.version_info[0] < 3:
        argspec = inspect.getargspec(f)
    else:
        # `getargspec` is deprecated since Python 3.0 (it is incompatible with
        # function annotations). See SPARK-23569.
        argspec = inspect.getfullargspec(f)
    return argspec


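# A minimal usage sketch for `_get_argspec` (the function `add` here is a
# hypothetical stand-in, not part of this module). On Python 3 the result is an
# `inspect.FullArgSpec`, so the positional parameter names are under `.args`:
#
#   >>> def add(a, b=1):
#   ...     return a + b
#   >>> _get_argspec(add).args
#   ['a', 'b']

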
def print_exec(stream):
    """Print the exception currently being handled, with its traceback, to the given stream."""
    ei = sys.exc_info()
    traceback.print_exception(ei[0], ei[1], ei[2], None, stream)


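# A minimal usage sketch for `print_exec`: call it from an `except` block, where
# `sys.exc_info()` refers to the active exception (here dumping to stderr):
#
#   >>> try:
#   ...     raise ValueError("boom")
#   ... except ValueError:
#   ...     print_exec(sys.stderr)

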
class VersionUtils(object):
    """
    Provides utility methods for determining Spark versions from a given input string.
    """
    @staticmethod
    def majorMinorVersion(sparkVersion):
        """
        Given a Spark version string, return the (major version number, minor version number).
        E.g., for 2.0.1-SNAPSHOT, return (2, 0).

        >>> sparkVersion = "2.4.0"
        >>> VersionUtils.majorMinorVersion(sparkVersion)
        (2, 4)
        >>> sparkVersion = "2.3.0-SNAPSHOT"
        >>> VersionUtils.majorMinorVersion(sparkVersion)
        (2, 3)
        """
        m = re.search(r'^(\d+)\.(\d+)(\..*)?$', sparkVersion)
        if m is not None:
            return (int(m.group(1)), int(m.group(2)))
        else:
            raise ValueError("Spark tried to parse '%s' as a Spark" % sparkVersion +
                             " version string, but it could not find the major and minor" +
                             " version numbers.")


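# A sketch of the failure path: a string without numeric major and minor
# components does not match the regex above, so `majorMinorVersion` raises
# the ValueError rather than returning a tuple:
#
#   >>> VersionUtils.majorMinorVersion("x.y.z")
#   Traceback (most recent call last):
#       ...
#   ValueError: Spark tried to parse 'x.y.z' as a Spark version string, ...

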
def fail_on_stopiteration(f):
    """
    Wraps the input function so that a 'StopIteration' escaping it is re-raised
    as a 'RuntimeError'; this prevents silent loss of data when 'f' is used in
    a for loop in Spark code.
    """
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task",
                exc
            )

    return wrapper


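# A minimal usage sketch for `fail_on_stopiteration` (the function `udf` below
# is hypothetical, not part of this module). Without the wrapper, a
# StopIteration escaping `udf` would silently terminate any surrounding loop
# consuming it; with the wrapper, the task fails loudly instead:
#
#   >>> def udf(x):
#   ...     raise StopIteration()
#   >>> safe_udf = fail_on_stopiteration(udf)
#   >>> safe_udf(1)
#   Traceback (most recent call last):
#       ...
#   RuntimeError: ("Caught StopIteration thrown from user's code; failing the task", StopIteration())

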
def _warn_pin_thread(name):
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        msg = (
            "PYSPARK_PIN_THREAD feature is enabled. "
            "However, note that it cannot inherit the local properties from the parent thread "
            "although it isolates each thread on PVM and JVM with its own local properties. "
            "\n"
            "To work around this, you should manually copy and set the local properties from "
            "the parent thread to the child thread when you create another thread.")
    else:
        msg = (
            "Currently, '%s' (set to local properties) with multiple threads does "
            "not properly work. "
            "\n"
            "Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
            "for multiple threads on PVM, which fails to isolate local properties for each "
            "thread on PVM. "
            "\n"
            "To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
            "However, note that it cannot inherit the local properties from the parent thread "
            "although it isolates each thread on PVM and JVM with its own local properties. "
            "\n"
            "To work around this, you should manually copy and set the local properties from "
            "the parent thread to the child thread when you create another thread." % name)
    warnings.warn(msg, UserWarning)


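# A minimal usage sketch for `_warn_pin_thread`: the PYSPARK_PIN_THREAD
# environment variable selects which of the two warnings above is emitted
# (the API name passed in is illustrative):
#
#   >>> os.environ["PYSPARK_PIN_THREAD"] = "true"
#   >>> _warn_pin_thread("setLocalProperty")  # warns about property inheritance

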
def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
    print("""
________________________________________________________________________________________________

  Spark %(lib_name)s libraries not found in class path. Try one of the following.

  1. Include the %(lib_name)s library and its dependencies in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...

________________________________________________________________________________________________

""" % {
        "lib_name": lib_name,
        "pkg_name": pkg_name,
        "jar_name": jar_name,
        "spark_version": spark_version
    })


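# A minimal usage sketch for `_print_missing_jar` (the argument values are
# illustrative): callers pass the human-readable library name, the package
# suffix used with --packages, the artifact suffix used with --jars, and the
# Spark version, which are interpolated into the message above.
#
#   >>> _print_missing_jar(
#   ...     "Streaming Kafka", "streaming-kafka-0-8",
#   ...     "streaming-kafka-0-8-assembly", "2.4.0")

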
if __name__ == "__main__":
    import doctest
    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        sys.exit(-1)