0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import cProfile
0019 import pstats
0020 import os
0021 import atexit
0022 import sys
0023
0024 from pyspark.accumulators import AccumulatorParam
0025
0026
class ProfilerCollector(object):
    """
    This class keeps track of different profilers on a per
    stage basis. Also this is used to create new profilers for
    the different stages.

    When the first profiler is added, a single exit hook is registered:
    it dumps the profiles to `dump_path` when one was given, otherwise it
    prints them to stdout.
    """

    def __init__(self, profiler_cls, dump_path=None):
        self.profiler_cls = profiler_cls
        self.profile_dump_path = dump_path
        # Each entry is a mutable [rdd_id, profiler, already_shown] list.
        self.profilers = []
        # True once the exit-time show/dump hook has been registered.
        self._atexit_registered = False

    def new_profiler(self, ctx):
        """ Create a new profiler using class `profiler_cls` """
        return self.profiler_cls(ctx)

    def add_profiler(self, id, profiler):
        """ Add a profiler for RDD `id`.

        The exit hook is registered on the first call only. It cannot be
        keyed on `self.profilers` being empty, because `dump_profiles`
        empties that list, which would register a duplicate hook after
        every dump.
        """
        # getattr guards against instances whose __init__ was not run
        # (e.g. subclasses that skip the base constructor).
        if not getattr(self, "_atexit_registered", False):
            if self.profile_dump_path:
                atexit.register(self.dump_profiles, self.profile_dump_path)
            else:
                atexit.register(self.show_profiles)
            self._atexit_registered = True

        self.profilers.append([id, profiler, False])

    def dump_profiles(self, path):
        """ Dump the profile stats into directory `path`, then forget them. """
        for rdd_id, profiler, _ in self.profilers:
            # Skip empty entries, consistent with show_profiles.
            if profiler:
                profiler.dump(rdd_id, path)
        self.profilers = []

    def show_profiles(self):
        """ Print the profile stats to stdout, each profiler at most once. """
        for entry in self.profilers:
            rdd_id, profiler, shown = entry
            if not shown and profiler:
                profiler.show(rdd_id)
                # Mark it as shown so a later call (e.g. the exit hook)
                # does not print it again.
                entry[2] = True
0066
0067
class Profiler(object):
    """
    .. note:: DeveloperApi

    PySpark supports custom profilers, this is to allow for different profilers to
    be used as well as outputting to different formats than what is provided in the
    BasicProfiler.

    A custom profiler has to define or inherit the following methods:
        profile - will produce a system profile of some sort.
        stats - return the collected stats.
        show - prints the collected stats to stdout.
        dump - dumps the profiles to a path.

    The profiler class is chosen when creating a SparkContext

    >>> from pyspark import SparkConf, SparkContext
    >>> from pyspark import BasicProfiler
    >>> class MyCustomProfiler(BasicProfiler):
    ...     def show(self, id):
    ...         print("My custom profiles for RDD:%s" % id)
    ...
    >>> conf = SparkConf().set("spark.python.profile", "true")
    >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
    >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    >>> sc.parallelize(range(1000)).count()
    1000
    >>> sc.show_profiles()
    My custom profiles for RDD:1
    My custom profiles for RDD:3
    >>> sc.stop()
    """

    def __init__(self, ctx):
        pass

    def profile(self, func):
        """ Do profiling on the function `func`"""
        raise NotImplementedError

    def stats(self):
        """ Return the collected profiling stats (pstats.Stats).

        A falsy return value means there is nothing to show or dump.
        """
        raise NotImplementedError

    def show(self, id):
        """ Print the profile stats to stdout, id is the RDD id.

        Stats are sorted by internal time, then cumulative time. Nothing is
        printed when `stats()` returns a falsy value.
        """
        stats = self.stats()
        if stats:
            print("=" * 60)
            print("Profile of RDD<id=%d>" % id)
            print("=" * 60)
            stats.sort_stats("time", "cumulative").print_stats()

    def dump(self, id, path):
        """ Dump the profile into `path`/rdd_<id>.pstats, id is the RDD id.

        The directory `path` is created if it does not exist. No file is
        written when `stats()` returns a falsy value.
        """
        # Try to create the directory instead of checking for it first: the
        # directory may appear between a check and makedirs (e.g. when
        # several profilers dump concurrently). Only re-raise when `path`
        # still is not a directory afterwards.
        try:
            os.makedirs(path)
        except OSError:
            if not os.path.isdir(path):
                raise
        stats = self.stats()
        if stats:
            p = os.path.join(path, "rdd_%d.pstats" % id)
            stats.dump_stats(p)
0130
0131
class PStatsParam(AccumulatorParam):
    """PStatsParam is used to merge pstats.Stats

    None is the zero value and means "no stats collected yet".
    """

    @staticmethod
    def zero(value):
        """ Return the zero value (None) regardless of `value`. """
        return None

    @staticmethod
    def addInPlace(value1, value2):
        """ Merge `value2` into `value1` (via pstats.Stats.add) and return it.

        Either argument may be None. A None on one side yields the other side
        unchanged, so no merge is attempted.
        """
        if value1 is None:
            return value2
        if value2 is None:
            # Nothing to merge. Returning early avoids Stats.add(None), which
            # would build and merge a throwaway empty Stats object.
            return value1
        value1.add(value2)
        return value1
0145
0146
class BasicProfiler(Profiler):
    """
    BasicProfiler is the default profiler, which is implemented based on
    cProfile and Accumulator
    """
    def __init__(self, ctx):
        Profiler.__init__(self, ctx)
        # Accumulator that merges the pstats.Stats produced by each call to
        # profile(); PStatsParam combines them, starting from None.
        self._accumulator = ctx.accumulator(None, PStatsParam)

    def profile(self, func, *args, **kwargs):
        """ Run `func(*args, **kwargs)` under cProfile and return its result.

        The collected stats are stripped of directory names and added to this
        profiler's accumulator. Calling `profile(func)` with no extra
        arguments behaves as before.
        """
        pr = cProfile.Profile()
        ret = pr.runcall(func, *args, **kwargs)
        st = pstats.Stats(pr)
        # Drop the reference to the output stream. This is presumably so the
        # Stats object can be serialized as an accumulator update (a stream
        # such as sys.stdout cannot be pickled) — confirm against the
        # accumulator transport.
        st.stream = None
        st.strip_dirs()

        # Adds the new profile to the existing accumulated value.
        self._accumulator.add(st)
        return ret

    def stats(self):
        """ Return the accumulated pstats.Stats (presumably None until
        something has been profiled, since the accumulator starts at None). """
        return self._accumulator.value
0171
0172
if __name__ == "__main__":
    # Execute the doctests embedded in this module's docstrings and report
    # any failure to the caller through a non-zero exit status.
    import doctest
    results = doctest.testmod()
    if results.failed:
        sys.exit(-1)