Tags: #python
import argparse
import concurrent.futures
import json
import operator
import re
from datadog import api, initialize
options = {
"api_key": "foo",
"app_key": "foo"
}
initialize(**options)
def pprint(o):
"""pretty print data structures."""
print(json.dumps(o, indent=2, default=str), "\n")
def format_title(t):
"""print title in a format that makes it stand out visually.
example: "my title" > "\n########\nMY TITLE\n########\n"
"""
hashes = "#" * len(t)
print(f"\n{hashes}\n{t.upper()}\n{hashes}\n")
parser = argparse.ArgumentParser(
description="Datadog Metric Searcher (searches dashboards and monitors)")
parser.add_argument(
"-p", "--pattern", default=".",
help="regex pattern for filtering dashboard/monitor by name")
parser.add_argument(
"-m",
"--metrics",
help="comma-separated list of metrics",
required=True)
parser.add_argument(
"-u",
"--unused",
help="only display unused metrics",
action="store_true")
args = parser.parse_args()
metrics = [{"name": metric, "count": 0}
for metric in args.metrics.split(",") if metric]
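# e.g. --metrics "a,b" yields: [{"name": "a", "count": 0}, {"name": "b", "count": 0}]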
def find_graphs(
widgets,
dashboard_title,
dashboard_url,
metrics,
matches=None):
"""recursively search dashboard graphs for those referencing our metrics.
Note: widgets can be nested multiple times, so this is a recursive function.
because this function is run in isolation within its own process we pass in
the dashboard title/url so we can report back within the main/parent process
which dashboard the graphs are associated with (as the results are received
based on which process is quickest to complete). we also pass in a list of
metrics to be looked up in the dashboards/graphs, as we can't manipulate the
metric list (defined in the parent process) from within a child process.
"""
if not matches:
matches = {}
for widget in widgets:
        definition = widget.get("definition", {})
        if definition.get("type") != "note":
requests = definition.get("requests")
"""
example data:
{
"style": {
"palette": "green_to_orange",
"palette_flip": false
},
"group": [],
"title": "Hosts",
"node_type": "container",
"no_metric_hosts": true,
"scope": [
"$cluster",
"rig.service:user_auth_proxy"
],
"requests": {
"fill": {
"q": "avg:process.stat.container.io.wbps{$cluster,rig.service:user_auth_proxy} by {host}"
}
},
"no_group_hosts": true,
"type": "hostmap"
}
"""
if requests:
for request in requests:
metric_query = None
log_query = None
process_query = None
"""
the following if statement catches 'hostmap' graphs
whose requests key is a dict, not list[dict]
"requests": {
"fill": {
"q": "..."
}
}
"""
if isinstance(requests, dict) and request == "fill":
metric_query = requests.get(request, {}).get("q")
else:
                        try:
                            metric_query = request.get("q")
                        except AttributeError:
                            # requests was a dict but this key isn't "fill",
                            # so request is a plain string with no .get()
                            continue
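                        # shapes implied by the lookups below:
                        #   "log_query": {"search": {"query": "..."}}
                        #   "process_query": {"metric": "..."}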
log_query = request.get(
"log_query", {}).get(
"search", {}).get("query")
process_query = request.get(
"process_query", {}).get("metric")
                    # take whichever query type this request defined
                    query = metric_query or log_query or process_query
                    if not query:
                        continue
for metric in metrics:
if metric["name"] in query:
metric["count"] += 1
                            graph_title = definition.get("title", "N/A")
                            match = matches.setdefault(graph_title, [])
match.append({
"metric": query,
"type": definition["type"],
})
else:
nested_widgets = definition.get("widgets", [])
            # recurse into the nested widgets; ignore the returned dashboard
            # title/url and metrics, as at this depth we only care about the
            # accumulated matches
_, _, _, d = find_graphs(
nested_widgets,
dashboard_title,
dashboard_url,
metrics,
matches
)
matches.update(d)
return dashboard_title, dashboard_url, metrics, matches
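# e.g. a timeseries widget titled "My Graph" whose query references one of our
# metrics would yield matches shaped like (illustrative values):
#   {"My Graph": [{"metric": "avg:system.load.1{*}", "type": "timeseries"}]}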
def all_dashboards():
"""acquire all dashboards."""
return api.Dashboard.get_all()
def all_monitors():
"""acquire all monitors."""
return api.Monitor.get_all()
def dashboard_get(dashboard: dict):
"""acquire dashboard by the given ID."""
return api.Dashboard.get(dashboard["id"])
def filter_dashboards(dashboards):
"""filter dashboards by pattern provided by -p/--pattern flag."""
filtered_dashboards = []
for dashboard in dashboards["dashboards"]:
if re.search(args.pattern, dashboard["title"], flags=re.IGNORECASE):
filtered_dashboards.append(
{
"title": dashboard["title"],
"id": dashboard["id"],
"url": dashboard["url"],
}
)
return sorted(filtered_dashboards, key=operator.itemgetter("title"))
def filter_monitors(monitors):
"""filter monitors by pattern provided by -p/--pattern flag."""
filtered_monitors = []
for monitor in monitors:
if re.search(args.pattern, monitor["name"], flags=re.IGNORECASE):
filtered_monitors.append(
{"name": monitor["name"],
"url": f"https://<YOUR_ORG>.datadoghq.com/monitors/{monitor['id']}",
"query": monitor["query"], })
return sorted(filtered_monitors, key=operator.itemgetter("name"))
def process():
"""asynchronously acquire dashboards and update metric count.
Note: the Datadog API is not asynchronous, so we must run API operations
within a threadpool, while also running the metric 'searching' algorithm
(a cpu heavy operation) within a processpool to help speed up the overall
program execution time.
"""
if not args.unused:
format_title("metrics to find")
pprint(metrics)
dashboards = None
monitors = None
with concurrent.futures.ThreadPoolExecutor() as executor:
wait_for = [
executor.submit(all_dashboards),
executor.submit(all_monitors)
]
for f in concurrent.futures.as_completed(wait_for):
"""identify which api finished first and assign to correct variable.
dashboard api returns a dictionary, while monitors returns a list.
"""
results = f.result()
if isinstance(results, dict):
dashboards = results
else:
monitors = results
if args.pattern == ".":
ld = len(dashboards['dashboards'])
lm = len(monitors)
d = f"{ld} dashboards"
m = f"{lm} monitors"
msg = f"\nno search pattern provided, meaning we'll search ALL {d} and {m}!\n"
print(msg)
filtered_dashboards = filter_dashboards(dashboards)
filtered_monitors = filter_monitors(monitors)
dashboards_metadata = []
track_dashboard_metrics = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
wait_for = [
executor.submit(dashboard_get, dashboard)
for dashboard in filtered_dashboards
]
for f in concurrent.futures.as_completed(wait_for):
dashboards_metadata.append(f.result())
if not args.unused:
format_title("dashboards")
with concurrent.futures.ProcessPoolExecutor() as executor:
        # note: executor.submit() pickles its arguments, so each child process
        # receives its own copy of the metrics list
        metrics_copy = metrics.copy()
wait_for = [
executor.submit(
find_graphs,
dashboard["widgets"],
dashboard["title"],
dashboard["url"],
metrics_copy) for dashboard in dashboards_metadata]
for f in concurrent.futures.as_completed(wait_for):
title, url, metrics_mod, matches = f.result()
if matches:
if not args.unused:
print(title, "\n")
print(f"https://<YOUR_ORG>.datadoghq.com{url}\n")
pprint(matches)
print("---------\n")
            for metric in metrics_mod:
                name = metric["name"]
                track_dashboard_metrics[name] = track_dashboard_metrics.get(
                    name, 0) + metric["count"]
unused_dashboard_metrics = set()
unused_monitor_metrics = set()
used_monitor_metrics = set()
if not args.unused:
format_title("unused metrics in dashboards")
for metric, count in track_dashboard_metrics.items():
if count == 0:
unused_dashboard_metrics.add(metric)
print(metric)
if not args.unused:
format_title("monitors")
for monitor in filtered_monitors:
for metric in metrics:
if metric["name"] in monitor["query"]:
if not args.unused:
print(monitor["name"], "\n")
print(monitor["url"], "\n")
print(monitor["query"], "\n")
print("---------\n")
used_monitor_metrics.add(metric["name"])
else:
unused_monitor_metrics.add(metric["name"])
    # a metric may be referenced by one monitor but not by another, in which
    # case the monitor that doesn't reference it added it to the 'unused' set.
    # remove every metric we know is used so it isn't wrongly reported as
    # unused.
    unused_monitor_metrics -= used_monitor_metrics
if not args.unused:
format_title("unused metrics in monitors")
for m in unused_monitor_metrics:
print(m)
format_title("unused metrics in monitors and dashboards")
unused_metrics = unused_dashboard_metrics.intersection(
unused_monitor_metrics)
for m in unused_metrics:
print(m)
print(f"\n{len(unused_metrics)} out of {len(metrics)} metrics are unused.")
if __name__ == "__main__":
process()
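# Example usage (assuming the script above is saved as
# datadog_metric_searcher.py; the metric names are illustrative):
#
#   python datadog_metric_searcher.py -p "platform" -m "aws.ec2.cpu,system.load.1"
#   python datadog_metric_searcher.py -m "aws.ec2.cpu" --unused
#
# The snippet below is a separate, standalone example: it lists all dashboards
# ordered by when they were last modified.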
from datadog import api, initialize
options = {
"api_key": "foo",
"app_key": "bar"
}
initialize(**options)
result = api.Dashboard.get_all()
date_ordered = sorted(result["dashboards"], key=lambda d: d["modified_at"])
for dashboard in date_ordered:
print(f'{dashboard["title"]}: {dashboard["modified_at"]}')
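# a minimal variation on the loop above: show only the ten most recently
# modified dashboards by reversing the sort.
for dashboard in sorted(result["dashboards"],
                        key=lambda d: d["modified_at"],
                        reverse=True)[:10]:
    print(f'{dashboard["title"]}: {dashboard["modified_at"]}')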