diff --git a/api/chalicelib/core/boarding.py b/api/chalicelib/core/boarding.py
index 50e2ca713..d4fbea9fb 100644
--- a/api/chalicelib/core/boarding.py
+++ b/api/chalicelib/core/boarding.py
@@ -13,15 +13,18 @@ def get_state(tenant_id):

         if len(pids) > 0:
             cur.execute(
-                cur.mogrify("""SELECT EXISTS(( SELECT 1
+                cur.mogrify(
+                    """SELECT EXISTS(( SELECT 1
                                 FROM public.sessions AS s
                                 WHERE s.project_id IN %(ids)s)) AS exists;""",
-                            {"ids": tuple(pids)})
+                    {"ids": tuple(pids)},
+                )
             )
             recorded = cur.fetchone()["exists"]
         meta = False
         if recorded:
-            query = cur.mogrify(f"""SELECT EXISTS((SELECT 1
+            query = cur.mogrify(
+                f"""SELECT EXISTS((SELECT 1
                                     FROM public.projects AS p
                                          LEFT JOIN LATERAL ( SELECT 1
                                                              FROM public.sessions
@@ -36,26 +39,35 @@ def get_state(tenant_id):
                                                               OR p.metadata_8 IS NOT NULL
                                                               OR p.metadata_9 IS NOT NULL
                                                               OR p.metadata_10 IS NOT NULL )
                                     )) AS exists;""",
-                                {"tenant_id": tenant_id})
+                {"tenant_id": tenant_id},
+            )
             cur.execute(query)
             meta = cur.fetchone()["exists"]

         return [
-            {"task": "Install OpenReplay",
-             "done": recorded,
-             "URL": "https://docs.openreplay.com/getting-started/quick-start"},
-            {"task": "Identify Users",
-             "done": meta,
-             "URL": "https://docs.openreplay.com/data-privacy-security/metadata"},
-            {"task": "Invite Team Members",
-             "done": len(users.get_members(tenant_id=tenant_id)) > 1,
-             "URL": "https://app.openreplay.com/client/manage-users"},
-            {"task": "Integrations",
-             "done": len(datadog.get_all(tenant_id=tenant_id)) > 0 \
-                     or len(sentry.get_all(tenant_id=tenant_id)) > 0 \
-                     or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
-             "URL": "https://docs.openreplay.com/integrations"}
+            {
+                "task": "Install OpenReplay",
+                "done": recorded,
+                "URL": "https://docs.openreplay.com/getting-started/quick-start",
+            },
+            {
+                "task": "Identify Users",
+                "done": meta,
+                "URL": "https://docs.openreplay.com/data-privacy-security/metadata",
+            },
+            {
+                "task": "Invite Team Members",
+                "done": len(users.get_members(tenant_id=tenant_id)) > 1,
+                "URL": "https://app.openreplay.com/client/manage-users",
+            },
+            {
+                "task": "Integrations",
+                "done": len(datadog.get_all(tenant_id=tenant_id)) > 0
+                or len(sentry.get_all(tenant_id=tenant_id)) > 0
+                or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
+                "URL": "https://docs.openreplay.com/integrations",
+            },
         ]


@@ -66,21 +78,26 @@ def get_state_installing(tenant_id):

         if len(pids) > 0:
             cur.execute(
-                cur.mogrify("""SELECT EXISTS(( SELECT 1
+                cur.mogrify(
+                    """SELECT EXISTS(( SELECT 1
                                 FROM public.sessions AS s
                                 WHERE s.project_id IN %(ids)s)) AS exists;""",
-                            {"ids": tuple(pids)})
+                    {"ids": tuple(pids)},
+                )
             )
             recorded = cur.fetchone()["exists"]

-    return {"task": "Install OpenReplay",
-            "done": recorded,
-            "URL": "https://docs.openreplay.com/getting-started/quick-start"}
+    return {
+        "task": "Install OpenReplay",
+        "done": recorded,
+        "URL": "https://docs.openreplay.com/getting-started/quick-start",
+    }


 def get_state_identify_users(tenant_id):
     with pg_client.PostgresClient() as cur:
-        query = cur.mogrify(f"""SELECT EXISTS((SELECT 1
+        query = cur.mogrify(
+            f"""SELECT EXISTS((SELECT 1
                                 FROM public.projects AS p
                                      LEFT JOIN LATERAL ( SELECT 1
                                                          FROM public.sessions
@@ -95,25 +112,32 @@ def get_state_identify_users(tenant_id):
                                                           OR p.metadata_8 IS NOT NULL
                                                           OR p.metadata_9 IS NOT NULL
                                                           OR p.metadata_10 IS NOT NULL )
                                 )) AS exists;""",
-                            {"tenant_id": tenant_id})
+            {"tenant_id": tenant_id},
+        )
         cur.execute(query)
         meta = cur.fetchone()["exists"]

-    return {"task": "Identify Users",
-            "done": meta,
-            "URL": "https://docs.openreplay.com/data-privacy-security/metadata"}
+    return {
+        "task": "Identify Users",
+        "done": meta,
+        "URL": "https://docs.openreplay.com/data-privacy-security/metadata",
+    }


 def get_state_manage_users(tenant_id):
-    return {"task": "Invite Team Members",
-            "done": len(users.get_members(tenant_id=tenant_id)) > 1,
-            "URL": "https://app.openreplay.com/client/manage-users"}
+    return {
+        "task": "Invite Team Members",
+        "done": len(users.get_members(tenant_id=tenant_id)) > 1,
+        "URL": "https://app.openreplay.com/client/manage-users",
+    }


 def get_state_integrations(tenant_id):
-    return {"task": "Integrations",
-            "done": len(datadog.get_all(tenant_id=tenant_id)) > 0 \
-                    or len(sentry.get_all(tenant_id=tenant_id)) > 0 \
-                    or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
-            "URL": "https://docs.openreplay.com/integrations"}
+    return {
+        "task": "Integrations",
+        "done": len(datadog.get_all(tenant_id=tenant_id)) > 0
+        or len(sentry.get_all(tenant_id=tenant_id)) > 0
+        or len(stackdriver.get_all(tenant_id=tenant_id)) > 0,
+        "URL": "https://docs.openreplay.com/integrations",
+    }
diff --git a/api/chalicelib/core/health.py b/api/chalicelib/core/health.py
index dc9d48ca4..90af134e2 100644
--- a/api/chalicelib/core/health.py
+++ b/api/chalicelib/core/health.py
@@ -27,7 +27,6 @@ HEALTH_ENDPOINTS = {
     "http": app_connection_string("http-openreplay", 8888, "metrics"),
     "ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"),
     "integrations": app_connection_string("integrations-openreplay", 8888, "metrics"),
-    "peers": app_connection_string("peers-openreplay", 8888, "health"),
     "sink": app_connection_string("sink-openreplay", 8888, "metrics"),
     "sourcemapreader": app_connection_string(
         "sourcemapreader-openreplay", 8888, "health"
@@ -39,9 +38,7 @@ def __check_database_pg(*_):
     fail_response = {
         "health": False,
-        "details": {
-            "errors": ["Postgres health-check failed"]
-        }
+        "details": {"errors": ["Postgres health-check failed"]},
     }

     with pg_client.PostgresClient() as cur:
         try:
@@ -63,29 +60,26 @@ def __check_database_pg(*_):
         "details": {
             # "version": server_version["server_version"],
             # "schema": schema_version["version"]
-        }
+        },
     }


 def __always_healthy(*_):
-    return {
-        "health": True,
-        "details": {}
-    }
+    return {"health": True, "details": {}}


 def __check_be_service(service_name):
     def fn(*_):
         fail_response = {
             "health": False,
-            "details": {
-                "errors": ["server health-check failed"]
-            }
+            "details": {"errors": ["server health-check failed"]},
         }
         try:
             results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2)
             if results.status_code != 200:
-                logger.error(f"!! issue with the {service_name}-health code:{results.status_code}")
+                logger.error(
+                    f"!! issue with the {service_name}-health code:{results.status_code}"
+                )
                 logger.error(results.text)
                 # fail_response["details"]["errors"].append(results.text)
                 return fail_response
@@ -103,10 +97,7 @@ def __check_be_service(service_name):
             logger.error("couldn't get response")
             # fail_response["details"]["errors"].append(str(e))
             return fail_response
-        return {
-            "health": True,
-            "details": {}
-        }
+        return {"health": True, "details": {}}

     return fn

@@ -114,7 +105,7 @@ def __check_be_service(service_name):
 def __check_redis(*_):
     fail_response = {
         "health": False,
-        "details": {"errors": ["server health-check failed"]}
+        "details": {"errors": ["server health-check failed"]},
     }
     if config("REDIS_STRING", default=None) is None:
         # fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars")
@@ -133,16 +124,14 @@ def __check_redis(*_):
         "health": True,
         "details": {
             # "version": r.execute_command('INFO')['redis_version']
-        }
+        },
     }


 def __check_SSL(*_):
     fail_response = {
         "health": False,
-        "details": {
-            "errors": ["SSL Certificate health-check failed"]
-        }
+        "details": {"errors": ["SSL Certificate health-check failed"]},
     }
     try:
         requests.get(config("SITE_URL"), verify=True, allow_redirects=True)
@@ -150,36 +139,28 @@ def __check_SSL(*_):
         logger.error("!! health failed: SSL Certificate")
         logger.exception(e)
         return fail_response
-    return {
-        "health": True,
-        "details": {}
-    }
+    return {"health": True, "details": {}}


 def __get_sessions_stats(*_):
     with pg_client.PostgresClient() as cur:
         constraints = ["projects.deleted_at IS NULL"]
-        query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
+        query = cur.mogrify(
+            f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
                                        COALESCE(SUM(events_count),0) AS e_c
                                 FROM public.projects_stats
                                          INNER JOIN public.projects USING(project_id)
-                                WHERE {" AND ".join(constraints)};""")
+                                WHERE {" AND ".join(constraints)};"""
+        )
         cur.execute(query)
         row = cur.fetchone()
-    return {
-        "numberOfSessionsCaptured": row["s_c"],
-        "numberOfEventCaptured": row["e_c"]
-    }
+    return {"numberOfSessionsCaptured": row["s_c"], "numberOfEventCaptured": row["e_c"]}


 def get_health(tenant_id=None):
     health_map = {
-        "databases": {
-            "postgres": __check_database_pg
-        },
-        "ingestionPipeline": {
-            "redis": __check_redis
-        },
+        "databases": {"postgres": __check_database_pg},
+        "ingestionPipeline": {"redis": __check_redis},
         "backendServices": {
             "alerts": __check_be_service("alerts"),
             "assets": __check_be_service("assets"),
@@ -192,13 +173,12 @@ def get_health(tenant_id=None):
             "http": __check_be_service("http"),
             "ingress-nginx": __always_healthy,
             "integrations": __check_be_service("integrations"),
-            "peers": __check_be_service("peers"),
             "sink": __check_be_service("sink"),
             "sourcemapreader": __check_be_service("sourcemapreader"),
-            "storage": __check_be_service("storage")
+            "storage": __check_be_service("storage"),
         },
         "details": __get_sessions_stats,
-        "ssl": __check_SSL
+        "ssl": __check_SSL,
     }
     return __process_health(health_map=health_map)

@@ -210,10 +190,16 @@ def __process_health(health_map):
             response.pop(parent_key)
         elif isinstance(health_map[parent_key], dict):
             for element_key in health_map[parent_key]:
-                if config(f"SKIP_H_{parent_key.upper()}_{element_key.upper()}", cast=bool, default=False):
+                if config(
+                    f"SKIP_H_{parent_key.upper()}_{element_key.upper()}",
+                    cast=bool,
+                    default=False,
+                ):
                     response[parent_key].pop(element_key)
                 else:
-                    response[parent_key][element_key] = health_map[parent_key][element_key]()
+                    response[parent_key][element_key] = health_map[parent_key][
+                        element_key
+                    ]()
         else:
             response[parent_key] = health_map[parent_key]()
     return response
@@ -221,7 +207,8 @@ def __process_health(health_map):

 def cron():
     with pg_client.PostgresClient() as cur:
-        query = cur.mogrify("""SELECT projects.project_id,
+        query = cur.mogrify(
+            """SELECT projects.project_id,
                                       projects.created_at,
                                       projects.sessions_last_check_at,
                                       projects.first_recorded_session_at,
@@ -229,7 +216,8 @@ def cron():
                                FROM public.projects
                                         LEFT JOIN public.projects_stats USING (project_id)
                                WHERE projects.deleted_at IS NULL
-                               ORDER BY project_id;""")
+                               ORDER BY project_id;"""
+        )
         cur.execute(query)
         rows = cur.fetchall()
         for r in rows:
@@ -250,20 +238,24 @@ def cron():
                 count_start_from = r["last_update_at"]
             count_start_from = TimeUTC.datetime_to_timestamp(count_start_from)

-            params = {"project_id": r["project_id"],
-                      "start_ts": count_start_from,
-                      "end_ts": TimeUTC.now(),
-                      "sessions_count": 0,
-                      "events_count": 0}
+            params = {
+                "project_id": r["project_id"],
+                "start_ts": count_start_from,
+                "end_ts": TimeUTC.now(),
+                "sessions_count": 0,
+                "events_count": 0,
+            }

-            query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
+            query = cur.mogrify(
+                """SELECT COUNT(1) AS sessions_count,
                                           COALESCE(SUM(events_count),0) AS events_count
                                    FROM public.sessions
                                    WHERE project_id=%(project_id)s
                                      AND start_ts>=%(start_ts)s
                                      AND start_ts<=%(end_ts)s
                                      AND duration IS NOT NULL;""",
-                                params)
+                params,
+            )
             cur.execute(query)
             row = cur.fetchone()
             if row is not None:
@@ -271,56 +263,68 @@ def cron():
                 params["events_count"] = row["events_count"]

             if insert:
-                query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
+                query = cur.mogrify(
+                    """INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
                                        VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""",
-                                    params)
+                    params,
+                )
             else:
-                query = cur.mogrify("""UPDATE public.projects_stats
+                query = cur.mogrify(
+                    """UPDATE public.projects_stats
                                        SET sessions_count=sessions_count+%(sessions_count)s,
                                            events_count=events_count+%(events_count)s,
                                            last_update_at=(now() AT TIME ZONE 'utc'::text)
                                        WHERE project_id=%(project_id)s;""",
-                                    params)
+                    params,
+                )
             cur.execute(query)


 # this cron is used to correct the sessions&events count every week
 def weekly_cron():
     with pg_client.PostgresClient(long_query=True) as cur:
-        query = cur.mogrify("""SELECT project_id,
+        query = cur.mogrify(
+            """SELECT project_id,
                                       projects_stats.last_update_at
                                FROM public.projects
                                         LEFT JOIN public.projects_stats USING (project_id)
                                WHERE projects.deleted_at IS NULL
-                               ORDER BY project_id;""")
+                               ORDER BY project_id;"""
+        )
         cur.execute(query)
         rows = cur.fetchall()
         for r in rows:
             if r["last_update_at"] is None:
                 continue

-            params = {"project_id": r["project_id"],
-                      "end_ts": TimeUTC.now(),
-                      "sessions_count": 0,
-                      "events_count": 0}
+            params = {
+                "project_id": r["project_id"],
+                "end_ts": TimeUTC.now(),
+                "sessions_count": 0,
+                "events_count": 0,
+            }

-            query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
+            query = cur.mogrify(
+                """SELECT COUNT(1) AS sessions_count,
                                           COALESCE(SUM(events_count),0) AS events_count
                                    FROM public.sessions
                                    WHERE project_id=%(project_id)s
                                      AND start_ts<=%(end_ts)s
                                      AND duration IS NOT NULL;""",
-                                params)
+                params,
+            )
             cur.execute(query)
             row = cur.fetchone()
             if row is not None:
                 params["sessions_count"] = row["sessions_count"]
                 params["events_count"] = row["events_count"]

-            query = cur.mogrify("""UPDATE public.projects_stats
+            query = cur.mogrify(
+                """UPDATE public.projects_stats
                                    SET sessions_count=%(sessions_count)s,
                                        events_count=%(events_count)s,
                                        last_update_at=(now() AT TIME ZONE 'utc'::text)
                                    WHERE project_id=%(project_id)s;""",
-                                params)
+                params,
+            )
             cur.execute(query)
diff --git a/ee/api/chalicelib/core/health.py b/ee/api/chalicelib/core/health.py
index f8837cf9f..9a856f9a7 100644
--- a/ee/api/chalicelib/core/health.py
+++ b/ee/api/chalicelib/core/health.py
@@ -2,6 +2,7 @@ import logging

 import redis
 import requests
+
 # from confluent_kafka.admin import AdminClient
 from decouple import config

@@ -28,7 +29,6 @@ HEALTH_ENDPOINTS = {
     "http": app_connection_string("http-openreplay", 8888, "metrics"),
     "ingress-nginx": app_connection_string("ingress-nginx-openreplay", 80, "healthz"),
     "integrations": app_connection_string("integrations-openreplay", 8888, "metrics"),
-    "peers": app_connection_string("peers-openreplay", 8888, "health"),
     "sink": app_connection_string("sink-openreplay", 8888, "metrics"),
     "sourcemapreader": app_connection_string(
         "sourcemapreader-openreplay", 8888, "health"
@@ -40,9 +40,7 @@ def __check_database_pg(*_):
     fail_response = {
         "health": False,
-        "details": {
-            "errors": ["Postgres health-check failed"]
-        }
+        "details": {"errors": ["Postgres health-check failed"]},
     }

     with pg_client.PostgresClient() as cur:
         try:
@@ -64,29 +62,26 @@ def __check_database_pg(*_):
         "details": {
             # "version": server_version["server_version"],
             # "schema": schema_version["version"]
-        }
+        },
     }


 def __always_healthy(*_):
-    return {
-        "health": True,
-        "details": {}
-    }
+    return {"health": True, "details": {}}


 def __check_be_service(service_name):
     def fn(*_):
         fail_response = {
             "health": False,
-            "details": {
-                "errors": ["server health-check failed"]
-            }
+            "details": {"errors": ["server health-check failed"]},
         }
         try:
             results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2)
             if results.status_code != 200:
-                logger.error(f"!! issue with the {service_name}-health code:{results.status_code}")
+                logger.error(
+                    f"!! issue with the {service_name}-health code:{results.status_code}"
+                )
                 logger.error(results.text)
                 # fail_response["details"]["errors"].append(results.text)
                 return fail_response
@@ -104,10 +99,7 @@ def __check_be_service(service_name):
             logger.error("couldn't get response")
             # fail_response["details"]["errors"].append(str(e))
             return fail_response
-        return {
-            "health": True,
-            "details": {}
-        }
+        return {"health": True, "details": {}}

     return fn

@@ -115,7 +107,7 @@ def __check_be_service(service_name):
 def __check_redis(*_):
     fail_response = {
         "health": False,
-        "details": {"errors": ["server health-check failed"]}
+        "details": {"errors": ["server health-check failed"]},
     }
     if config("REDIS_STRING", default=None) is None:
         # fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars")
@@ -134,16 +126,14 @@ def __check_redis(*_):
         "health": True,
         "details": {
             # "version": r.execute_command('INFO')['redis_version']
-        }
+        },
     }


 def __check_SSL(*_):
     fail_response = {
         "health": False,
-        "details": {
-            "errors": ["SSL Certificate health-check failed"]
-        }
+        "details": {"errors": ["SSL Certificate health-check failed"]},
     }
     try:
         requests.get(config("SITE_URL"), verify=True, allow_redirects=True)
@@ -151,10 +141,7 @@ def __check_SSL(*_):
         logger.error("!! health failed: SSL Certificate")
         logger.exception(e)
         return fail_response
-    return {
-        "health": True,
-        "details": {}
-    }
+    return {"health": True, "details": {}}


 def __get_sessions_stats(tenant_id, *_):
@@ -162,31 +149,34 @@ def __get_sessions_stats(tenant_id, *_):
         constraints = ["projects.deleted_at IS NULL"]
         if tenant_id:
             constraints.append("tenant_id=%(tenant_id)s")
-        query = cur.mogrify(f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
+        query = cur.mogrify(
+            f"""SELECT COALESCE(SUM(sessions_count),0) AS s_c,
                                        COALESCE(SUM(events_count),0) AS e_c
                                 FROM public.projects_stats
                                          INNER JOIN public.projects USING(project_id)
                                 WHERE {" AND ".join(constraints)};""",
-                            {"tenant_id": tenant_id})
+            {"tenant_id": tenant_id},
+        )
         cur.execute(query)
         row = cur.fetchone()
-    return {
-        "numberOfSessionsCaptured": row["s_c"],
-        "numberOfEventCaptured": row["e_c"]
-    }
+    return {"numberOfSessionsCaptured": row["s_c"], "numberOfEventCaptured": row["e_c"]}


 def get_health(tenant_id=None):
     health_map = {
         "databases": {
             "postgres": __check_database_pg,
-            "clickhouse": __check_database_ch
+            "clickhouse": __check_database_ch,
         },
         "ingestionPipeline": {
-            **({"redis": __check_redis} if config("REDIS_STRING", default=None)
-               and len(config("REDIS_STRING")) > 0 else {}),
+            **(
+                {"redis": __check_redis}
+                if config("REDIS_STRING", default=None)
+                and len(config("REDIS_STRING")) > 0
+                else {}
+            ),
             # "kafka": __check_kafka
-            "kafka": __always_healthy
+            "kafka": __always_healthy,
         },
         "backendServices": {
             "alerts": __check_be_service("alerts"),
@@ -200,14 +190,13 @@ def get_health(tenant_id=None):
             "http": __check_be_service("http"),
             "ingress-nginx": __always_healthy,
             "integrations": __check_be_service("integrations"),
-            "peers": __check_be_service("peers"),
             # "quickwit": __check_be_service("quickwit"),
             "sink": __check_be_service("sink"),
             "sourcemapreader": __check_be_service("sourcemapreader"),
-            "storage": __check_be_service("storage")
+            "storage": __check_be_service("storage"),
         },
         "details": __get_sessions_stats,
-        "ssl": __check_SSL
+        "ssl": __check_SSL,
     }
     return __process_health(tenant_id=tenant_id, health_map=health_map)

@@ -219,10 +208,16 @@ def __process_health(tenant_id, health_map):
             response.pop(parent_key)
         elif isinstance(health_map[parent_key], dict):
             for element_key in health_map[parent_key]:
-                if config(f"SKIP_H_{parent_key.upper()}_{element_key.upper()}", cast=bool, default=False):
+                if config(
+                    f"SKIP_H_{parent_key.upper()}_{element_key.upper()}",
+                    cast=bool,
+                    default=False,
+                ):
                     response[parent_key].pop(element_key)
                 else:
-                    response[parent_key][element_key] = health_map[parent_key][element_key](tenant_id)
+                    response[parent_key][element_key] = health_map[parent_key][
+                        element_key
+                    ](tenant_id)
         else:
             response[parent_key] = health_map[parent_key](tenant_id)
     return response
@@ -230,7 +225,8 @@ def __process_health(tenant_id, health_map):

 def cron():
     with pg_client.PostgresClient() as cur:
-        query = cur.mogrify("""SELECT projects.project_id,
+        query = cur.mogrify(
+            """SELECT projects.project_id,
                                       projects.created_at,
                                       projects.sessions_last_check_at,
                                       projects.first_recorded_session_at,
@@ -238,7 +234,8 @@ def cron():
                                FROM public.projects
                                         LEFT JOIN public.projects_stats USING (project_id)
                                WHERE projects.deleted_at IS NULL
-                               ORDER BY project_id;""")
+                               ORDER BY project_id;"""
+        )
         cur.execute(query)
         rows = cur.fetchall()
         for r in rows:
@@ -259,20 +256,24 @@ def cron():
                 count_start_from = r["last_update_at"]
             count_start_from = TimeUTC.datetime_to_timestamp(count_start_from)

-            params = {"project_id": r["project_id"],
-                      "start_ts": count_start_from,
-                      "end_ts": TimeUTC.now(),
-                      "sessions_count": 0,
-                      "events_count": 0}
+            params = {
+                "project_id": r["project_id"],
+                "start_ts": count_start_from,
+                "end_ts": TimeUTC.now(),
+                "sessions_count": 0,
+                "events_count": 0,
+            }

-            query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
+            query = cur.mogrify(
+                """SELECT COUNT(1) AS sessions_count,
                                           COALESCE(SUM(events_count),0) AS events_count
                                    FROM public.sessions
                                    WHERE project_id=%(project_id)s
                                      AND start_ts>=%(start_ts)s
                                      AND start_ts<=%(end_ts)s
                                      AND duration IS NOT NULL;""",
-                                params)
+                params,
+            )
             cur.execute(query)
             row = cur.fetchone()
             if row is not None:
@@ -280,65 +281,77 @@ def cron():
                 params["events_count"] = row["events_count"]

             if insert:
-                query = cur.mogrify("""INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
+                query = cur.mogrify(
+                    """INSERT INTO public.projects_stats(project_id, sessions_count, events_count, last_update_at)
                                        VALUES (%(project_id)s, %(sessions_count)s, %(events_count)s, (now() AT TIME ZONE 'utc'::text));""",
-                                    params)
+                    params,
+                )
             else:
-                query = cur.mogrify("""UPDATE public.projects_stats
+                query = cur.mogrify(
+                    """UPDATE public.projects_stats
                                        SET sessions_count=sessions_count+%(sessions_count)s,
                                            events_count=events_count+%(events_count)s,
                                            last_update_at=(now() AT TIME ZONE 'utc'::text)
                                        WHERE project_id=%(project_id)s;""",
-                                    params)
+                    params,
+                )
             cur.execute(query)


 # this cron is used to correct the sessions&events count every week
 def weekly_cron():
     with pg_client.PostgresClient(long_query=True) as cur:
-        query = cur.mogrify("""SELECT project_id,
+        query = cur.mogrify(
+            """SELECT project_id,
                                       projects_stats.last_update_at
                                FROM public.projects
                                         LEFT JOIN public.projects_stats USING (project_id)
                                WHERE projects.deleted_at IS NULL
-                               ORDER BY project_id;""")
+                               ORDER BY project_id;"""
+        )
         cur.execute(query)
         rows = cur.fetchall()
         for r in rows:
             if r["last_update_at"] is None:
                 continue

-            params = {"project_id": r["project_id"],
-                      "end_ts": TimeUTC.now(),
-                      "sessions_count": 0,
-                      "events_count": 0}
+            params = {
+                "project_id": r["project_id"],
+                "end_ts": TimeUTC.now(),
+                "sessions_count": 0,
+                "events_count": 0,
+            }

-            query = cur.mogrify("""SELECT COUNT(1) AS sessions_count,
+            query = cur.mogrify(
+                """SELECT COUNT(1) AS sessions_count,
                                           COALESCE(SUM(events_count),0) AS events_count
                                    FROM public.sessions
                                    WHERE project_id=%(project_id)s
                                      AND start_ts<=%(end_ts)s
                                      AND duration IS NOT NULL;""",
-                                params)
+                params,
+            )
             cur.execute(query)
             row = cur.fetchone()
             if row is not None:
                 params["sessions_count"] = row["sessions_count"]
                 params["events_count"] = row["events_count"]

-            query = cur.mogrify("""UPDATE public.projects_stats
+            query = cur.mogrify(
+                """UPDATE public.projects_stats
                                    SET sessions_count=%(sessions_count)s,
                                        events_count=%(events_count)s,
                                        last_update_at=(now() AT TIME ZONE 'utc'::text)
                                    WHERE project_id=%(project_id)s;""",
-                                params)
+                params,
+            )
             cur.execute(query)


 def __check_database_ch(*_):
     fail_response = {
         "health": False,
-        "details": {"errors": ["server health-check failed"]}
+        "details": {"errors": ["server health-check failed"]},
     }
     with ch_client.ClickHouseClient() as ch:
         try:
@@ -348,9 +361,11 @@ def __check_database_ch(*_):
             logger.exception(e)
             return fail_response

-        schema_version = ch.execute("""SELECT 1
+        schema_version = ch.execute(
+            """SELECT 1
                                        FROM system.functions
-                                       WHERE name = 'openreplay_version';""")
+                                       WHERE name = 'openreplay_version';"""
+        )
         if len(schema_version) > 0:
             schema_version = ch.execute("SELECT openreplay_version() AS version;")
             schema_version = schema_version[0]["version"]
@@ -365,9 +380,10 @@ def __check_database_ch(*_):
             # "version": server_version[0]["server_version"],
             # "schema": schema_version,
             # **errors
-        }
+        },
     }

+
 # def __check_kafka(*_):
 #     fail_response = {
 #         "health": False,