« Back to Index

[Python Tornado Graceful Shutdown]

View original Gist on GitHub

Tags: #python #python3 #tornado #graceful #shutdown #signals #sigterm #sigint

1. Summary.md

2. things tried.md

Working code (Python 3.7+)…

def sig_handler(server, sig, frame):
    io_loop = IOLoop.current()
    
    def stop_loop(deadline):
        now = time.time()
        if now < deadline and len(asyncio.all_tasks()) > 0:
            logging.info('waiting for next event loop tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('event loop stopped')
            
    async def shutdown():
        logging.info('stop listening for new connections')
        server.stop()
        logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
        # await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
        stop_loop(time.time() + SHUTDOWN_TIMEOUT)
        
    io_loop.add_callback_from_signal(shutdown)

To recap what we had tried, that failed (iirc)…

3. OLD CODE TESTING SCRATCH PAD – Python Tornado Graceful Shutdown.py

#!/usr/bin/env python

"""
How to use it:
1. Just `kill -2 PROCESS_ID` or `kill -15 PROCESS_ID`,
   The Tornado Web Server Will shutdown after process all the request.
2. When you run it behind Nginx, it can graceful reboot your production server.

> Note: the original version of this code didn't account for closing in-flight requests. There is a method (not documented other than sifting through the tornado source code for its HTTPServer class) that supposedly handles this, called `close_all_connections` but that doesn't seem to work when I tested it (not sure why :shrugs:). But the below version does try and account for checking internal properties `_callbacks` or `_timeouts` (which are only available for older versions of tornado, so the code doesn't work for version 5+).

See alternative approach (uses tornado.gen.sleep instead of ioloop ticks):
https://github.com/tornadoweb/tornado/issues/1791#issuecomment-409258371

    async def shutdown():
        logging.info('Stopping http server')
        server.stop()
        await server.close_all_connections()
        logging.info('Will shutdown in %s seconds ...', SHUTDOWN_TIMEOUT)
        await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
        ioloop.IOLoop.current().stop()
        
> Note: a lot of these examples are problematic or don't quite work due to the incompatible nature of tornado's IOLoop vs asyncio's. In Tornado version 5.0+ their IOLoop became a wrapper around asyncio's but even then they only expose a small number of methods that proxy to the underlying IOLoop, so methods like `run_until_complete` and `shutdown_asyncgens` (see below example) just don't work.

    def shutdown():
        logging.info('Stopping http server')
        server.stop()
        io_loop.run_until_complete(io_loop.shutdown_asyncgens())
        io_loop.close()

The following are all the same event loop instance...

- tornado.ioloop.IOLoop.current()
- tornado.ioloop.IOLoop.current().asyncio_loop
- asyncio.get_event_loop()

## UPDATE

The best I've been able to achieve with latest tornado (6.0.3) is...

def sig_handler(server, sig, frame):
    io_loop = IOLoop.current()

    async def shutdown():
        logging.info('stop listening for new connections')
        server.stop()
        logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
        await tornado.gen.sleep(SHUTDOWN_TIMEOUT)
        io_loop.stop()
        logging.info('event loop stopped')

    io_loop.add_callback_from_signal(shutdown)
    
## UPDATE 2 -- BETTER SOLUTION

def sig_handler(server, sig, frame):
    io_loop = IOLoop.current()

    def stop_loop(deadline):
        now = time.time()
        logging.info(f'len(asyncio.all_tasks()): {len(asyncio.all_tasks())}')
        if now < deadline and len(asyncio.all_tasks()) > 0:
            logging.info('waiting for next event loop tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('event loop stopped')

    async def shutdown():
        logging.info('stop listening for new connections')
        server.stop()
        logging.info(f'stopping event loop in {SHUTDOWN_TIMEOUT} seconds')
        # await server.close_all_connections() << don't use unless you WANT to kill all client connections immediately
        stop_loop(time.time() + SHUTDOWN_TIMEOUT)

    io_loop.add_callback_from_signal(shutdown)
"""

import time
import signal
import logging
from functools import partial

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)

MAX_WAIT_SECONDS_BEFORE_SHUTDOWN = 3


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")


def sig_handler(server, sig, frame):
    io_loop = tornado.ioloop.IOLoop.instance()

    def stop_loop(deadline):
        now = time.time()
        if now < deadline and (io_loop._callbacks or io_loop._timeouts):
            logging.info('Waiting for next tick')
            io_loop.add_timeout(now + 1, stop_loop, deadline)
        else:
            io_loop.stop()
            logging.info('Shutdown finally')

    async def shutdown():
        logging.info('Stopping http server')
        server.stop()
        logging.info('Will shutdown in %s seconds ...',
                     MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)
        await server.close_all_connections()
        stop_loop(time.time() + MAX_WAIT_SECONDS_BEFORE_SHUTDOWN)

    logging.warning('Caught signal: %s', sig)
    io_loop.add_callback_from_signal(shutdown)


def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])

    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.port)

    signal.signal(signal.SIGTERM, partial(sig_handler, server))
    signal.signal(signal.SIGINT, partial(sig_handler, server))

    tornado.ioloop.IOLoop.instance().start()

    logging.info("Exit...")


if __name__ == "__main__":
    main()

4. Working Example – Python 3.7 Tornado 5+ Example with tests.md

Code

bf_tornado.signals module

"""signals module provides helper functions for tornado graceful shutdown."""

import asyncio
import functools
import signal
import time

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

SHUTDOWN_TIMEOUT = 30


async def shutdown(server: HTTPServer, timeout: int, io_loop: IOLoop):
    """Stop HTTPServer and schedule stopping of ioloop."""

    # stop listening for new connections
    server.stop()

    # schedule ioloop shutdown
    deadline = time.time() + timeout
    graceful_shutdown(deadline, io_loop)


def graceful_shutdown(deadline: float, io_loop: IOLoop):
    """Gracefully shutdown ioloop by allowing tasks to complete by deadline."""

    now = time.time()
    tasks = asyncio.all_tasks()

    if now < deadline and len(tasks) > 0:
        # defer shutdown until all tasks have a chance to complete
        io_loop.add_timeout(now + 1, graceful_shutdown, deadline, io_loop)
    else:
        stop_loop(io_loop)


def stop_loop(io_loop: IOLoop):
    """Stop the ioloop.

    This oneline function isn't inlined as it allows for simpler mocking within
    our bf_tornado test suite.
    """

    io_loop.stop()


def sig_handler(server: HTTPServer, timeout: int, sig, frame):
    """Schedules ioloop shutdown after specified timeout when TERM/INT signals received.

    In-flights tasks running on the asyncio event loop will be given the
    opportunity to finish before the loop is shutdown after specified timeout.

    Expects to be initiated using partial application:
        functools.partial(sig_handler, HTTPServer())
    This partial application is typically handled by signals.sig_listener.
    """

    io_loop = IOLoop.current()

    # execute callback on next event loop tick
    io_loop.add_callback_from_signal(shutdown, server, timeout, io_loop)


def sig_listener(server: HTTPServer, timeout: int = 0):
    """Configures listeners for TERM/INT signals.

    Timeout should be a positive integer, otherwise a default will be provided.
    """

    if not timeout:
        timeout = SHUTDOWN_TIMEOUT

    p = functools.partial(sig_handler, server, timeout)

    signal.signal(signal.SIGTERM, p)
    signal.signal(signal.SIGINT, p)

Tests

import asyncio
import os
import signal
import threading
import time
from unittest import mock

import bf_metrics

import bf_tornado.handlers
import bf_tornado.signals

import tornado.gen
import tornado.ioloop
import tornado.testing
import tornado.util
import tornado.web


class TestGracefulShutdown(tornado.testing.AsyncHTTPTestCase):
    """Verify server allows in-flight requests to complete before shutdown.

    Note: we're using tornado's `yield` abstraction instead of proper
    async/await syntax because of the use of `gen_test` decorator to work
    around fact the ioloop isn't running when running the test suite.

    This also means we need to use the `get_url` abstraction function to
    convert the requested path into a fully qualified path to the locally
    running web server.
    """
    def get_app(self):
        class FooHandler(bf_tornado.handlers.BaseHandler):
            metrics = bf_metrics.Metrics(namespace='foo', host='localhost')

            async def get(self):
                await asyncio.sleep(3)
                self.finish('OK')

        return tornado.web.Application([
            (r'/', FooHandler)
        ])

    def tearDown(self):
        """Ensure we verify the `stop_loop` mock was called."""

        try:
            self.mock_stop_loop.assert_called()
        except AssertionError:
            self.fail("mock_stop_loop assertion failed")

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_completed(self, mock_stop_loop):
        """verify in-flight request completes after SIGINT is received.

        The test flow is:

            - make network request
            - issue interrupt signal (via separate thread)
            - network request completes before ioloop shutdown timeout

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will complete BEFORE the
        mocked `stop_loop` function has been called. This is a problem because
        the test function will finish executing and thus the assertion check at
        that time is likely to fail.

        To solve this problem, we sleep to enable enough time for the
        `stop_loop` function to be called, then we assign the mock to the class
        so that we can reference it within the tearDown function. That gives
        our asynchronous task time to be called.
        """

        # ensure ioloop waits long enough for request to complete
        shutdown_timeout = 10
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGNINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # this sleep causes tornado's ioloop to context switch back to active
        # when this test function finishes (and before the tearDown) is
        # triggered, thus allowing the async task to call the stop_loop mock.
        time.sleep(1)

        # we can't assert the mock was called from this test function as the
        # test function itself is blocking the ioloop background task.
        #
        # the reason it blocks is because the http request is designed to
        # finish before the shutdown timeout and so once we're back inside the
        # test function we cannot context switch back to the asyncio task that
        # is trying to verify if it should call `stop_loop`.
        #
        # this means we must assert against the mock within a tearDown function.
        self.mock_stop_loop = mock_stop_loop

    @mock.patch("bf_tornado.signals.stop_loop")
    @tornado.testing.gen_test
    def test_inflight_dropped(self, mock_stop_loop):
        """verify in-flight request is dropped after SIGINT is received.

        The test flow is:

            - make network request
            - issue interrupt signal (via separate thread)
            - ioloop shuts down before network request completes

        We mock the `stop_loop` function which is what would normally trigger
        the ioloop to be stopped. So although we issue a SIGINT it won't
        actually stop the ioloop that is running this test.

        The request flow means the web server request will NOT complete before
        the mocked `stop_loop` function has been called.
        """

        # reset mock property
        self.mock_stop_loop = mock_stop_loop

        # ensure ioloop does NOT wait long enough for request to complete
        shutdown_timeout = 1
        bf_tornado.signals.sig_listener(self.http_server, shutdown_timeout)

        pid = os.getpid()

        def trigger_signal():
            # defer SIGNINT long enough to allow HTTP request to tornado server
            time.sleep(1)
            os.kill(pid, signal.SIGINT)

        thread = threading.Thread(target=trigger_signal)
        thread.daemon = True
        thread.start()

        resp = yield self.http_client.fetch(self.get_url('/'))
        assert resp.code == 200

        # we can assert the mock was called here because the background asyncio
        # task has completed (i.e. called `stop_loop`) by the time the http
        # request has completed.
        mock_stop_loop.assert_called()