Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from collections import namedtuple
0019 
__all__ = [
    "SparkJobInfo",
    "SparkStageInfo",
    "StatusTracker",
]
0021 
0022 
class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
    """
    Exposes information about Spark Jobs.

    Fields:

    - ``jobId``: the id of the job.
    - ``stageIds``: the ids of the job's stages.
    - ``status``: the job status, as a string.
    """
    # An empty ``__slots__`` keeps instances as lightweight as the underlying
    # namedtuple; without it every instance of this subclass would carry its
    # own (always empty) ``__dict__``.
    __slots__ = ()
0027 
0028 
class SparkStageInfo(namedtuple("SparkStageInfo",
                                "stageId currentAttemptId name numTasks numActiveTasks "
                                "numCompletedTasks numFailedTasks")):
    """
    Exposes information about Spark Stages.

    Fields:

    - ``stageId``: the id of the stage.
    - ``currentAttemptId``: the id of the stage's current attempt.
    - ``name``: the stage name.
    - ``numTasks``: the total number of tasks in the stage.
    - ``numActiveTasks``: the number of tasks currently active.
    - ``numCompletedTasks``: the number of tasks that have completed.
    - ``numFailedTasks``: the number of tasks that have failed.
    """
    # An empty ``__slots__`` keeps instances as lightweight as the underlying
    # namedtuple; without it every instance of this subclass would carry its
    # own (always empty) ``__dict__``.
    __slots__ = ()
0035 
0036 
class StatusTracker(object):
    """
    Low-level status reporting APIs for monitoring job and stage progress.

    These APIs intentionally provide very weak consistency semantics;
    consumers of these APIs should be prepared to handle empty / missing
    information. For example, a job's stage ids may be known but the status
    API may not have any information about the details of those stages, so
    `getStageInfo` could potentially return `None` for a valid stage id.

    To limit memory usage, these APIs only provide information on recent
    jobs / stages.  These APIs will provide information for the last
    `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs.
    """
    def __init__(self, jtracker):
        # Backing tracker that every method below delegates to. Presumably a
        # Py4J handle to the JVM-side status tracker (suggested by the ``j``
        # prefix) -- confirm at the construction site.
        self._jtracker = jtracker

    def getJobIdsForGroup(self, jobGroup=None):
        """
        Return a list of all known jobs in a particular job group.  If
        `jobGroup` is None, then returns all known jobs that are not
        associated with a job group.

        The returned list may contain running, failed, and completed jobs,
        and may vary across invocations of this method. This method does
        not guarantee the order of the elements in its result.
        """
        return list(self._jtracker.getJobIdsForGroup(jobGroup))

    def getActiveStageIds(self):
        """
        Returns a sorted list containing the ids of all active stages.
        """
        return sorted(self._jtracker.getActiveStageIds())

    def getActiveJobsIds(self):
        """
        Returns a sorted list containing the ids of all active jobs.
        """
        return sorted(self._jtracker.getActiveJobIds())

    def getJobInfo(self, jobId):
        """
        Returns a :class:`SparkJobInfo` object, or None if the job info
        could not be found or was garbage collected.

        The job's ``stageIds`` are copied into a plain Python list, like the
        results of the other methods of this class.
        """
        job = self._jtracker.getJobInfo(jobId)
        if job is None:
            return None
        # Copy the stage ids out of the tracker's return value so the
        # resulting tuple holds ordinary Python data instead of whatever
        # handle the backing tracker returned.
        return SparkJobInfo(jobId, list(job.stageIds()), str(job.status()))

    def getStageInfo(self, stageId):
        """
        Returns a :class:`SparkStageInfo` object, or None if the stage
        info could not be found or was garbage collected.
        """
        stage = self._jtracker.getStageInfo(stageId)
        if stage is None:
            return None
        # Every field after ``stageId`` is read by calling the zero-argument
        # accessor of the same name on ``stage``, so the namedtuple's field
        # names must match the tracker object's method names.
        # TODO: fetch them in batch for better performance
        attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
        return SparkStageInfo(stageId, *attrs)
0087     def getStageInfo(self, stageId):
0088         """
0089         Returns a :class:`SparkStageInfo` object, or None if the stage
0090         info could not be found or was garbage collected.
0091         """
0092         stage = self._jtracker.getStageInfo(stageId)
0093         if stage is not None:
0094             # TODO: fetch them in batch for better performance
0095             attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
0096             return SparkStageInfo(stageId, *attrs)