Tags: #python3 #fastly #api #edge #dictionary #example #cdn
#!/usr/bin/env python3
"""
description:
allows engineers to cause requests to failover.example.com to skip
authentication (we use 'Basic' scheme https://tools.ietf.org/html/rfc7617).
dependencies:
tornado >= 6.0
usage:
GET
FASTLY_API_KEY=123 ./scripts/dr_west
SET
FASTLY_API_KEY=123 ./scripts/dr_west --set --skip-auth=<true|false>
"""
import argparse
import json
import os
import re
import sys
# required dependencies
try:
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
except ImportError:
print("this script requires tornado >= 6.0")
sys.exit(1)
if not sys.version_info >= (3, 7, 0):
print("this script requires the use of Python 3.7+")
sys.exit(1)
# cli flags
parser = argparse.ArgumentParser(description="DR failover.example.com")
parser.add_argument("--set", help="Set operation", action="store_true")
parser.add_argument("--skip-auth", help="Allow all requests to failover.example.com", default="false")
args = parser.parse_args()
# constants
SKIP_AUTH = bool(re.match("true", args.skip_auth, flags=re.IGNORECASE))
EDGE_DICT_ID = "123"
EDGE_DICT_KEY = "skip_auth"
FASTLY_API_KEY = os.environ.get("FASTLY_API_KEY")
FASTLY_SERVICE_ID = "456"
if not FASTLY_API_KEY:
print("Fastly API token is missing. Set in env var `FASTLY_API_KEY`")
sys.exit(1)
# configuration
AsyncHTTPClient.configure(None, defaults=dict(user_agent="Your-Organization"))
http_client = AsyncHTTPClient()
async def api_request(method="GET", body=None):
content_type = {}
if method == "PUT":
content_type = {"Content-Type": "application/x-www-form-urlencoded"}
headers = {"Fastly-Key": FASTLY_API_KEY, **content_type}
host = "https://api.fastly.com"
path = f"service/{FASTLY_SERVICE_ID}/dictionary/{EDGE_DICT_ID}/item/{EDGE_DICT_KEY}"
url = f"{host}/{path}"
resp = await http_client.fetch(url, method=method, headers=headers, body=body, raise_error=False)
data = json.loads(resp.body)
key = data['item_value']
print(f"{method} skip_auth: {key}")
async def edge_key():
"""retrieves edge dictionary (foo_dict) key (skip_auth) value."""
await api_request()
async def edge_key_update():
"""updates edge dictionary (foo_dict) with new key (skip_auth) value."""
value = "true" if SKIP_AUTH else "false"
body = f"item_value={value}"
await api_request(method="PUT", body=body)
async def main():
if args.set:
await edge_key_update()
else:
await edge_key()
io_loop = IOLoop.current()
io_loop.run_sync(main)