Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import numbers
0019 import os
0020 import signal
0021 import select
0022 import socket
0023 import sys
0024 import traceback
0025 import time
0026 import gc
0027 from errno import EINTR, EAGAIN
0028 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
0029 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
0030 
0031 from pyspark.worker import main as worker_main
0032 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
0033 
0034 
def compute_real_exit_code(exit_code):
    """
    Turn the ``code`` attribute of a SystemExit into a value usable as an exit status.

    SystemExit's code may be an integer or some other object (e.g. a string),
    while os._exit only accepts integers. Integral codes are returned unchanged;
    every other value is reported as 1.
    """
    return exit_code if isinstance(exit_code, numbers.Integral) else 1
0041 
0042 
def worker(sock, authenticated):
    """
    Serve one worker session over ``sock``; called by a worker process after the fork().

    The SIGHUP, SIGCHLD and SIGTERM dispositions are reset to their defaults and
    SIGINT gets Python's default handler back. Unless ``authenticated`` is already
    true, the client's secret is read from the socket and compared with the
    PYTHON_WORKER_FACTORY_SECRET environment variable before any work is done.

    Returns 1 if authentication fails (after replying "err" and closing ``sock``).
    Otherwise returns 0 when ``worker_main`` finishes normally, or the exit code
    derived from the SystemExit it raised.
    """
    for signum in (SIGHUP, SIGCHLD, SIGTERM):
        signal.signal(signum, SIG_DFL)
    # Bring back the SIGINT handler: it helps debugging, because the stacktrace
    # is shown before the process exits.
    signal.signal(SIGINT, signal.default_int_handler)

    # The socket is wrapped with fdopen rather than socket.makefile() since the
    # latter seems to be very slow. Each stream gets its own dup()'ed descriptor
    # because otherwise writes also cause a seek that makes us miss data on the
    # read side.
    bufsize = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    read_fd = os.dup(sock.fileno())
    infile = os.fdopen(read_fd, "rb", bufsize)
    write_fd = os.dup(sock.fileno())
    outfile = os.fdopen(write_fd, "wb", bufsize)

    if not authenticated:
        client_secret = UTF8Deserializer().loads(infile)
        secret_matches = os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret
        reply = "ok" if secret_matches else "err"
        write_with_length(reply.encode("utf-8"), outfile)
        outfile.flush()
        if not secret_matches:
            # Rejected client: drop the connection and report failure.
            sock.close()
            return 1

    try:
        worker_main(infile, outfile)
    except SystemExit as exc:
        status = compute_real_exit_code(exc.code)
    else:
        status = 0
    finally:
        # Best-effort flush of pending output; a failure here is ignored.
        try:
            outfile.flush()
        except Exception:
            pass
    return status
0083 
0084 
def manager():
    """
    Run the daemon loop: accept local connections and fork one worker per connection.

    Start-up:
      * moves this process into a new process group of its own,
      * opens a listening TCP socket on 127.0.0.1 with an OS-assigned port,
      * writes that port number to stdout with ``write_int``.

    Main loop (``select`` on stdin and the listening socket, 1 second timeout):
      * stdin readable: an integer pid is read and that process is sent SIGKILL;
        EOF on stdin triggers ``shutdown(0)``.
      * listening socket readable: the connection is accepted and a child is
        forked. The child sends its own pid over the connection and runs
        ``worker`` on it, repeatedly when the SPARK_REUSE_WORKER environment
        variable is set and the previous run returned 0. The parent closes its
        copy of the connection. If the fork fails with an errno other than
        EAGAIN/EINTR, that errno is written to the connection instead.

    The function does not return normally: it leaves through ``sys.exit`` in
    ``shutdown`` (or through a propagating exception).
    """
    # Create a new process group to corral our children
    os.setpgid(0, 0)

    # Create a listening socket on the AF_INET loopback interface
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, SOMAXCONN))
    listen_host, listen_port = listen_sock.getsockname()

    # re-open stdin/stdout in binary mode ('rb' / 'wb')
    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
    # Report the chosen port on stdout
    write_int(listen_port, stdout_bin)
    stdout_bin.flush()

    def shutdown(code):
        """Signal the whole process group with SIGHUP, then exit with ``code``."""
        signal.signal(SIGTERM, SIG_DFL)
        # Send SIGHUP to notify workers of shutdown
        os.kill(0, SIGHUP)
        sys.exit(code)

    def handle_sigterm(*args):
        """SIGTERM handler: shut down with exit code 1."""
        shutdown(1)
    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
    signal.signal(SIGCHLD, SIG_IGN)

    reuse = os.environ.get("SPARK_REUSE_WORKER")

    # Initialization complete
    try:
        while True:
            try:
                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except select.error as ex:
                # Use args[0] rather than ex[0]: on Python 3 select.error is an
                # alias of OSError, which cannot be indexed, so ex[0] would raise
                # TypeError instead of retrying the interrupted call.
                if ex.args[0] == EINTR:
                    continue
                else:
                    raise

            if 0 in ready_fds:
                try:
                    worker_pid = read_int(stdin_bin)
                except EOFError:
                    # Spark told us to exit by closing stdin
                    # (shutdown() raises SystemExit, so worker_pid is never
                    # used unbound below)
                    shutdown(0)
                try:
                    os.kill(worker_pid, signal.SIGKILL)
                except OSError:
                    pass  # process already died

            if listen_sock in ready_fds:
                try:
                    sock, _ = listen_sock.accept()
                except OSError as e:
                    if e.errno == EINTR:
                        continue
                    raise

                # Launch a worker process
                try:
                    pid = os.fork()
                except OSError as e:
                    if e.errno in (EAGAIN, EINTR):
                        # Transient failure: wait a moment and retry once
                        time.sleep(1)
                        pid = os.fork()  # error here will shutdown daemon
                    else:
                        outfile = sock.makefile(mode='wb')
                        write_int(e.errno, outfile)  # Signal that the fork failed
                        outfile.flush()
                        outfile.close()
                        sock.close()
                        continue

                if pid == 0:
                    # in child process
                    listen_sock.close()

                    # It should close the standard input in the child process so that
                    # Python native function executions stay intact.
                    #
                    # Note that if we just close the standard input (file descriptor 0),
                    # the lowest file descriptor (file descriptor 0) will be allocated
                    # later, when other file descriptors happen to be opened.
                    #
                    # Therefore, here we redirect it to '/dev/null' by duplicating
                    # another file descriptor for '/dev/null' to the standard input (0).
                    # See SPARK-26175.
                    devnull = open(os.devnull, 'r')
                    os.dup2(devnull.fileno(), 0)
                    devnull.close()

                    try:
                        # Acknowledge that the fork was successful
                        outfile = sock.makefile(mode="wb")
                        write_int(os.getpid(), outfile)
                        outfile.flush()
                        outfile.close()
                        authenticated = False
                        while True:
                            code = worker(sock, authenticated)
                            if code == 0:
                                # A successful run implies the client passed
                                # authentication; skip it on later reuse.
                                authenticated = True
                            if not reuse or code:
                                # wait for closing
                                try:
                                    while sock.recv(1024):
                                        pass
                                except Exception:
                                    pass
                                break
                            gc.collect()
                    except BaseException:
                        # Deliberately catch everything (SystemExit and
                        # KeyboardInterrupt included): the child must always end
                        # via os._exit and never fall back into the manager loop.
                        traceback.print_exc()
                        os._exit(1)
                    else:
                        os._exit(0)
                else:
                    # in parent process: the child owns the connection now
                    sock.close()

    finally:
        # NOTE: this also runs while the SystemExit raised by shutdown() is
        # propagating, so the group is signalled again and the final exit
        # status is 1 in that case as well.
        shutdown(1)
0208 
0209 
# Script entry point: run the daemon's manager loop when this file is executed
# directly (not when it is imported).
if __name__ == '__main__':
    manager()