From f2172607ed114d0dcf505c784bac11a6db7483ad Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Tue, 11 Apr 2023 13:50:59 +0100 Subject: [PATCH] feat(chalice): Kafka health check --- api/chalicelib/core/assist.py | 10 +++--- ee/api/chalicelib/core/health.py | 56 ++++++++++++++++---------------- ee/api/requirements.txt | 2 +- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/api/chalicelib/core/assist.py b/api/chalicelib/core/assist.py index fcc845b7f..eea02759f 100644 --- a/api/chalicelib/core/assist.py +++ b/api/chalicelib/core/assist.py @@ -56,7 +56,7 @@ def __get_live_sessions_ws(project_id, data): results = requests.post(ASSIST_URL + config("assist") + f"/{project_key}", json=data, timeout=config("assistTimeout", cast=int, default=5)) if results.status_code != 200: - print(f"!! issue with the peer-server code:{results.status_code}") + print(f"!! issue with the peer-server code:{results.status_code} for __get_live_sessions_ws") print(results.text) return {"total": 0, "sessions": []} live_peers = results.json().get("data", []) @@ -106,7 +106,7 @@ def get_live_session_by_id(project_id, session_id): results = requests.get(ASSIST_URL + config("assist") + f"/{project_key}/{session_id}", timeout=config("assistTimeout", cast=int, default=5)) if results.status_code != 200: - print(f"!! issue with the peer-server code:{results.status_code}") + print(f"!! issue with the peer-server code:{results.status_code} for get_live_session_by_id") print(results.text) return None results = results.json().get("data") @@ -136,7 +136,7 @@ def is_live(project_id, session_id, project_key=None): results = requests.get(ASSIST_URL + config("assistList") + f"/{project_key}/{session_id}", timeout=config("assistTimeout", cast=int, default=5)) if results.status_code != 200: - print(f"!! issue with the peer-server code:{results.status_code}") + print(f"!! issue with the peer-server code:{results.status_code} for is_live") print(results.text) return False results = results.json().get("data") @@ -165,7 +165,7 @@ def autocomplete(project_id, q: str, key: str = None): ASSIST_URL + config("assistList") + f"/{project_key}/autocomplete", params=params, timeout=config("assistTimeout", cast=int, default=5)) if results.status_code != 200: - print(f"!! issue with the peer-server code:{results.status_code}") + print(f"!! issue with the peer-server code:{results.status_code} for autocomplete") print(results.text) return {"errors": [f"Something went wrong wile calling assist:{results.text}"]} results = results.json().get("data", []) @@ -248,7 +248,7 @@ def session_exists(project_id, session_id): results = requests.get(ASSIST_URL + config("assist") + f"/{project_key}/{session_id}", timeout=config("assistTimeout", cast=int, default=5)) if results.status_code != 200: - print(f"!! issue with the peer-server code:{results.status_code}") + print(f"!! issue with the peer-server code:{results.status_code} for session_exists") print(results.text) return None results = results.json().get("data") diff --git a/ee/api/chalicelib/core/health.py b/ee/api/chalicelib/core/health.py index 70ecdd674..d222e0c66 100644 --- a/ee/api/chalicelib/core/health.py +++ b/ee/api/chalicelib/core/health.py @@ -2,7 +2,7 @@ from urllib.parse import urlparse import redis import requests -# from confluent_kafka.admin import AdminClient +from confluent_kafka.admin import AdminClient from decouple import config from chalicelib.utils import pg_client, ch_client @@ -149,8 +149,7 @@ def get_health(): "ingestionPipeline": { **({"redis": __check_redis} if config("REDIS_STRING", default=None) and len(config("REDIS_STRING")) > 0 else {}), - # "kafka": __check_kafka - "kafka": __always_healthy + "kafka": __check_kafka }, "backendServices": { "alerts": __check_be_service("alerts"), @@ -210,28 +209,29 @@ def __check_database_ch(): } } -# def __check_kafka(): -# fail_response = { -# "health": False, -# "details": {"errors": ["server health-check failed"]} -# } -# if config("KAFKA_SERVERS", default=None) is None: -# fail_response["details"]["errors"].append("KAFKA_SERVERS not defined in env-vars") -# return fail_response -# -# try: -# a = AdminClient({'bootstrap.servers': config("KAFKA_SERVERS"), "socket.connection.setup.timeout.ms": 3000}) -# topics = a.list_topics().topics -# if not topics: -# raise Exception('topics not found') -# -# except Exception as e: -# print("!! Issue getting kafka-health response") -# print(str(e)) -# fail_response["details"]["errors"].append(str(e)) -# return fail_response -# -# return { -# "health": True, -# "details": {} -# } + +def __check_kafka(): + fail_response = { + "health": False, + "details": {"errors": ["server health-check failed"]} + } + if config("KAFKA_SERVERS", default=None) is None: + fail_response["details"]["errors"].append("KAFKA_SERVERS not defined in env-vars") + return fail_response + + try: + a = AdminClient({'bootstrap.servers': config("KAFKA_SERVERS"), "socket.connection.setup.timeout.ms": 3000}) + topics = a.list_topics().topics + if not topics: + raise Exception('topics not found') + + except Exception as e: + print("!! Issue getting kafka-health response") + print(str(e)) + fail_response["details"]["errors"].append(str(e)) + return fail_response + + return { + "health": True, + "details": {} + } diff --git a/ee/api/requirements.txt b/ee/api/requirements.txt index d6a3a0dc1..8d605bdf8 100644 --- a/ee/api/requirements.txt +++ b/ee/api/requirements.txt @@ -19,4 +19,4 @@ python3-saml==1.15.0 python-multipart==0.0.6 redis==4.5.3 -#confluent-kafka==2.0.2 \ No newline at end of file +confluent-kafka==2.1.0 \ No newline at end of file