« Back to Index

Python: AWS Lambda sending S3 Fastly streamed logs to Datadog

View original Gist on GitHub

Tags: #aws #python #logs

AWS Python Lambda sending S3 Fastly streamed logs to Datadog.py

'''
This script is a modified/simplified version of a Python 2 script provided by
Datadog for getting log data from S3 into their log aggregation pipeline.

The original script can be found here:
https://github.com/DataDog/datadog-serverless-functions/tree/master/aws/logs_monitoring

We've removed any code that wasn't relevant to our requirements, and have also
updated it to work with Python 3.
'''

#!/usr/bin/env python3

# standard library modules

import gzip
import json
import logging
import os
import re
import socket
import ssl
import urllib
from io import BufferedReader, BytesIO

# third party modules

import boto3


# configuration

# cluster name attached to every forwarded log entry; required, so a missing
# variable fails the import with KeyError.
CLUSTER = os.environ['CLUSTER']
# host of the Datadog log intake the function connects to.
DD_URL = os.getenv('DD_URL', default='lambda-intake.logs.datadoghq.com')
# environment variables are always strings, so coerce to int: the socket
# connect call needs an integer port.
DD_PORT = int(os.getenv('DD_PORT', default=10516))
# matches dotted-quad (IPv4-looking) sequences; used to redact addresses.
IP_REGEX = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', re.I)
# id of the secret that holds the Datadog API key; required.
SECRETS_MANAGER_ID = os.environ['SECRETS_MANAGER_ID']

# service clients configuration

s3 = boto3.client('s3')
secretsmanager = boto3.client('secretsmanager')

# the api key is fetched once, at import time, so a broken configuration fails
# the function immediately rather than on the first log line.
try:
    aws_secret_response = secretsmanager.get_secret_value(SecretId=SECRETS_MANAGER_ID)
except Exception as err:
    # chain the cause explicitly so the underlying AWS error stays visible
    raise Exception('unable to acquire datadog secret api key') from err

DD_API_KEY = aws_secret_response.get('SecretString')

if not DD_API_KEY:
    # a secret stored as binary has no 'SecretString'; without this check the
    # literal text 'None' would be sent to Datadog as the api key.
    raise Exception('unable to acquire datadog secret api key')


class DatadogConnection(object):
    '''
    TLS connection to the Datadog log intake, usable as a context manager.

    Entering the context opens the socket and leaving it closes the socket.
    Each log entry is written as a single line: the api key, a space, the
    JSON encoded entry and a trailing newline.
    '''

    def __init__(self, host, port, ddApiKey):
        '''
        Store the connection settings; no socket is opened here.

        host:     hostname of the log intake endpoint.
        port:     TCP port of the endpoint. An int or a numeric string is
                  accepted, because values read from the environment are
                  strings while the socket layer needs an int.
        ddApiKey: api key prefixed to every line that is sent.
        '''
        self.host = host
        self.port = int(port)
        self.api_key = ddApiKey
        self._sock = None

    def _connect(self):
        '''Open a certificate-verified TLS socket to the endpoint and return it.'''
        # ssl.wrap_socket() is deprecated, was removed in Python 3.12 and did
        # not verify the server certificate or hostname. A default context
        # does both.
        tls_context = ssl.create_default_context()
        raw_sock = socket.create_connection((self.host, self.port))
        try:
            return tls_context.wrap_socket(raw_sock, server_hostname=self.host)
        except Exception:
            # don't leak the plain socket when the TLS handshake fails
            raw_sock.close()
            raise

    def _close(self):
        '''Close the current socket, if there is one, and forget it.'''
        if self._sock is not None:
            try:
                self._sock.close()
            finally:
                self._sock = None

    def safe_submit_log(self, log_entry, context):
        '''
        Send log_entry, reconnecting and retrying once on a socket error.

        Returns self. An error from the second attempt is propagated. Errors
        that a new connection cannot fix (e.g. an entry that is not a dict)
        are raised straight away instead of triggering a reconnect.
        '''
        if self._sock is None:
            # used outside of a 'with' block: connect on first use
            self._sock = self._connect()

        try:
            self.send_entry(log_entry, context)
        except OSError:
            # retry once, closing the broken socket first so it isn't leaked
            self._close()
            self._sock = self._connect()
            self.send_entry(log_entry, context)
        return self

    def send_entry(self, log_entry, context):
        '''
        Serialise log_entry and write it to the open socket.

        log_entry: dict holding the log record. It is not modified.
        context:   Lambda context object; function_name, function_version
                   and memory_limit_in_mb are read from it to build the tags.

        Returns the number of bytes written. Raises Exception when log_entry
        is not a dict.
        '''
        if not isinstance(log_entry, dict):
            raise Exception(f'log entry needs to be of type: dict: {log_entry}')

        # work on a shallow copy so the caller's dict is left untouched
        entry = dict(log_entry)

        # ensure rig metadata is passed through
        entry['rig'] = {'cluster': CLUSTER, 'service': 'rig_cdn_logs_to_datadog'}

        # datadog expects log to be wrapped in a 'message' field
        if 'message' not in entry:
            entry = {'message': entry}

        # documentation:
        # https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html
        tags = {
            'functionName': context.function_name.lower(),
            'functionVersion': context.function_version,
            'memorysize': context.memory_limit_in_mb
        }

        entry.update({
            'ddtags': ', '.join([f'{k}:{v}' for k, v in tags.items()]),
            'ddsource': 'aws',
            'source': 'fastly'
        })

        str_entry = json.dumps(entry)

        # redaction runs on the serialised entry, so every dotted-quad match
        # in the JSON text is replaced, whichever field it appears in.
        if os.getenv('REDACT_IP'):
            try:
                str_entry = IP_REGEX.sub('xxx.xxx.xxx.xx', str_entry)
            except Exception as e:
                print(f'Unexpected exception while scrubbing logs: {str(e)} for event {str_entry}')

        message = f'{self.api_key} {str_entry}\n'.encode('UTF-8')

        # send() may write only part of the buffer, which would truncate the
        # log line; sendall() keeps going until everything is written.
        self._sock.sendall(message)
        return len(message)

    def __enter__(self):
        '''Open the connection and return this object.'''
        self._sock = self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        '''Close the socket; report, but do not suppress, any exception.'''
        self._close()
        if ex_type is not None:
            print('DatadogConnection exit: ', ex_type, ex_value, traceback)


def _final_state(line):
    '''Return line['network']['server']['state'] when it is a string, else ''.'''
    node = line
    for field in ('network', 'server', 'state'):
        if not isinstance(node, dict):
            return ''
        node = node.get(field)
    return node if isinstance(node, str) else ''


def parse_cdn_logs(body, key):
    '''
    Yield the uncached (MISS/PASS) request records of a gzipped log object.

    body: file-like object; read() must return the raw object bytes.
    key:  object key. Only keys ending in '.gz' are parsed; anything else
          yields nothing.

    Every line of the decompressed data is decoded as JSON. Lines that fail
    to decode are logged and skipped, blank lines are skipped silently, and
    records without a usable network.server.state are skipped as well, so
    one malformed line can no longer abort the rest of the file.
    '''
    data = body.read()

    # all s3 objects from fastly should be streamed compressed as gzip, but
    # it's a good safety precaution just in case to check the extension.
    if not key.endswith('.gz'):
        return

    with gzip.GzipFile(fileobj=BytesIO(data)) as decompress_stream:
        for raw_line in BufferedReader(decompress_stream):
            if not raw_line.strip():
                continue

            try:
                line = json.loads(raw_line)
            except ValueError:
                # covers JSONDecodeError and UnicodeDecodeError; the line is
                # dropped, i.e. it is never sent to Datadog.
                logging.error('error parsing JSON. raw_line: %s', raw_line)
                continue

            # we check if the request's final state was uncached and, if so, we'll
            # send those request log lines to Datadog.
            #
            # the reason we only send 'uncached' requests is because we need to
            # avoid running over our current Datadog limits (which can be costly).
            #
            # all available states for fastly_info.state can be found here:
            # https://support.fastly.com/hc/en-us/community/posts/360040168391-Useful-variables-to-log
            if _final_state(line).startswith(('MISS', 'PASS')):
                yield line


def lambda_handler(event, context):
    '''
    Lambda entry point: forward the log objects named in an S3 event.

    event:   S3 notification event. For every record with an 's3' section
             the referenced object is downloaded, parsed with
             parse_cdn_logs and the resulting lines are sent to Datadog.
             Records without an 's3' section are skipped.
    context: Lambda context object, passed through to the Datadog
             connection, which reads the function metadata from it.

    Returns None.
    '''
    # imported explicitly: a bare 'import urllib' does not guarantee that the
    # urllib.parse submodule has been loaded.
    from urllib.parse import unquote_plus

    records = event.get('Records', [])

    if not records:
        # nothing to do, so don't open a connection at all
        return

    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as dd_conn:
        for record in records:
            s3_event = record.get('s3', {})

            if not s3_event:
                # skip only this record; the remaining ones are still handled
                continue

            s3_bucket = s3_event['bucket']['name']
            # object keys in s3 events are url-encoded with spaces as '+',
            # so a plain unquote() would leave the '+' in the key.
            s3_object = unquote_plus(s3_event['object']['key'])

            response = s3.get_object(Bucket=s3_bucket, Key=s3_object)
            try:
                body = response['Body']
            except (KeyError, TypeError) as e:
                # raising a plain string is itself a TypeError, so raise a
                # real exception and keep the original cause attached.
                raise Exception(f'Unexpected exception while parsing cdn_logs: {str(e)}') from e

            for log_line in parse_cdn_logs(body, s3_object):
                dd_conn.safe_submit_log(log_line, context)

Test Event.json

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2019-04-08T13:00:00.964Z",
      "eventName": "ObjectCreated:CompleteMultipartUpload",
      "userIdentity": {
        "principalId": "AWS:123"
      },
      "requestParameters": {
        "sourceIPAddress": "199.27.72.31"
      },
      "responseElements": {
        "x-amz-request-id": "456",
        "x-amz-id-2": "789"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "tf-s3-lambda-20190408124645838400000001",
        "bucket": {
          "name": "<your_bucket>",
          "ownerIdentity": {
            "principalId": "123"
          },
          "arn": "arn:aws:s3:::<your_bucket>"
        },
        "object": {
          "key": "fastly/json/<your_domain>/dt=2019-04-16/2019-04-16T09:45:00.000-_9Hq9QX5I8nj3FZQPsPH.log.gz",
          "size": 8921,
          "eTag": "456",
          "sequencer": "789"
        }
      }
    }
  ]
}