Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import cProfile
0019 import pstats
0020 import os
0021 import atexit
0022 import sys
0023 
0024 from pyspark.accumulators import AccumulatorParam
0025 
0026 
class ProfilerCollector(object):
    """
    Keeps track of the profilers created for individual stages and
    builds new profilers on demand.

    Results are reported once, at interpreter exit: either dumped to
    ``dump_path`` (when given) or printed to stdout.
    """

    def __init__(self, profiler_cls, dump_path=None):
        # Class used to construct per-stage profilers.
        self.profiler_cls = profiler_cls
        # Directory to dump stats into at exit, or None to print instead.
        self.profile_dump_path = dump_path
        # List of [rdd_id, profiler, shown] triples.
        self.profilers = []

    def new_profiler(self, ctx):
        """ Create a new profiler using class `profiler_cls` """
        return self.profiler_cls(ctx)

    def add_profiler(self, id, profiler):
        """ Add a profiler for RDD `id` """
        # The first profiler added arranges for results to be reported
        # when the interpreter exits.
        if not self.profilers:
            if self.profile_dump_path:
                atexit.register(self.dump_profiles, self.profile_dump_path)
            else:
                atexit.register(self.show_profiles)

        self.profilers.append([id, profiler, False])

    def dump_profiles(self, path):
        """ Dump the profile stats into directory `path` """
        for (rdd_id, profiler, _) in self.profilers:
            profiler.dump(rdd_id, path)
        self.profilers = []

    def show_profiles(self):
        """ Print the profile stats to stdout """
        for entry in self.profilers:
            rdd_id, profiler, shown = entry
            if profiler and not shown:
                profiler.show(rdd_id)
                # Remember that this profile was already printed.
                entry[2] = True
0066 
0067 
class Profiler(object):
    """
    .. note:: DeveloperApi

    PySpark supports custom profilers so that different profiling back
    ends, or different output formats than the one provided by
    BasicProfiler, can be plugged in.

    A custom profiler has to define or inherit the following methods:
        profile - will produce a system profile of some sort.
        stats - return the collected stats.
        dump - dumps the profiles to a path
        add - adds a profile to the existing accumulated profile

    The profiler class is chosen when creating a SparkContext

    >>> from pyspark import SparkConf, SparkContext
    >>> from pyspark import BasicProfiler
    >>> class MyCustomProfiler(BasicProfiler):
    ...     def show(self, id):
    ...         print("My custom profiles for RDD:%s" % id)
    ...
    >>> conf = SparkConf().set("spark.python.profile", "true")
    >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
    >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    >>> sc.parallelize(range(1000)).count()
    1000
    >>> sc.show_profiles()
    My custom profiles for RDD:1
    My custom profiles for RDD:3
    >>> sc.stop()
    """

    def __init__(self, ctx):
        pass

    def profile(self, func):
        """ Do profiling on the function `func`"""
        raise NotImplementedError

    def stats(self):
        """ Return the collected profiling stats (pstats.Stats)"""
        raise NotImplementedError

    def show(self, id):
        """ Print the profile stats to stdout, id is the RDD id """
        collected = self.stats()
        if not collected:
            # Nothing was profiled; print nothing.
            return
        banner = "=" * 60
        print(banner)
        print("Profile of RDD<id=%d>" % id)
        print(banner)
        collected.sort_stats("time", "cumulative").print_stats()

    def dump(self, id, path):
        """ Dump the profile into path, id is the RDD id """
        # Create the target directory on first use.
        if not os.path.exists(path):
            os.makedirs(path)
        collected = self.stats()
        if collected:
            target = os.path.join(path, "rdd_%d.pstats" % id)
            collected.dump_stats(target)
0131 
class PStatsParam(AccumulatorParam):
    """AccumulatorParam used to merge pstats.Stats objects together."""

    @staticmethod
    def zero(value):
        # The empty accumulator state is represented by None, not an
        # empty Stats object.
        return None

    @staticmethod
    def addInPlace(value1, value2):
        # Merge value2 into value1 when both exist; otherwise the
        # non-empty side (possibly None) wins.
        if value1 is not None:
            value1.add(value2)
            return value1
        return value2
0145 
0146 
class BasicProfiler(Profiler):
    """
    The default profiler implementation, built on top of cProfile and
    an Accumulator.
    """
    def __init__(self, ctx):
        Profiler.__init__(self, ctx)
        # Accumulator that merges the profiles of the different
        # partitions of a stage into one combined result.
        self._accumulator = ctx.accumulator(None, PStatsParam)

    def profile(self, func):
        """ Runs and profiles the method to_profile passed in. A profile object is returned. """
        profiler = cProfile.Profile()
        profiler.runcall(func)
        collected = pstats.Stats(profiler)
        # The output stream is not picklable, so drop it before the
        # stats are shipped through the accumulator.
        collected.stream = None
        collected.strip_dirs()
        self._accumulator.add(collected)

    def stats(self):
        # The accumulated (merged) stats for all partitions so far.
        return self._accumulator.value
0171 
0172 
if __name__ == "__main__":
    # Run the doctests embedded in this module's docstrings; exit with
    # a non-zero status if any of them fail.
    import doctest
    failure_count, _ = doctest.testmod()
    if failure_count:
        sys.exit(-1)