0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 import os
0018 import sys
0019 import time
0020 import unittest
0021
0022 from pyspark.serializers import read_int
0023
0024
class DaemonTests(unittest.TestCase):
    """Check that the ``daemon.py`` process stops listening once terminated."""

    def connect(self, port):
        """Open a TCP connection to 127.0.0.1:``port``, send a marker, hang up.

        :param port: TCP port to connect to on the loopback interface.
        :return: ``True`` when the connection and send both succeeded.
        :raises EnvironmentError: if the connection cannot be established
            (for example ``ECONNREFUSED`` when nothing is listening).
        """
        from socket import socket, AF_INET, SOCK_STREAM

        sock = socket(AF_INET, SOCK_STREAM)
        try:
            sock.connect(('127.0.0.1', port))
            # Four 0xFF bytes, i.e. -1 as a signed 32-bit integer.
            # NOTE(review): presumably the "shut down" marker understood by
            # the worker -- confirm against the daemon/worker protocol.
            sock.send(b"\xFF\xFF\xFF\xFF")
        finally:
            # Close even when connect()/send() raises; the refused-connection
            # path is an expected outcome and must not leak the socket.
            sock.close()
        return True

    def _reap_daemon(self, daemon):
        """Release the pipes of ``daemon`` and make sure the process is gone."""
        for pipe in (daemon.stdin, daemon.stdout):
            if pipe is not None and not pipe.closed:
                try:
                    pipe.close()
                except EnvironmentError:
                    # Best-effort cleanup: the child may already have exited,
                    # and a cleanup error must not mask the test's own result.
                    pass
        if daemon.poll() is None:
            try:
                daemon.kill()
            except OSError:
                # The process exited between poll() and kill().
                pass
        # Collect the exit status so no zombie process is left behind.
        daemon.wait()

    def do_termination_test(self, terminator):
        """Start the daemon, terminate it, and verify its port is closed.

        :param terminator: callable taking the daemon ``Popen`` object and
            triggering its termination.
        """
        from subprocess import Popen, PIPE
        from errno import ECONNREFUSED

        # Start the daemon script that lives one directory above this file.
        daemon_path = os.path.join(os.path.dirname(__file__), "..", "daemon.py")
        python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON")
        daemon = Popen([python_exec, daemon_path], stdin=PIPE, stdout=PIPE)

        try:
            # The daemon reports the port it listens on via its stdout.
            port = read_int(daemon.stdout)

            # The daemon must accept connections while it is running.
            self.assertTrue(self.connect(port))

            # Give the daemon a moment to settle before terminating it.
            time.sleep(1)

            # Trigger termination and allow it time to take effect.
            terminator(daemon)
            time.sleep(1)

            # Once terminated, connecting must fail with ECONNREFUSED.
            try:
                self.connect(port)
            except EnvironmentError as exception:
                self.assertEqual(exception.errno, ECONNREFUSED)
            else:
                self.fail("Expected EnvironmentError to be raised")
        finally:
            # Runs on failures too, so a daemon that did not terminate is not
            # left running and its pipes are not leaked.
            self._reap_daemon(daemon)

    def test_termination_stdin(self):
        """Ensure that daemon and workers terminate when stdin is closed."""
        self.do_termination_test(lambda daemon: daemon.stdin.close())

    def test_termination_sigterm(self):
        """Ensure that daemon and workers terminate on SIGTERM."""
        from signal import SIGTERM
        self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
0073
0074
if __name__ == "__main__":
    from pyspark.tests.test_daemon import *

    # Start with unittest's default runner and switch to XML reports only
    # when the optional xmlrunner package can be imported.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)