Tags: #aws #python #logs
#!/usr/bin/env python3
'''
This script is a modified/simplified version of a Python 2 script provided by
Datadog for getting log data from S3 into their log aggregation pipeline.
The original script can be found here:
https://github.com/DataDog/datadog-serverless-functions/tree/master/aws/logs_monitoring
We've removed any code that wasn't relevant to our requirements, and have also
updated it to work with Python 3.
'''
# standard library modules
import gzip
import json
import logging
import os
import re
import socket
import ssl
import urllib.parse
from io import BufferedReader, BytesIO
# third party modules
import boto3
# configuration
CLUSTER = os.environ['CLUSTER']
DD_URL = os.getenv('DD_URL', default='lambda-intake.logs.datadoghq.com')
DD_PORT = int(os.getenv('DD_PORT', default=10516))  # env vars arrive as strings
IP_REGEX = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
SECRETS_MANAGER_ID = os.environ['SECRETS_MANAGER_ID']
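# required environment: CLUSTER and SECRETS_MANAGER_ID must be set on the
# function; DD_URL, DD_PORT and REDACT_IP are optional overrides.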
# service clients configuration
s3 = boto3.client('s3')
secretsmanager = boto3.client('secretsmanager')
try:
    aws_secret_response = secretsmanager.get_secret_value(SecretId=SECRETS_MANAGER_ID)
    DD_API_KEY = aws_secret_response.get('SecretString')
except Exception as e:
    # chain the original exception so the root cause shows up in the logs
    raise Exception('unable to acquire datadog secret api key') from e
class DatadogConnection(object):
    def __init__(self, host, port, ddApiKey):
        self.host = host
        self.port = port
        self.api_key = ddApiKey
        self._sock = None

    def _connect(self):
        # ssl.wrap_socket() is deprecated (and removed in Python 3.12), so
        # use an SSLContext, which also verifies the server certificate.
        sock = socket.create_connection((self.host, self.port))
        context = ssl.create_default_context()
        return context.wrap_socket(sock, server_hostname=self.host)
    def safe_submit_log(self, log_entry, context):
        try:
            self.send_entry(log_entry, context)
        except Exception:
            # reconnect and retry once
            self._sock = self._connect()
            self.send_entry(log_entry, context)
        return self
    def send_entry(self, log_entry, context):
        if not isinstance(log_entry, dict):
            raise Exception(f'log entry needs to be of type dict: {log_entry}')
        # ensure rig metadata is passed through
        log_entry.update({'rig': {'cluster': CLUSTER, 'service': 'rig_cdn_logs_to_datadog'}})
        # datadog expects the log to be wrapped in a 'message' field
        if 'message' not in log_entry:
            log_entry = {'message': log_entry}
        # the context attributes are documented here:
        # https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html
        tags = {
            'functionName': context.function_name.lower(),
            'functionVersion': context.function_version,
            'memorySize': context.memory_limit_in_mb
        }
        log_entry.update({
            # datadog expects tags as a comma-separated 'key:value' list
            'ddtags': ','.join([f'{k}:{v}' for k, v in tags.items()]),
            'ddsource': 'aws',
            'source': 'fastly'
        })
        str_entry = json.dumps(log_entry)
        if os.getenv('REDACT_IP'):
            try:
                str_entry = IP_REGEX.sub('xxx.xxx.xxx.xxx', str_entry)
            except Exception as e:
                logging.error(f'unexpected exception while scrubbing logs: {e} for event {str_entry}')
        # the tcp intake expects '<API_KEY> <json payload>\n' per log line
        message = f'{self.api_key} {str_entry}\n'.encode('utf-8')
        # sendall() (rather than send()) ensures the whole payload is written
        return self._sock.sendall(message)
    def __enter__(self):
        self._sock = self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        if self._sock:
            self._sock.close()
        if ex_type is not None:
            logging.error(f'DatadogConnection exit: {ex_type} {ex_value} {traceback}')
def parse_cdn_logs(body, key):
    data = body.read()
    # all s3 objects from fastly should arrive gzip-compressed, but checking
    # the extension is a cheap safety precaution.
    if key.endswith('.gz'):
        with gzip.GzipFile(fileobj=BytesIO(data)) as decompress_stream:
            stream = BufferedReader(decompress_stream)
            for raw_line in stream:
                try:
                    line = json.loads(raw_line)
                except Exception:
                    logging.error(f'error parsing JSON. raw_line: {raw_line}')
                    # allow execution to continue; the fabricated state below
                    # won't match the filter, so the line won't be sent to
                    # Datadog.
                    line = {'network': {'server': {'state': 'FORMAT_ERROR'}}}
                # we only send request log lines whose final state was
                # uncached (MISS or PASS), because we need to stay under our
                # current Datadog limits (overages can be costly).
                #
                # all available states for fastly_info.state are listed here:
                # https://support.fastly.com/hc/en-us/community/posts/360040168391-Useful-variables-to-log
                if re.search('^(MISS|PASS)', line.get('network', {}).get('server', {}).get('state', '')):
                    # any other coercion we want to do on the log line before sending
                    yield line
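# for reference, parse_cdn_logs consumes newline-delimited JSON objects like
# the (abridged, illustrative) line below; only network.server.state is
# actually inspected, and the ^(MISS|PASS) regex also matches compound states
# such as MISS-CLUSTER:
# {"url": "/index.html", "network": {"server": {"state": "MISS-CLUSTER"}}}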
def lambda_handler(event, context):
    records = event.get('Records', [])
    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as dd_conn:
        for record in records:
            s3_event = record.get('s3', {})
            if not s3_event:
                # skip non-s3 records rather than aborting the whole batch
                continue
            s3_bucket = s3_event['bucket']['name']
            # object keys in event notifications are url-encoded
            s3_object = urllib.parse.unquote(s3_event['object']['key'])
            response = s3.get_object(Bucket=s3_bucket, Key=s3_object)
            try:
                body = response['Body']
            except Exception as e:
                # note: `raise` needs an exception object, not a bare string
                raise Exception(f'unexpected exception while parsing cdn_logs: {e}') from e
            cdn_log_parser = parse_cdn_logs(body, s3_object)
            for log_line in cdn_log_parser:
                dd_conn.safe_submit_log(log_line, context)
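For reference, here's an example of the S3 ObjectCreated event notification that triggers the handler (placeholders in angle brackets):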
{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2019-04-08T13:00:00.964Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:123"
            },
            "requestParameters": {
                "sourceIPAddress": "199.27.72.31"
            },
            "responseElements": {
                "x-amz-request-id": "456",
                "x-amz-id-2": "789"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "tf-s3-lambda-20190408124645838400000001",
                "bucket": {
                    "name": "<your_bucket>",
                    "ownerIdentity": {
                        "principalId": "123"
                    },
                    "arn": "arn:aws:s3:::<your_bucket>"
                },
                "object": {
                    "key": "fastly/json/<your_domain>/dt=2019-04-16/2019-04-16T09:45:00.000-_9Hq9QX5I8nj3FZQPsPH.log.gz",
                    "size": 8921,
                    "eTag": "456",
                    "sequencer": "789"
                }
            }
        }
    ]
}
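Finally, a minimal sketch for exercising the handler locally against that sample event. It assumes the script above is saved as handler.py and the payload as event.json (both names are arbitrary), and that CLUSTER, SECRETS_MANAGER_ID and AWS credentials are available in the environment; FakeContext is a hypothetical stand-in exposing only the three Lambda context attributes send_entry reads, not part of any AWS library.

# local_test.py -- a hedged sketch for local testing, not part of the
# original script.
import json

import handler  # the script above, saved as handler.py (assumption)


class FakeContext:
    # hypothetical stand-in for the real Lambda context object
    function_name = 'rig_cdn_logs_to_datadog'
    function_version = '$LATEST'
    memory_limit_in_mb = 128


if __name__ == '__main__':
    with open('event.json') as f:  # the sample payload above (assumption)
        event = json.load(f)
    handler.lambda_handler(event, FakeContext())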