From 03ea03aba27d54211e75199ff81fa63ffc331f0b Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Tue, 11 Jan 2022 11:05:50 +0100 Subject: [PATCH] feat(alerts): refactored alerts feat(alerts): fixed decimal precision feat(alerts): configurable cron-interval --- api/app_alerts.py | 4 +- api/build_alerts.sh | 4 +- api/chalicelib/core/alerts_listener.py | 27 +++ api/chalicelib/core/alerts_processor.py | 29 +-- ee/api/.gitignore | 5 +- ee/api/chalicelib/core/alerts_listener.py | 27 +++ ee/api/chalicelib/core/alerts_processor.py | 265 --------------------- 7 files changed, 66 insertions(+), 295 deletions(-) create mode 100644 api/chalicelib/core/alerts_listener.py create mode 100644 ee/api/chalicelib/core/alerts_listener.py delete mode 100644 ee/api/chalicelib/core/alerts_processor.py diff --git a/api/app_alerts.py b/api/app_alerts.py index 0cba8986a..60e1b7e48 100644 --- a/api/app_alerts.py +++ b/api/app_alerts.py @@ -1,6 +1,7 @@ import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler +from decouple import config from fastapi import FastAPI from chalicelib.core import alerts_processor @@ -16,7 +17,8 @@ async def root(): app.schedule = AsyncIOScheduler() app.schedule.start() -app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", "minutes": 5, +app.schedule.add_job(id="alerts_processor", **{"func": alerts_processor.process, "trigger": "interval", + "minutes": config("ALERTS_INTERVAL", cast=int, default=5), "misfire_grace_time": 20}) for job in app.schedule.get_jobs(): diff --git a/api/build_alerts.sh b/api/build_alerts.sh index bfe9c22a1..83fac6f83 100644 --- a/api/build_alerts.sh +++ b/api/build_alerts.sh @@ -15,7 +15,7 @@ function make_submodule() { mkdir -p ./alerts/chalicelib/ cp -R ./chalicelib/__init__.py ./alerts/chalicelib/ mkdir -p ./alerts/chalicelib/core/ - cp -R ./chalicelib/core/{__init__,alerts_processor,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,assist,events_ios,sessions_mobs,errors,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/ + cp -R ./chalicelib/core/{__init__,alerts_processor,alerts_listener,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,assist,events_ios,sessions_mobs,errors,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/ mkdir -p ./alerts/chalicelib/utils/ cp -R ./chalicelib/utils/{__init__,TimeUTC,pg_client,helper,event_filter_definition,dev,email_helper,email_handler,smtp,s3,metrics_helper}.py ./alerts/chalicelib/utils/ # -- end of generated part @@ -27,7 +27,7 @@ function make_submodule() { mkdir -p ./alerts/chalicelib/ cp -R ./chalicelib/__init__.py ./alerts/chalicelib/ mkdir -p ./alerts/chalicelib/core/ - cp -R ./chalicelib/core/{__init__,alerts_processor,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,roles,assist,events_ios,sessions_mobs,errors,dashboard,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/ + cp -R ./chalicelib/core/{__init__,alerts_processor,alerts_listener,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,roles,assist,events_ios,sessions_mobs,errors,dashboard,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/ mkdir -p ./alerts/chalicelib/utils/ cp -R ./chalicelib/utils/{__init__,TimeUTC,pg_client,helper,event_filter_definition,dev,SAML2_helper,email_helper,email_handler,smtp,s3,args_transformer,ch_client,metrics_helper}.py ./alerts/chalicelib/utils/ # -- end of generated part diff --git a/api/chalicelib/core/alerts_listener.py b/api/chalicelib/core/alerts_listener.py new file mode 100644 index 000000000..419f0326d --- /dev/null +++ b/api/chalicelib/core/alerts_listener.py @@ -0,0 +1,27 @@ +from chalicelib.utils import pg_client, helper + + +def get_all_alerts(): + with pg_client.PostgresClient(long_query=True) as cur: + query = """SELECT -1 AS tenant_id, + alert_id, + project_id, + detection_method, + query, + options, + (EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at, + alerts.name, + alerts.series_id, + filter + FROM public.alerts + LEFT JOIN metric_series USING (series_id) + INNER JOIN projects USING (project_id) + WHERE alerts.deleted_at ISNULL + AND alerts.active + AND projects.active + AND projects.deleted_at ISNULL + AND (alerts.series_id ISNULL OR metric_series.deleted_at ISNULL) + ORDER BY alerts.created_at;""" + cur.execute(query=query) + all_alerts = helper.list_to_camel_case(cur.fetchall()) + return all_alerts diff --git a/api/chalicelib/core/alerts_processor.py b/api/chalicelib/core/alerts_processor.py index 281064abf..43f57a96b 100644 --- a/api/chalicelib/core/alerts_processor.py +++ b/api/chalicelib/core/alerts_processor.py @@ -1,6 +1,7 @@ import schemas +from chalicelib.core import alerts_listener from chalicelib.core import sessions, alerts -from chalicelib.utils import pg_client, helper +from chalicelib.utils import pg_client from chalicelib.utils.TimeUTC import TimeUTC LeftToDb = { @@ -195,29 +196,7 @@ def Build(a): def process(): notifications = [] - with pg_client.PostgresClient(long_query=True) as cur: - query = """SELECT -1 AS tenant_id, - alert_id, - project_id, - detection_method, - query, - options, - (EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at, - alerts.name, - alerts.series_id, - filter - FROM public.alerts - LEFT JOIN metric_series USING (series_id) - INNER JOIN projects USING (project_id) - WHERE alerts.deleted_at ISNULL - AND alerts.active - AND projects.active - AND projects.deleted_at ISNULL - AND (alerts.series_id ISNULL OR metric_series.deleted_at ISNULL) - ORDER BY alerts.created_at;""" - cur.execute(query=query) - all_alerts = helper.list_to_camel_case(cur.fetchall()) - + all_alerts = alerts_listener.get_all_alerts() with pg_client.PostgresClient() as cur: for alert in all_alerts: if can_check(alert): @@ -235,7 +214,7 @@ def process(): "alertId": alert["alertId"], "tenantId": alert["tenantId"], "title": alert["name"], - "description": f"has been triggered, {alert['query']['left']} = {result['value']} ({alert['query']['operator']} {alert['query']['right']}).", + "description": f"has been triggered, {alert['query']['left']} = {round(result['value'], 2)} ({alert['query']['operator']} {alert['query']['right']}).", "buttonText": "Check metrics for more details", "buttonUrl": f"/{alert['projectId']}/metrics", "imageUrl": None, diff --git a/ee/api/.gitignore b/ee/api/.gitignore index cfd351eea..8afea0ab6 100644 --- a/ee/api/.gitignore +++ b/ee/api/.gitignore @@ -178,6 +178,7 @@ README/* Pipfile /chalicelib/core/alerts.py +/chalicelib/core/alerts_processor.py /chalicelib/core/announcements.py /chalicelib/blueprints/bp_app_api.py /chalicelib/blueprints/bp_core.py @@ -261,5 +262,5 @@ Pipfile /chalicelib/core/custom_metrics.py /chalicelib/core/performance_event.py /chalicelib/core/saved_search.py -/requirements_alerts.txt -/app_alerts.py \ No newline at end of file +/app_alerts.py +/build_alerts.sh diff --git a/ee/api/chalicelib/core/alerts_listener.py b/ee/api/chalicelib/core/alerts_listener.py new file mode 100644 index 000000000..40241f51e --- /dev/null +++ b/ee/api/chalicelib/core/alerts_listener.py @@ -0,0 +1,27 @@ +from chalicelib.utils import pg_client, helper + + +def get_all_alerts(): + with pg_client.PostgresClient(long_query=True) as cur: + query = """SELECT tenant_id, + alert_id, + project_id, + detection_method, + query, + options, + (EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at, + alerts.name, + alerts.series_id, + filter + FROM public.alerts + LEFT JOIN metric_series USING (series_id) + INNER JOIN projects USING (project_id) + WHERE alerts.deleted_at ISNULL + AND alerts.active + AND projects.active + AND projects.deleted_at ISNULL + AND (alerts.series_id ISNULL OR metric_series.deleted_at ISNULL) + ORDER BY alerts.created_at;""" + cur.execute(query=query) + all_alerts = helper.list_to_camel_case(cur.fetchall()) + return all_alerts diff --git a/ee/api/chalicelib/core/alerts_processor.py b/ee/api/chalicelib/core/alerts_processor.py deleted file mode 100644 index 3bb6b202f..000000000 --- a/ee/api/chalicelib/core/alerts_processor.py +++ /dev/null @@ -1,265 +0,0 @@ -import schemas -from chalicelib.core import sessions, alerts -from chalicelib.utils import pg_client, helper -from chalicelib.utils.TimeUTC import TimeUTC - -LeftToDb = { - schemas.AlertColumn.performance__dom_content_loaded__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "COALESCE(AVG(NULLIF(dom_content_loaded_time ,0)),0)"}, - schemas.AlertColumn.performance__first_meaningful_paint__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "COALESCE(AVG(NULLIF(first_contentful_paint_time,0)),0)"}, - schemas.AlertColumn.performance__page_load_time__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", "formula": "AVG(NULLIF(load_time ,0))"}, - schemas.AlertColumn.performance__dom_build_time__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(dom_building_time,0))"}, - schemas.AlertColumn.performance__speed_index__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", "formula": "AVG(NULLIF(speed_index,0))"}, - schemas.AlertColumn.performance__page_response_time__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(response_time,0))"}, - schemas.AlertColumn.performance__ttfb__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(first_paint_time,0))"}, - schemas.AlertColumn.performance__time_to_render__average: { - "table": "events.pages INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(visually_complete,0))"}, - schemas.AlertColumn.performance__image_load_time__average: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(resources.duration,0))", "condition": "type='img'"}, - schemas.AlertColumn.performance__request_load_time__average: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(resources.duration,0))", "condition": "type='fetch'"}, - schemas.AlertColumn.resources__load_time__average: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "AVG(NULLIF(resources.duration,0))"}, - schemas.AlertColumn.resources__missing__count: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "COUNT(DISTINCT url_hostpath)", "condition": "success= FALSE"}, - schemas.AlertColumn.errors__4xx_5xx__count: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", "formula": "COUNT(session_id)", - "condition": "status/100!=2"}, - schemas.AlertColumn.errors__4xx__count: {"table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "COUNT(session_id)", "condition": "status/100=4"}, - schemas.AlertColumn.errors__5xx__count: {"table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "COUNT(session_id)", "condition": "status/100=5"}, - schemas.AlertColumn.errors__javascript__impacted_sessions__count: { - "table": "events.resources INNER JOIN public.sessions USING(session_id)", - "formula": "COUNT(DISTINCT session_id)", "condition": "success= FALSE AND type='script'"}, - schemas.AlertColumn.performance__crashes__count: { - "table": "(SELECT *, start_ts AS timestamp FROM public.sessions WHERE errors_count > 0) AS sessions", - "formula": "COUNT(DISTINCT session_id)", "condition": "errors_count > 0"}, - schemas.AlertColumn.errors__javascript__count: { - "table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", - "formula": "COUNT(DISTINCT session_id)", "condition": "source='js_exception'", "joinSessions": False}, - schemas.AlertColumn.errors__backend__count: { - "table": "events.errors INNER JOIN public.errors AS m_errors USING (error_id)", - "formula": "COUNT(DISTINCT session_id)", "condition": "source!='js_exception'", "joinSessions": False}, -} - -# This is the frequency of execution for each threshold -TimeInterval = { - 15: 3, - 30: 5, - 60: 10, - 120: 20, - 240: 30, - 1440: 60, -} - - -def can_check(a) -> bool: - now = TimeUTC.now() - - repetitionBase = a["options"]["currentPeriod"] \ - if a["detectionMethod"] == schemas.AlertDetectionMethod.change \ - and a["options"]["currentPeriod"] > a["options"]["previousPeriod"] \ - else a["options"]["previousPeriod"] - - if TimeInterval.get(repetitionBase) is None: - print(f"repetitionBase: {repetitionBase} NOT FOUND") - return False - - return (a["options"]["renotifyInterval"] <= 0 or - a["options"].get("lastNotification") is None or - a["options"]["lastNotification"] <= 0 or - ((now - a["options"]["lastNotification"]) > a["options"]["renotifyInterval"] * 60 * 1000)) \ - and ((now - a["createdAt"]) % (TimeInterval[repetitionBase] * 60 * 1000)) < 60 * 1000 - - -def Build(a): - params = {"project_id": a["projectId"]} - full_args = {} - j_s = True - if a["seriesId"] is not None: - a["filter"]["sort"] = "session_id" - a["filter"]["order"] = "DESC" - a["filter"]["startDate"] = -1 - a["filter"]["endDate"] = TimeUTC.now() - full_args, query_part, sort = sessions.search_query_parts( - data=schemas.SessionsSearchPayloadSchema.parse_obj(a["filter"]), - error_status=None, errors_only=False, - favorite_only=False, issue=None, project_id=a["projectId"], - user_id=None) - subQ = f"""SELECT COUNT(session_id) AS value - {query_part}""" - else: - colDef = LeftToDb[a["query"]["left"]] - subQ = f"""SELECT {colDef["formula"]} AS value - FROM {colDef["table"]} - WHERE project_id = %(project_id)s - {"AND " + colDef["condition"] if colDef.get("condition") is not None else ""}""" - j_s = colDef.get("joinSessions", True) - - q = f"""SELECT coalesce(value,0) AS value, coalesce(value,0) {a["query"]["operator"]} {a["query"]["right"]} AS valid""" - - # if len(colDef.group) > 0 { - # subQ = subQ.Column(colDef.group + " AS group_value") - # subQ = subQ.GroupBy(colDef.group) - # q = q.Column("group_value") - # } - - if a["detectionMethod"] == schemas.AlertDetectionMethod.threshold: - if a["seriesId"] is not None: - q += f""" FROM ({subQ}) AS stat""" - else: - q += f""" FROM ({subQ} AND timestamp>=%(startDate)s - {"AND sessions.start_ts >= %(startDate)s" if j_s else ""}) AS stat""" - params = {**params, **full_args, "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000} - else: - if a["options"]["change"] == schemas.AlertDetectionChangeType.change: - # if len(colDef.group) > 0: - # subq1 := subQ.Where(sq.Expr("timestamp>=$2 ", time.Now().Unix()-a.Options.CurrentPeriod * 60)) - # sub2, args2, _ := subQ.Where( - # sq.And{ - # sq.Expr("timestamp<$3 ", time.Now().Unix()-a.Options.CurrentPeriod * 60), - # sq.Expr("timestamp>=$4 ", time.Now().Unix()-2 * a.Options.CurrentPeriod * 60), - # }).ToSql() - # sub1 := sq.Select("group_value", "(stat1.value-stat2.value) AS value").FromSelect(subq1, "stat1").JoinClause("INNER JOIN ("+sub2+") AS stat2 USING(group_value)", args2...) - # q = q.FromSelect(sub1, "stat") - # else: - if a["seriesId"] is not None: - sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s") - sub1 = f"SELECT (({subQ})-({sub2})) AS value" - q += f" FROM ( {sub1} ) AS stat" - params = {**params, **full_args, - "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000, - "timestamp_sub2": TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000} - else: - sub1 = f"""{subQ} AND timestamp>=%(startDate)s - {"AND sessions.start_ts >= %(startDate)s" if j_s else ""}""" - params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000 - sub2 = f"""{subQ} AND timestamp<%(startDate)s - AND timestamp>=%(timestamp_sub2)s - {"AND sessions.start_ts < %(startDate)s AND sessions.start_ts >= %(timestamp_sub2)s" if j_s else ""}""" - params["timestamp_sub2"] = TimeUTC.now() - 2 * a["options"]["currentPeriod"] * 60 * 1000 - sub1 = f"SELECT (( {sub1} )-( {sub2} )) AS value" - q += f" FROM ( {sub1} ) AS stat" - - else: - # if len(colDef.group) >0 { - # subq1 := subQ.Where(sq.Expr("timestamp>=$2 ", time.Now().Unix()-a.Options.CurrentPeriod * 60)) - # sub2, args2, _ := subQ.Where( - # sq.And{ - # sq.Expr("timestamp<$3 ", time.Now().Unix()-a.Options.CurrentPeriod * 60), - # sq.Expr("timestamp>=$4 ", time.Now().Unix()-a.Options.PreviousPeriod * 60-a.Options.CurrentPeriod * 60), - # }).ToSql() - # sub1 := sq.Select("group_value", "(stat1.value/stat2.value-1)*100 AS value").FromSelect(subq1, "stat1").JoinClause("INNER JOIN ("+sub2+") AS stat2 USING(group_value)", args2...) - # q = q.FromSelect(sub1, "stat") - # } else { - if a["seriesId"] is not None: - sub2 = subQ.replace("%(startDate)s", "%(timestamp_sub2)s").replace("%(endDate)s", "%(startDate)s") - sub1 = f"SELECT (({subQ})/NULLIF(({sub2}),0)-1)*100 AS value" - q += f" FROM ({sub1}) AS stat" - params = {**params, **full_args, - "startDate": TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000, - "timestamp_sub2": TimeUTC.now() \ - - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) \ - * 60 * 1000} - else: - sub1 = f"""{subQ} AND timestamp>=%(startDate)s - {"AND sessions.start_ts >= %(startDate)s" if j_s else ""}""" - params["startDate"] = TimeUTC.now() - a["options"]["currentPeriod"] * 60 * 1000 - sub2 = f"""{subQ} AND timestamp<%(startDate)s - AND timestamp>=%(timestamp_sub2)s - {"AND sessions.start_ts < %(startDate)s AND sessions.start_ts >= %(timestamp_sub2)s" if j_s else ""}""" - params["timestamp_sub2"] = TimeUTC.now() \ - - (a["options"]["currentPeriod"] + a["options"]["currentPeriod"]) * 60 * 1000 - sub1 = f"SELECT (({sub1})/NULLIF(({sub2}),0)-1)*100 AS value" - q += f" FROM ({sub1}) AS stat" - - return q, params - - -def process(): - notifications = [] - with pg_client.PostgresClient(long_query=True) as cur: - query = """SELECT tenant_id, - alert_id, - project_id, - detection_method, - query, - options, - (EXTRACT(EPOCH FROM alerts.created_at) * 1000)::BIGINT AS created_at, - alerts.name, - alerts.series_id, - filter - FROM public.alerts - LEFT JOIN metric_series USING (series_id) - INNER JOIN projects USING (project_id) - WHERE alerts.deleted_at ISNULL - AND alerts.active - AND projects.active - AND projects.deleted_at ISNULL - AND (alerts.series_id ISNULL OR metric_series.deleted_at ISNULL) - ORDER BY alerts.created_at;""" - cur.execute(query=query) - all_alerts = helper.list_to_camel_case(cur.fetchall()) - - with pg_client.PostgresClient() as cur: - for alert in all_alerts: - if can_check(alert): - print(f"Querying alertId:{alert['alertId']} name: {alert['name']}") - query, params = Build(alert) - query = cur.mogrify(query, params) - # print(alert) - # print(query) - try: - cur.execute(query) - result = cur.fetchone() - if result["valid"]: - print("Valid alert, notifying users") - notifications.append({ - "alertId": alert["alertId"], - "tenantId": alert["tenantId"], - "title": alert["name"], - "description": f"has been triggered, {alert['query']['left']} = {result['value']} ({alert['query']['operator']} {alert['query']['right']}).", - "buttonText": "Check metrics for more details", - "buttonUrl": f"/{alert['projectId']}/metrics", - "imageUrl": None, - "options": {"source": "ALERT", "sourceId": alert["alertId"], - "sourceMeta": alert["detectionMethod"], - "message": alert["options"]["message"], "projectId": alert["projectId"], - "data": {"title": alert["name"], - "limitValue": alert["query"]["right"], "actualValue": result["value"], - "operator": alert["query"]["operator"], - "trigger": alert["query"]["left"], - "alertId": alert["alertId"], - "detectionMethod": alert["detectionMethod"], - "currentPeriod": alert["options"]["currentPeriod"], - "previousPeriod": alert["options"]["previousPeriod"], - "createdAt": TimeUTC.now()}}, - }) - except Exception as e: - print(f"!!!Error while running alert query for alertId:{alert['alertId']}") - print(str(e)) - print(query) - if len(notifications) > 0: - cur.execute( - cur.mogrify(f"""UPDATE public.Alerts - SET options = options||'{{"lastNotification":{TimeUTC.now()}}}'::jsonb - WHERE alert_id IN %(ids)s;""", {"ids": tuple([n["alertId"] for n in notifications])})) - if len(notifications) > 0: - alerts.process_notifications(notifications)