Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 import os
0018 import sys
0019 import time
0020 import unittest
0021 
0022 from pyspark.serializers import read_int
0023 
0024 
class DaemonTests(unittest.TestCase):
    """Termination tests for the daemon script located at ``../daemon.py``.

    Each test launches the daemon as a child process, checks that it accepts a
    connection, asks it to shut down and then checks that new connections are
    refused.
    """

    def connect(self, port):
        """Connect to the daemon on localhost and send a worker shutdown request.

        :param port: TCP port on 127.0.0.1 to connect to.
        :return: True once the shutdown request has been sent.
        :raises EnvironmentError: if the connection cannot be established or the
            request cannot be sent.
        """
        from socket import socket, AF_INET, SOCK_STREAM
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            sock.connect(('127.0.0.1', port))
            # send a split index of -1 to shutdown the worker
            # (sendall guarantees that all four bytes are written)
            sock.sendall(b"\xFF\xFF\xFF\xFF")
        finally:
            # Always release the descriptor; a refused connection is the expected
            # outcome once the daemon has terminated, and must not leak the socket.
            sock.close()
        return True

    def do_termination_test(self, terminator):
        """Start the daemon, shut it down via ``terminator`` and verify it is gone.

        :param terminator: callable taking the daemon ``Popen`` object and
            requesting its shutdown (e.g. closing its stdin or sending a signal).

        The test fails if a connection attempt made after the shutdown request
        succeeds, or fails with an errno other than ECONNREFUSED.
        """
        from subprocess import Popen, PIPE
        from errno import ECONNREFUSED

        # start daemon
        daemon_path = os.path.join(os.path.dirname(__file__), "..", "daemon.py")
        python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON")
        daemon = Popen([python_exec, daemon_path], stdin=PIPE, stdout=PIPE)

        try:
            # read the port number
            port = read_int(daemon.stdout)

            # daemon should accept connections
            self.assertTrue(self.connect(port))

            # wait worker process spawned from daemon exit.
            time.sleep(1)

            # request shutdown
            terminator(daemon)
            time.sleep(1)

            # daemon should no longer accept connections
            try:
                self.connect(port)
            except EnvironmentError as exception:
                self.assertEqual(exception.errno, ECONNREFUSED)
            else:
                self.fail("Expected EnvironmentError to be raised")
        finally:
            # Release the pipes and reap the child regardless of the test outcome,
            # so that a failed assertion does not leave a daemon or zombie behind.
            for pipe in (daemon.stdin, daemon.stdout):
                pipe.close()
            if daemon.poll() is None:
                # The daemon ignored the shutdown request (or the test failed
                # before issuing it); force it down so wait() cannot hang.
                daemon.kill()
            daemon.wait()

    def test_termination_stdin(self):
        """Ensure that daemon and workers terminate when stdin is closed."""
        self.do_termination_test(lambda daemon: daemon.stdin.close())

    def test_termination_sigterm(self):
        """Ensure that daemon and workers terminate on SIGTERM."""
        from signal import SIGTERM
        self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
0073 
0074 
if __name__ == "__main__":
    # Re-import the tests under their package-qualified module name.
    from pyspark.tests.test_daemon import *

    # Fall back to unittest's default runner unless xmlrunner is installed, in
    # which case XML reports are written to target/test-reports.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        # xmlrunner is optional; keep the default runner.
        pass

    unittest.main(testRunner=testRunner, verbosity=2)