« Back to Index

[Python Tornado Decorator with arguments]

View original Gist on GitHub

Tags: #python #decorator #tornado #dict #compare

Python Tornado Decorator with arguments.py

def auth(*args, **kwargs):
    """Build a decorator that authenticates a Tornado handler method.

    Positional arguments are accepted but not used. The keyword argument
    ``client`` is required; it is handed to ``verify_tokens`` as the object
    used to contact the token verification service.

    Raises:
        KeyError: if ``client`` is not supplied as a keyword argument.
    """
    client = kwargs['client']

    def wrapper(handler_method):
        """Wrap a handler method such as RequestHandler.get or RequestHandler.post."""

        @wraps(handler_method)
        async def request_handler_wrapper(*handler_args, **handler_kwargs):
            """Authenticate the request, then delegate to the wrapped method.

            The first positional argument is the request handler instance;
            the remaining arguments are the parameters supplied by
            tornado.routing.Rule. On any authentication failure a 401 is
            sent and the wrapped method is not called.
            """
            request_handler = handler_args[0]

            cookie = acquire_cookie(request_handler)
            if invalid_cookie(cookie):
                return request_handler.send_error(status_code=401)

            tokens = extract_tokens_from_cookie(cookie)
            if not tokens:
                return request_handler.send_error(status_code=401)

            id_payload = await verify_tokens(tokens, client)
            if not id_payload:
                return request_handler.send_error(status_code=401)

            # Merge the verified payload last so that a routing parameter
            # that happens to be named "auth_data" can never override it.
            handler_kwargs = {**handler_kwargs, "auth_data": id_payload}

            return await handler_method(*handler_args, **handler_kwargs)
        return request_handler_wrapper
    return wrapper

  
def acquire_cookie(request_handler):
    """Return the cookie stored under the module-level ``cookie_name``.

    The lookup is delegated to ``request_handler.get_cookie`` and its result
    is returned unchanged.
    """
    cookie_value = request_handler.get_cookie(cookie_name)
    return cookie_value


def invalid_cookie(cookie):
    """Return True when ``cookie`` is empty/missing or has no access token.

    A cookie counts as invalid if it is falsy or if the substring
    ``access_token`` does not occur anywhere in it.
    """
    return not cookie or 'access_token' not in cookie


def extract_tokens_from_cookie(cookie):
    """Search ``cookie`` for the id, access and refresh tokens.

    Returns the regex match object, whose named groups ``id_token``,
    ``access_token`` and ``refresh_token`` hold the token values, or
    ``None`` when the cookie does not contain all three in that order.
    """
    return re.search(
        r'id_token=(?P<id_token>.+),access_token=(?P<access_token>.+),refresh_token=(?P<refresh_token>.+)',  # noqa
        cookie,
    )


def build_endpoint(host, path) -> str:
    """Return the ``https://`` URL for ``path`` on ``host``.

    When the configured cluster is not ``prod``, the host contains
    ``buzzfeed.com`` and the host does not match the stage API gateway
    pattern, the stage web-app credentials are embedded in the URL as
    ``user:pass@host``.
    """
    is_prod = settings.get('cluster') == 'prod'
    on_buzzfeed = 'buzzfeed.com' in host
    gateway_match = re.search('api-(?:public-)?stage.buzzfeed.com', host)

    needs_credentials = on_buzzfeed and not is_prod and not gateway_match
    if needs_credentials:
        credentials = '{}:{}'.format(webapp_stage_user, webapp_stage_pass)
        host = '{}@{}'.format(credentials, host)

    return 'https://{}{}'.format(host, path)


def _record_verify_failure(context):
    """Count one failed token verification in metrics, tagged with ``context``."""
    metrics.incr("verify_tokens", tags={"service": service,
                                        "status": "error",
                                        "context": context})


async def verify_tokens(tokens, api_gateway):
    """Verify the cookie tokens remotely and return the verified payload.

    Args:
        tokens: regex match exposing the named groups ``id_token``,
            ``access_token`` and ``refresh_token``; a falsy value
            short-circuits without any request being made.
        api_gateway: client whose awaitable ``post`` method sends the
            verification request.

    Returns:
        The ``payload`` entry of the JSON response, or ``None`` when there
        are no tokens, the request fails, the response body is not a JSON
        object, or the response reports ``status == 'error'``. Every
        failure after the request is attempted increments the
        ``verify_tokens`` error metric.
    """
    if not tokens:
        return None

    api_path = '/bfauth/tokens/verify'
    endpoint = build_endpoint(api_gateway_host, api_path)

    body = urllib.parse.urlencode({
        'id_token': tokens.group('id_token'),
        'access_token': tokens.group('access_token'),
        'refresh_token': tokens.group('refresh_token'),
    })

    try:
        response = await api_gateway.post(endpoint, body=body, debug=debug, request_timeout=10)
    except (bf_api_gateway.errors.BackoffError,
            bf_api_gateway.errors.APIGatewayError) as err:
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    # Deliberately broad: any failure to read or decode the body is treated
    # as a failed verification rather than an unhandled server error.
    try:
        response_data = json.loads(response.body)
    except Exception as err:
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    # Valid JSON is not necessarily an object; a list, string or number has
    # no ``.get`` and would otherwise raise AttributeError below.
    if not isinstance(response_data, dict):
        _record_verify_failure("response is not a JSON object")
        return None

    if response_data.get('status') == 'error':
        _record_verify_failure("status returned error")
        return None

    return response_data.get('payload')

class AuthTest(tornado.web.RequestHandler):
    """Request handler whose GET method is protected by the ``auth`` decorator."""

    @auth('get', client=mock_apigateway)
    async def get(self, *args, **kwargs):
        """Finish the response with the keyword arguments that were received.

        Example: kwargs = {"auth_data": "...", "path": "protected"}
        """
        response_payload = kwargs
        self.finish(response_payload)

# Builds the Tornado application under test: the single route matches
# "/protected", captures it as the named group ``path`` and dispatches to
# AuthTest.
# NOTE(review): the bare ``return`` means this is the body of a method whose
# header is not shown here (presumably the test case's app factory) - confirm.
return tornado.web.Application([
    (r'/(?P<path>protected)', AuthTest),
])

@mock.patch("bf_auth.subject.build_endpoint")
def test_successful_authentication(self, mock_endpoint):
    """Test a cookie is acquired, and the tokens are extracted and verified."""

    mock_endpoint.return_value = 'https://example-api.com'
    mock_apigateway.post.side_effect = mock_verify_tokens_success

    mock_cookie = 'mycookiename=id_token=1,access_token=2,refresh_token=3;'
    response = self.fetch('/protected', headers={"Cookie": mock_cookie})
    assert response.code == 200

    # Compare the decoded JSON object directly: dict equality does not depend
    # on key order, so no sorted re-serialization is needed.
    actual = json.loads(response.body.decode())
    assert actual == {"auth_data": "foobar", "path": "protected"}