Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from __future__ import print_function
0019 
0020 import time
0021 import threading
0022 import sys
0023 if sys.version >= '3':
0024     import queue as Queue
0025 else:
0026     import Queue
0027 
0028 from pyspark import SparkConf, SparkContext
0029 
0030 
def delayed(seconds):
    """Return a pass-through function that pauses before answering.

    Args:
        seconds: how long (passed straight to ``time.sleep``) each call of the
            returned function should sleep.

    Returns:
        A one-argument function that sleeps for ``seconds`` and then hands
        back its argument unchanged. ``main`` maps it over RDDs to make the
        job slow enough for its progress to be observed.
    """
    def _sleep_then_identity(value):
        time.sleep(seconds)
        return value

    return _sleep_then_identity
0036 
0037 
def call_in_background(f, *args):
    """Run ``f(*args)`` on a daemon thread and return a queue for its outcome.

    The returned ``Queue`` (max size 1) receives exactly one item when the
    call finishes: the value returned by ``f(*args)``, or, if the call
    raised, the exception instance itself. Callers may poll ``empty()``
    while the work runs, then ``get()`` the outcome and check
    ``isinstance(outcome, BaseException)`` to detect failure. (A function
    that legitimately *returns* an exception object is indistinguishable
    from one that raised.)

    Args:
        f: the callable to run in the background.
        *args: positional arguments passed to ``f``.

    Returns:
        The single-slot queue that will hold the outcome.
    """
    result = Queue.Queue(1)

    def _target():
        try:
            outcome = f(*args)
        except BaseException as exc:
            # Forward the failure instead of letting it end the thread: an
            # uncaught exception would leave the queue empty forever, so a
            # caller polling ``result.empty()`` would never terminate.
            outcome = exc
        result.put(outcome)

    worker = threading.Thread(target=_target)
    # Daemon thread: it does not keep the interpreter alive on exit.
    worker.daemon = True
    worker.start()
    return result
0044 
0045 
def _print_job_progress(status):
    """Print job status and per-stage task counts for the tracked jobs.

    Args:
        status: the status tracker returned by ``sc.statusTracker()``; the
            jobs reported are those returned by ``getJobIdsForGroup()``
            called with no group argument.
    """
    for job_id in status.getJobIdsForGroup():
        job = status.getJobInfo(job_id)
        # getJobInfo may return None (e.g. the job's info is no longer
        # available); skip it rather than crash on ``job.status``, matching
        # the guard used for stage info below.
        if job is None:
            continue
        print("Job", job_id, "status: ", job.status)
        for sid in job.stageIds:
            info = status.getStageInfo(sid)
            if info:
                print("Stage %d: %d tasks total (%d active, %d complete)" %
                      (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))


def main():
    """Run a deliberately slowed Spark job in the background and poll it.

    While the job runs, job and stage progress is printed about once per
    second via the status tracker. Afterwards the collected result is
    printed and the SparkContext is stopped, even if something fails.

    Raises:
        The exception raised by the background job, when the background
        helper forwards it through the result queue.
    """
    # Disable Spark's console progress output so it does not interleave
    # with the status lines printed below.
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    try:
        result = call_in_background(run)
        status = sc.statusTracker()
        while result.empty():
            _print_job_progress(status)
            time.sleep(1)

        outcome = result.get()
        # If the background helper forwarded a failure, surface it instead
        # of printing the exception object as though it were the result.
        if isinstance(outcome, BaseException):
            raise outcome
        print("Job results are:", outcome)
    finally:
        # Always release the SparkContext, including on error or Ctrl-C.
        sc.stop()
0071 
if __name__ == "__main__":
    # Run the demo only when this file is executed directly, not on import.
    main()