From 2b4e5a512198babfa27aefa0b6219e13a36f417d Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Thu, 27 Apr 2023 17:03:44 +0200 Subject: [PATCH] feat(chalice): changed projects stats for health-check feat(crons): cahnged projects stats for health-check chore(helm): projectStats cron every 18 min chore(helm): projectStats-fix cron every Sunday at 5am --- api/chalicelib/core/health.py | 9 +- api/routers/crons/core_crons.py | 19 +-- api/routers/crons/core_dynamic_crons.py | 15 ++- ee/api/app_crons.py | 4 +- ee/api/chalicelib/core/health.py | 116 +++++++++++++++++- ee/api/routers/crons/core_dynamic_crons.py | 17 ++- .../openreplay/charts/utilities/values.yaml | 24 +++- 7 files changed, 174 insertions(+), 30 deletions(-) diff --git a/api/chalicelib/core/health.py b/api/chalicelib/core/health.py index 820f5fbae..7f9c02f1e 100644 --- a/api/chalicelib/core/health.py +++ b/api/chalicelib/core/health.py @@ -161,9 +161,12 @@ def __check_SSL(*_): def __get_sessions_stats(*_): with pg_client.PostgresClient() as cur: - query = cur.mogrify("""SELECT COALESCE(SUM(sessions_count),0) AS s_c, - COALESCE(SUM(events_count),0) AS e_c - FROM public.projects_stats;""") + constraints = ["projects.deteled_at IS NULL"] + query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c, + COALESCE(SUM(events_count),0) AS e_c + FROM public.projects_stats + INNER JOIN public.projects USING(project_id) + WHERE {" AND ".join(constraints)};""") cur.execute(query) row = cur.fetchone() return { diff --git a/api/routers/crons/core_crons.py b/api/routers/crons/core_crons.py index bdc73fdcd..aa9ce100f 100644 --- a/api/routers/crons/core_crons.py +++ b/api/routers/crons/core_crons.py @@ -1,20 +1,3 @@ -from apscheduler.triggers.cron import CronTrigger -from apscheduler.triggers.interval import IntervalTrigger - -from chalicelib.core import health - - -async def health_cron() -> None: - health.cron() - - -async def weekly_health_cron() -> None: - health.weekly_cron() - - cron_jobs = [ - {"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-01-01 0:0:0", jitter=300), - "misfire_grace_time": 60 * 60, "max_instances": 1}, - {"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5), - "misfire_grace_time": 60 * 60, "max_instances": 1} + ] diff --git a/api/routers/crons/core_dynamic_crons.py b/api/routers/crons/core_dynamic_crons.py index b2086a22b..f1bc87929 100644 --- a/api/routers/crons/core_dynamic_crons.py +++ b/api/routers/crons/core_dynamic_crons.py @@ -1,7 +1,8 @@ from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger from chalicelib.core import telemetry -from chalicelib.core import weekly_report, jobs +from chalicelib.core import weekly_report, jobs, health async def run_scheduled_jobs() -> None: @@ -16,11 +17,23 @@ async def telemetry_cron() -> None: telemetry.compute() +async def health_cron() -> None: + health.cron() + + +async def weekly_health_cron() -> None: + health.weekly_cron() + + cron_jobs = [ {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"), "misfire_grace_time": 60 * 60, "max_instances": 1}, {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), "misfire_grace_time": 20, "max_instances": 1}, {"func": weekly_report_cron, "trigger": CronTrigger(day_of_week="mon", hour=5), + "misfire_grace_time": 60 * 60, "max_instances": 1}, + {"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-04-01 0:0:0", jitter=300), + "misfire_grace_time": 60 * 60, "max_instances": 1}, + {"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5), "misfire_grace_time": 60 * 60, "max_instances": 1} ] diff --git a/ee/api/app_crons.py b/ee/api/app_crons.py index 55bc91619..f4770c2b2 100644 --- a/ee/api/app_crons.py +++ b/ee/api/app_crons.py @@ -7,7 +7,9 @@ from routers.crons import core_dynamic_crons ACTIONS = { "TELEMETRY": core_dynamic_crons.telemetry_cron, "JOB": core_dynamic_crons.run_scheduled_jobs, - "REPORT": core_dynamic_crons.weekly_report + "REPORT": core_dynamic_crons.weekly_report, + "PROJECTS_STATS": core_dynamic_crons.health_cron, + "FIX_PROJECTS_STATS": core_dynamic_crons.weekly_health_cron } diff --git a/ee/api/chalicelib/core/health.py b/ee/api/chalicelib/core/health.py index b92530213..32e44c133 100644 --- a/ee/api/chalicelib/core/health.py +++ b/ee/api/chalicelib/core/health.py @@ -6,6 +6,7 @@ import requests from decouple import config from chalicelib.utils import pg_client, ch_client +from chalicelib.utils.TimeUTC import TimeUTC def app_connection_string(name, port, path): @@ -161,13 +162,14 @@ def __check_SSL(*_): def __get_sessions_stats(tenant_id, *_): with pg_client.PostgresClient() as cur: - extra_query = "" + constraints = ["projects.deteled_at IS NULL"] if tenant_id: - extra_query = """INNER JOIN public.projects USING(project_id) - WHERE tenant_id=%(tenant_id)s""" + constraints.append("tenant_id=%(tenant_id)s") query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c, COALESCE(SUM(events_count),0) AS e_c - FROM public.sessions_count {extra_query};""", + FROM public.projects_stats + INNER JOIN public.projects USING(project_id) + WHERE {" AND ".join(constraints)};""", {"tenant_id": tenant_id}) cur.execute(query) row = cur.fetchone() @@ -219,6 +221,112 @@ def get_health(tenant_id=None): return health_map +def cron(): + with pg_client.PostgresClient() as cur: + query = cur.mogrify("""SELECT projects.project_id, + projects.created_at, + projects.sessions_last_check_at, + projects.first_recorded_session_at, + projects_stats.last_update_at + FROM public.projects + LEFT JOIN public.projects_stats USING (project_id) + WHERE projects.deleted_at IS NULL + ORDER BY project_id;""") + cur.execute(query) + rows = cur.fetchall() + for r in rows: + insert = False + if r["last_update_at"] is None: + # never counted before, must insert + insert = True + if r["first_recorded_session_at"] is None: + if r["sessions_last_check_at"] is None: + count_start_from = r["created_at"] + else: + count_start_from = r["sessions_last_check_at"] + else: + count_start_from = r["first_recorded_session_at"] + + else: + # counted before, must update + count_start_from = r["last_update_at"] + + count_start_from = TimeUTC.datetime_to_timestamp(count_start_from) + params = {"project_id": r["project_id"], + "start_ts": count_start_from, + "end_ts": TimeUTC.now(), + "sessions_count": 0, + "events_count": 0} + + query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, + COALESCE(SUM(events_count),0) AS events_count + FROM public.sessions + WHERE project_id=%(project_id)s + AND start_ts>=%(start_ts)s + AND start_ts<=%(end_ts)s + AND duration IS NOT NULL;""", + params) + cur.execute(query) + row = cur.fetchone() + if row is not None: + params["sessions_count"] = row["sessions_count"] + params["events_count"] = row["events_count"] + + if insert: + query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at) + VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""", + params) + else: + query = cur.mogrify("""UPDATE public.projects_stats + SET sessions_count=sessions_count+%(sessions_count)s, + events_count=events_count+%(events_count)s, + last_update_at=(now() AT TIME ZONE 'utc'::text) + WHERE project_id=%(project_id)s;""", + params) + cur.execute(query) + + +# this cron is used to correct the sessions&events count every week +def weekly_cron(): + with pg_client.PostgresClient(long_query=True) as cur: + query = cur.mogrify("""SELECT project_id + projects_stats.last_update_at + FROM public.projects + LEFT JOIN public.projects_stats USING (project_id) + WHERE projects.deleted_at IS NULL + ORDER BY project_id;""") + cur.execute(query) + rows = cur.fetchall() + for r in rows: + if r["last_update_at"] is None: + continue + + params = {"project_id": r["project_id"], + "end_ts": TimeUTC.now(), + "sessions_count": 0, + "events_count": 0} + + query = cur.mogrify("""SELECT COUNT(1) AS sessions_count, + SUM(events_count) AS events_count + FROM public.sessions + WHERE project_id=%(project_id)s + AND start_ts<=%(end_ts)s + AND duration IS NOT NULL;""", + params) + cur.execute(query) + row = cur.fetchone() + if row is not None: + params["sessions_count"] = row["sessions_count"] + params["events_count"] = row["events_count"] + + query = cur.mogrify("""UPDATE public.projects_stats + SET sessions_count=%(sessions_count)s, + events_count=%(events_count)s, + last_update_at=(now() AT TIME ZONE 'utc'::text) + WHERE project_id=%(project_id)s;""", + params) + cur.execute(query) + def __check_database_ch(*_): fail_response = { "health": False, diff --git a/ee/api/routers/crons/core_dynamic_crons.py b/ee/api/routers/crons/core_dynamic_crons.py index 89846366b..f9fc045cd 100644 --- a/ee/api/routers/crons/core_dynamic_crons.py +++ b/ee/api/routers/crons/core_dynamic_crons.py @@ -1,9 +1,10 @@ from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger from decouple import config from chalicelib.core import jobs from chalicelib.core import telemetry, unlock -from chalicelib.core import weekly_report as weekly_report_script +from chalicelib.core import weekly_report as weekly_report_script, health async def run_scheduled_jobs() -> None: @@ -18,12 +19,20 @@ async def telemetry_cron() -> None: telemetry.compute() -def unlock_cron() -> None: +async def unlock_cron() -> None: print("validating license") unlock.check() print(f"valid: {unlock.is_valid()}") +async def health_cron() -> None: + health.cron() + + +async def weekly_health_cron() -> None: + health.weekly_cron() + + cron_jobs = [ {"func": unlock_cron, "trigger": CronTrigger(day="*")}, ] @@ -34,6 +43,10 @@ SINGLE_CRONS = [ {"func": run_scheduled_jobs, "trigger": CronTrigger(day_of_week="*", hour=0, minute=15), "misfire_grace_time": 20, "max_instances": 1}, {"func": weekly_report, "trigger": CronTrigger(day_of_week="mon", hour=5), + "misfire_grace_time": 60 * 60, "max_instances": 1}, + {"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-04-01 0:0:0", jitter=300), + "misfire_grace_time": 60 * 60, "max_instances": 1}, + {"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5), "misfire_grace_time": 60 * 60, "max_instances": 1} ] diff --git a/scripts/helmcharts/openreplay/charts/utilities/values.yaml b/scripts/helmcharts/openreplay/charts/utilities/values.yaml index 5f31b65aa..36e0f0c3d 100644 --- a/scripts/helmcharts/openreplay/charts/utilities/values.yaml +++ b/scripts/helmcharts/openreplay/charts/utilities/values.yaml @@ -26,7 +26,7 @@ telemetry: env: ACTION: "TELEMETRY" report: - # Ref: https://crontab.guru/#0_5_*_*_1 + # https://crontab.guru/#0_5_*_*_1 # Monday morning 5am cron: "0 5 * * 1" image: @@ -47,6 +47,28 @@ sessionsCleaner: tag: "" env: ACTION: "JOB" +projectsStats: + # https://crontab.guru/#*/18_*_*_*_* + # Every 18 min + cron: "*/18 * * * *" + image: + repository: "{{ .Values.global.openReplayContainerRegistry }}/crons" + pullPolicy: Always + # Overrides the image tag whose default is the chart appVersion. + tag: "" + env: + ACTION: "PROJECTS_STATS" +fixProjectsStats: + # https://crontab.guru/#0_5_*_*_0 + # Sunday at 5am + cron: "0 5 * * 0" + image: + repository: "{{ .Values.global.openReplayContainerRegistry }}/crons" + pullPolicy: Always + # Overrides the image tag whose default is the chart appVersion. + tag: "" + env: + ACTION: "FIX_PROJECTS_STATS" # Common env values are from chalice for the crons chalice: