0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import numbers
0019 import os
0020 import signal
0021 import select
0022 import socket
0023 import sys
0024 import traceback
0025 import time
0026 import gc
0027 from errno import EINTR, EAGAIN
0028 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
0029 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
0030
0031 from pyspark.worker import main as worker_main
0032 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
0033
0034
def compute_real_exit_code(exit_code):
    """Coerce a ``SystemExit.code`` value into an integer exit status.

    Integral values (bool included) come back unchanged; anything else --
    ``None``, a message string, a float -- is reported as 1.

    NOTE: ``None`` maps to 1 here, unlike the interpreter's own convention
    where ``sys.exit(None)`` means success.
    """
    is_integral = isinstance(exit_code, numbers.Integral)
    return exit_code if is_integral else 1
0041
0042
def _secrets_match(expected, received):
    """Return True if ``received`` equals the ``expected`` secret string.

    Uses ``hmac.compare_digest`` so the comparison time does not reveal where
    the first mismatching character is. Both strings are encoded to bytes
    first because ``compare_digest`` rejects ``str`` arguments containing
    non-ASCII characters; UTF-8 with ``surrogatepass`` maps distinct strings
    to distinct byte strings, so the result agrees with ``==``. A non-``str``
    ``received`` never matches, just as ``str == non_str`` is False.
    """
    import hmac

    if not isinstance(received, str):
        return False
    return hmac.compare_digest(
        expected.encode("utf-8", "surrogatepass"),
        received.encode("utf-8", "surrogatepass"),
    )


def worker(sock, authenticated):
    """
    Called by a worker process after the fork().

    Restores default signal handling, performs the secret handshake if this
    process has not authenticated the client yet, then runs ``worker_main``
    over the socket.

    :param sock: connected socket to the client.
    :param authenticated: True if an earlier call in this process already
        authenticated the client, in which case the handshake is skipped.
    :return: 1 if authentication failed (the socket is closed in that case);
        otherwise 0, or the integer-coerced code of a ``SystemExit`` raised
        by ``worker_main``.
    :raises KeyError: if authentication is needed and the
        ``PYTHON_WORKER_FACTORY_SECRET`` environment variable is unset.
    """
    # The manager ignores/overrides these; workers want default behaviour.
    signal.signal(SIGHUP, SIG_DFL)
    signal.signal(SIGCHLD, SIG_DFL)
    signal.signal(SIGTERM, SIG_DFL)
    # Restore Python's default SIGINT handler so an interrupt raises
    # KeyboardInterrupt (with a traceback) inside the worker.
    signal.signal(SIGINT, signal.default_int_handler)

    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    # Separate dup()'d descriptors give independent buffered reader/writer
    # objects over the same socket.
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
    try:
        if not authenticated:
            client_secret = UTF8Deserializer().loads(infile)
            expected_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
            if _secrets_match(expected_secret, client_secret):
                write_with_length("ok".encode("utf-8"), outfile)
                outfile.flush()
            else:
                write_with_length("err".encode("utf-8"), outfile)
                outfile.flush()
                sock.close()
                return 1

        exit_code = 0
        try:
            worker_main(infile, outfile)
        except SystemExit as exc:
            exit_code = compute_real_exit_code(exc.code)
        finally:
            # Best effort: the peer may already have gone away.
            try:
                outfile.flush()
            except Exception:
                pass
        return exit_code
    finally:
        # Release the dup()'d descriptors now instead of whenever the file
        # objects are garbage collected; `sock` itself stays open for reuse.
        for stream in (outfile, infile):
            try:
                stream.close()
            except Exception:
                pass
0083
0084
def manager():
    """Run the daemon that forks one worker process per accepted connection.

    Listens on an ephemeral port on 127.0.0.1 and reports that port on
    stdout with ``write_int``. It then loops:

    * When stdin is readable, it reads a worker pid with ``read_int`` and
      sends that process SIGKILL. EOF on stdin shuts the daemon down with
      exit code 0.
    * When the listening socket is readable, it accepts a connection and
      forks.
      - The child sends its pid on the connection and serves it with
        ``worker()``. If ``SPARK_REUSE_WORKER`` is set to a non-empty value,
        the child keeps serving until ``worker()`` returns non-zero. The
        child always leaves via ``os._exit``.
      - The parent closes its copy of the connection.

    SIGTERM, or any unexpected exception in the loop, shuts the daemon down
    with exit code 1. Every shutdown sends SIGHUP to the whole process group.

    Never returns normally; always ends by raising ``SystemExit``.
    """
    # Become leader of a fresh process group so os.kill(0, SIGHUP) in
    # shutdown() reaches every forked worker.
    os.setpgid(0, 0)

    # Ephemeral loopback port; backlog is at least 1024.
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, SOMAXCONN))
    listen_port = listen_sock.getsockname()[1]

    # Tell whoever launched us which port was chosen.
    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
    write_int(listen_port, stdout_bin)
    stdout_bin.flush()

    def shutdown(code):
        # Restore default SIGTERM so a further SIGTERM terminates us outright.
        signal.signal(SIGTERM, SIG_DFL)
        # SIGHUP the whole group: worker() resets SIGHUP to SIG_DFL, so
        # workers terminate, while the manager ignores SIGHUP (set below).
        os.kill(0, SIGHUP)
        sys.exit(code)

    def handle_sigterm(*args):
        shutdown(1)

    signal.signal(SIGTERM, handle_sigterm)
    signal.signal(SIGHUP, SIG_IGN)
    # Ignoring SIGCHLD lets the kernel reap exited workers (no zombies).
    signal.signal(SIGCHLD, SIG_IGN)

    reuse = os.environ.get("SPARK_REUSE_WORKER")

    try:
        while True:
            try:
                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
            except OSError as ex:
                # select.error is an alias of OSError on Python 3 and is not
                # indexable, so inspect errno (not ex[0]).
                if ex.errno == EINTR:
                    continue
                raise

            if 0 in ready_fds:
                try:
                    worker_pid = read_int(stdin_bin)
                except EOFError:
                    # Our stdin was closed: shut down cleanly.
                    shutdown(0)
                try:
                    os.kill(worker_pid, signal.SIGKILL)
                except OSError:
                    pass  # the worker has already exited

            if listen_sock in ready_fds:
                try:
                    sock, _ = listen_sock.accept()
                except OSError as e:
                    if e.errno == EINTR:
                        continue
                    raise

                try:
                    pid = os.fork()
                except OSError as e:
                    if e.errno in (EAGAIN, EINTR):
                        # Transient failure: retry once after a pause. A second
                        # failure propagates and shuts the whole daemon down.
                        time.sleep(1)
                        pid = os.fork()
                    else:
                        # Report the errno on the connection instead of a pid.
                        # NOTE(review): a positive errno looks like a pid on
                        # the wire; confirm how the reader tells them apart.
                        outfile = sock.makefile(mode='wb')
                        write_int(e.errno, outfile)
                        outfile.flush()
                        outfile.close()
                        sock.close()
                        continue

                if pid == 0:
                    # Child. All of its work, setup included, sits inside this
                    # try so that any failure ends in os._exit(). Otherwise the
                    # exception would reach the manager loop's handler below,
                    # whose shutdown() would SIGHUP the sibling workers.
                    try:
                        listen_sock.close()

                        # Point fd 0 at /dev/null rather than closing it, so the
                        # next descriptor opened is not silently handed fd 0.
                        devnull = open(os.devnull, 'r')
                        os.dup2(devnull.fileno(), 0)
                        devnull.close()

                        # Acknowledge the fork by sending our pid.
                        outfile = sock.makefile(mode="wb")
                        write_int(os.getpid(), outfile)
                        outfile.flush()
                        outfile.close()

                        authenticated = False
                        while True:
                            code = worker(sock, authenticated)
                            if code == 0:
                                authenticated = True
                            if not reuse or code:
                                # Drain until the peer closes the connection
                                # (or the socket errors out) before exiting.
                                try:
                                    while sock.recv(1024):
                                        pass
                                except Exception:
                                    pass
                                break
                            # Free the previous task's garbage before reuse.
                            gc.collect()
                    except BaseException:
                        traceback.print_exc()
                        os._exit(1)
                    else:
                        os._exit(0)
                else:
                    # Parent: the child owns the connection now.
                    sock.close()
    except SystemExit:
        # shutdown() has already signalled the workers. Keep the exit code it
        # chose; a try/finally here would turn shutdown(0) into exit code 1.
        raise
    except BaseException:
        shutdown(1)
0208
0209
if __name__ == "__main__":
    # Started as a script rather than imported: run the daemon loop.
    manager()