« Back to Index

[Python Fastly API Edge Dictionary Example]

View original Gist on GitHub

Tags: #python3 #fastly #api #edge #dictionary #example #cdn

Python Fastly API Edge Dictionary Example.py

#!/usr/bin/env python3

"""
description:
allows engineers to cause requests to failover.example.com to skip
authentication (we use 'Basic' scheme https://tools.ietf.org/html/rfc7617).

dependencies:
tornado >= 6.0

usage:
GET
    FASTLY_API_KEY=123 ./scripts/dr_west
SET
    FASTLY_API_KEY=123 ./scripts/dr_west --set --skip-auth=<true|false>
"""

import argparse
import json
import os
import re
import sys

# required dependencies

try:
    from tornado.httpclient import AsyncHTTPClient
    from tornado.ioloop import IOLoop
except ImportError:
    print("this script requires tornado >= 6.0")
    sys.exit(1)

if not sys.version_info >= (3, 7, 0):
    print("this script requires the use of Python 3.7+")
    sys.exit(1)

# cli flags

parser = argparse.ArgumentParser(description="DR failover.example.com")
parser.add_argument("--set", help="Set operation", action="store_true")
parser.add_argument("--skip-auth", help="Allow all requests to failover.example.com", default="false")
args = parser.parse_args()

# constants

SKIP_AUTH = bool(re.match("true", args.skip_auth, flags=re.IGNORECASE))
EDGE_DICT_ID = "123"
EDGE_DICT_KEY = "skip_auth"
FASTLY_API_KEY = os.environ.get("FASTLY_API_KEY")
FASTLY_SERVICE_ID = "456"

if not FASTLY_API_KEY:
    print("Fastly API token is missing. Set in env var `FASTLY_API_KEY`")
    sys.exit(1)

# configuration

AsyncHTTPClient.configure(None, defaults=dict(user_agent="Your-Organization"))
http_client = AsyncHTTPClient()


async def api_request(method="GET", body=None):
    content_type = {}

    if method == "PUT":
        content_type = {"Content-Type": "application/x-www-form-urlencoded"}

    headers = {"Fastly-Key": FASTLY_API_KEY, **content_type}

    host = "https://api.fastly.com"
    path = f"service/{FASTLY_SERVICE_ID}/dictionary/{EDGE_DICT_ID}/item/{EDGE_DICT_KEY}"
    url = f"{host}/{path}"

    resp = await http_client.fetch(url, method=method, headers=headers, body=body, raise_error=False)
    data = json.loads(resp.body)
    key = data['item_value']

    print(f"{method} skip_auth: {key}")


async def edge_key():
    """retrieves edge dictionary (foo_dict) key (skip_auth) value."""

    await api_request()


async def edge_key_update():
    """updates edge dictionary (foo_dict) with new key (skip_auth) value."""

    value = "true" if SKIP_AUTH else "false"
    body = f"item_value={value}"

    await api_request(method="PUT", body=body)


async def main():
    if args.set:
        await edge_key_update()
    else:
        await edge_key()

io_loop = IOLoop.current()
io_loop.run_sync(main)