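"""
Demonstrates PySpark's status API: the main thread launches a job on a
background thread, then polls SparkContext.statusTracker() and prints
per-job and per-stage progress until the result arrives.
"""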
from __future__ import print_function

import time
import threading
import sys

# The stdlib Queue module was renamed to queue in Python 3; alias it so
# the code below works under either interpreter.
if sys.version >= '3':
    import queue as Queue
else:
    import Queue

from pyspark import SparkConf, SparkContext


def delayed(seconds):
    # Return an identity map function that sleeps before returning its
    # argument, so each task runs long enough for progress to be visible.
    def f(x):
        time.sleep(seconds)
        return x
    return f


def call_in_background(f, *args):
    # Run f(*args) on a daemon thread and return a one-slot queue that
    # will eventually hold the result; the caller polls it with empty()
    # and fetches the value with get().
    result = Queue.Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result


def main():
    # Disable the console progress bar so the status lines printed below
    # stay readable.
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    # Start the job in the background, then poll the status tracker once
    # per second until the result is available.
    result = call_in_background(run)
    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for job_id in ids:
            job = status.getJobInfo(job_id)
            print("Job", job_id, "status:", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()


if __name__ == "__main__":
    main()
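
# A typical invocation, assuming Spark is installed and this script is
# saved as status_api_demo.py (the filename is an assumption):
#
#   bin/spark-submit status_api_demo.py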