0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from collections import namedtuple
0019
# Public API of this module.
__all__ = [
    "SparkJobInfo",
    "SparkStageInfo",
    "StatusTracker",
]
0021
0022
class SparkJobInfo(namedtuple("SparkJobInfo", ["jobId", "stageIds", "status"])):
    """
    Exposes information about Spark Jobs.

    An immutable record with three fields, in this order:
    ``jobId``, ``stageIds`` and ``status``. Instances are produced by
    ``StatusTracker.getJobInfo``, which stores ``status`` as a string.
    """
0027
0028
class SparkStageInfo(namedtuple("SparkStageInfo", [
        "stageId", "currentAttemptId", "name", "numTasks",
        "numActiveTasks", "numCompletedTasks", "numFailedTasks"])):
    """
    Exposes information about Spark Stages.

    An immutable record whose first field is ``stageId``. The order of the
    remaining fields matters: ``StatusTracker.getStageInfo`` fills them
    positionally by calling the same-named getter for each field after
    ``stageId``.
    """
0035
0036
class StatusTracker(object):
    """
    Low-level status reporting APIs for monitoring job and stage progress.

    These APIs intentionally provide very weak consistency semantics;
    consumers of these APIs should be prepared to handle empty / missing
    information. For example, a job's stage ids may be known but the status
    API may not have any information about the details of those stages, so
    `getStageInfo` could potentially return `None` for a valid stage id.

    To limit memory usage, these APIs only provide information on recent
    jobs / stages. These APIs will provide information for the last
    `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs.

    :param jtracker: the tracker object every method delegates to (presumably
        a JVM-side tracker reached through a py4j gateway -- confirm with the
        caller). Array results it returns are copied into Python lists.
    """

    def __init__(self, jtracker):
        self._jtracker = jtracker

    def getJobIdsForGroup(self, jobGroup=None):
        """
        Return a list of all known jobs in a particular job group. If
        `jobGroup` is None, then returns all known jobs that are not
        associated with a job group.

        The returned list may contain running, failed, and completed jobs,
        and may vary across invocations of this method. This method does
        not guarantee the order of the elements in its result.
        """
        return list(self._jtracker.getJobIdsForGroup(jobGroup))

    def getActiveStageIds(self):
        """
        Returns a list containing the ids of all active stages, sorted in
        ascending order.
        """
        # sorted() accepts any iterable and already returns a new list.
        return sorted(self._jtracker.getActiveStageIds())

    def getActiveJobsIds(self):
        """
        Returns a list containing the ids of all active jobs, sorted in
        ascending order.
        """
        return sorted(self._jtracker.getActiveJobIds())

    # Correctly spelled alias matching the underlying tracker method name;
    # ``getActiveJobsIds`` is kept so existing callers keep working.
    getActiveJobIds = getActiveJobsIds

    def getJobInfo(self, jobId):
        """
        Returns a :class:`SparkJobInfo` object, or None if the job info
        could not be found or was garbage collected.

        The returned ``stageIds`` is a Python list and ``status`` is the
        string form of the tracker's job status.
        """
        job = self._jtracker.getJobInfo(jobId)
        if job is None:
            return None
        # Copy the stage ids into a plain list, as getJobIdsForGroup does, so
        # the returned tuple holds Python values rather than the tracker's
        # array wrapper (presumably a py4j proxy).
        return SparkJobInfo(jobId, list(job.stageIds()), str(job.status()))

    def getStageInfo(self, stageId):
        """
        Returns a :class:`SparkStageInfo` object, or None if the stage
        info could not be found or was garbage collected.
        """
        stage = self._jtracker.getStageInfo(stageId)
        if stage is None:
            return None
        # Every SparkStageInfo field after ``stageId`` is filled by calling
        # the same-named zero-argument method on the tracker's stage info.
        attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
        return SparkStageInfo(stageId, *attrs)