From d83f731eb20206a709bd1c98d4fa7a8006d81e68 Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Tue, 13 Dec 2022 18:00:24 +0100 Subject: [PATCH] Changed function placements and mogrify for pg INSERT --- ee/api/app.py | 3 +- ee/api/chalicelib/core/signals.py | 19 +---------- ee/api/chalicelib/utils/events_queue.py | 33 +++++++++++++------ ee/api/routers/core_dynamic.py | 11 +------ ee/api/routers/crons/core_dynamic_crons.py | 5 --- ee/api/routers/crons/ee_crons.py | 10 ++++++ ee/api/routers/ee.py | 12 ++++++- .../db/init_dbs/postgresql/1.10.0/1.10.0.sql | 14 ++++---- 8 files changed, 55 insertions(+), 52 deletions(-) create mode 100644 ee/api/routers/crons/ee_crons.py diff --git a/ee/api/app.py b/ee/api/app.py index f3edcefe3..d5a787ffc 100644 --- a/ee/api/app.py +++ b/ee/api/app.py @@ -16,6 +16,7 @@ from chalicelib.utils import events_queue from routers import core, core_dynamic, ee, saml from routers.crons import core_crons from routers.crons import core_dynamic_crons +from routers.crons import ee_crons from routers.subs import dashboard, insights, metrics, v1_api_ee from routers.subs import v1_api @@ -85,7 +86,7 @@ async def startup(): await events_queue.init() app.schedule.start() - for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs: + for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs: app.schedule.add_job(id=job["func"].__name__, **job) ap_logger.info(">Scheduled jobs:") diff --git a/ee/api/chalicelib/core/signals.py b/ee/api/chalicelib/core/signals.py index cb4d714c4..72822d0af 100644 --- a/ee/api/chalicelib/core/signals.py +++ b/ee/api/chalicelib/core/signals.py @@ -1,26 +1,9 @@ -import json - import schemas_ee import logging -from chalicelib.utils import helper -from chalicelib.utils import pg_client from chalicelib.utils import events_queue -def handle_frontend_signals(project_id: int, user_id: str, data: schemas_ee.SignalsSchema): - insights_query = """INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data) VALUES (%(project_id)s, %(user_id)s, %(timestamp)s, %(action)s, %(source)s, %(category)s, %(data)s)""" - try: - with pg_client.PostgresClient() as conn: - query = conn.mogrify(insights_query, {'project_id': project_id, 'user_id': user_id, 'timestamp': data.timestamp, 'action': data.action, 'source': data.source, - 'category': data.category, 'data': json.dumps(data.data)}) - conn.execute(query) - # res = helper.dict_to_camel_case(conn.fetchone()) - return {'data': 'insertion succeded'} - except Exception as e: - logging.info(f'Error while inserting: {e}') - return {'errors': [e]} - -def handle_frontend_signals_queued(project_id: int, user_id: str, data: schemas_ee.SignalsSchema): +def handle_frontend_signals_queued(project_id: int, user_id: int, data: schemas_ee.SignalsSchema): try: events_queue.global_queue.put((project_id, user_id, data)) return {'data': 'insertion succeded'} diff --git a/ee/api/chalicelib/utils/events_queue.py b/ee/api/chalicelib/utils/events_queue.py index 8c44a1690..9858e055b 100644 --- a/ee/api/chalicelib/utils/events_queue.py +++ b/ee/api/chalicelib/utils/events_queue.py @@ -15,21 +15,29 @@ class EventQueue(): def flush(self, conn): events = list() + params = dict() + # while not self.events.empty(): + # project_id, user_id, element = self.events.get() + # events.append("({project_id}, {user_id}, {timestamp}, '{action}', '{source}', '{category}', '{data}')".format( + # project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=json.dumps(element.data))) + i = 0 while not self.events.empty(): project_id, user_id, element = self.events.get() - events.append("({project_id}, '{user_id}', {timestamp}, '{action}', '{source}', '{category}', '{data}')".format( - project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=json.dumps(element.data))) - if len(events)==0: + params[f'project_id_{i}'] = project_id + params[f'user_id_{i}'] = user_id + for _key, _val in element.dict().items(): + params[f'{_key}_{i}'] = _val + events.append(f"(%(project_id_{i})s, %(user_id_{i})s, %(timestamp_{i})s, %(action_{i})s, %(source_{i})s, %(category_{i})s, %(data_{i})s::jsonb)") + i += 1 + if i == 0: return 0 if self.test: print(events) return 1 - _base_query = 'INSERT INTO {database}.{table} (project_id, user_id, timestamp, action, source, category, data) VALUES {values_list}' - conn.execute(_base_query.format(database='public', table='frontend_signals', values_list=', '.join(events))) - # logging.info(_query) - # res = 'done' - # res = conn.fetchone() - # res = helper.dict_to_camel_case(conn.fetchone()) + conn.execute( + conn.mogrify(f"""INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data) + VALUES {' , '.join(events)}""", params) + ) return 1 def force_flush(self): @@ -61,4 +69,9 @@ async def terminate(): global_queue.force_flush() logging.info('> queue fulshed') - +# def __process_schema(trace): +# data = trace.dict() +# data["parameters"] = json.dumps(trace.parameters) if trace.parameters is not None and len( +# trace.parameters.keys()) > 0 else None +# data["payload"] = json.dumps(trace.payload) if trace.payload is not None and len(trace.payload.keys()) > 0 else None +# return data diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index addbe749b..97c674c2f 100644 --- a/ee/api/routers/core_dynamic.py +++ b/ee/api/routers/core_dynamic.py @@ -8,7 +8,7 @@ import schemas import schemas_ee from chalicelib.core import sessions, assist, heatmaps, sessions_favorite, sessions_assignments, errors, errors_viewed, \ errors_favorite, sessions_notes -from chalicelib.core import sessions_viewed, signals +from chalicelib.core import sessions_viewed from chalicelib.core import tenants, users, projects, license from chalicelib.core import webhook from chalicelib.core.collaboration_slack import Slack @@ -436,12 +436,3 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...), return data return {'data': data} - -@app.post('/{projectId}/signals', tags=['signals']) -def send_interactions(projectId:int, data: schemas_ee.SignalsSchema = Body(...), - context: schemas.CurrentContext = Depends(OR_context)): - data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data) - - if "errors" in data: - return data - return {'data': data} diff --git a/ee/api/routers/crons/core_dynamic_crons.py b/ee/api/routers/crons/core_dynamic_crons.py index 88d591269..31a144749 100644 --- a/ee/api/routers/crons/core_dynamic_crons.py +++ b/ee/api/routers/crons/core_dynamic_crons.py @@ -1,7 +1,6 @@ from chalicelib.core import telemetry, unlock from chalicelib.core import jobs from chalicelib.core import weekly_report as weekly_report_script -from chalicelib.utils import events_queue from decouple import config @@ -23,12 +22,8 @@ def unlock_cron() -> None: print(f"valid: {unlock.is_valid()}") -def pg_events_queue() -> None: - events_queue.global_queue.force_flush() - cron_jobs = [ {"func": unlock_cron, "trigger": "cron", "hour": "*"}, - {"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20} ] SINGLE_CRONS = [{"func": telemetry_cron, "trigger": "cron", "day_of_week": "*"}, diff --git a/ee/api/routers/crons/ee_crons.py b/ee/api/routers/crons/ee_crons.py new file mode 100644 index 000000000..f59fc9edb --- /dev/null +++ b/ee/api/routers/crons/ee_crons.py @@ -0,0 +1,10 @@ +from chalicelib.utils import events_queue + + +def pg_events_queue() -> None: + events_queue.global_queue.force_flush() + + +ee_cron_jobs = [ + {"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20}, +] \ No newline at end of file diff --git a/ee/api/routers/ee.py b/ee/api/routers/ee.py index dc08ac569..2660ab25d 100644 --- a/ee/api/routers/ee.py +++ b/ee/api/routers/ee.py @@ -1,5 +1,5 @@ from chalicelib.core import roles, traces -from chalicelib.core import unlock +from chalicelib.core import unlock, signals from chalicelib.utils import assist_helper unlock.check() @@ -71,3 +71,13 @@ def get_trails(data: schemas_ee.TrailSearchPayloadSchema = Body(...), @app.post('/trails/actions', tags=["traces", "trails"]) def get_available_trail_actions(context: schemas.CurrentContext = Depends(OR_context)): return {'data': traces.get_available_actions(tenant_id=context.tenant_id)} + + +@app.post('/{projectId}/signals', tags=['signals']) +def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...), + context: schemas.CurrentContext = Depends(OR_context)): + data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data) + + if "errors" in data: + return data + return {'data': data} \ No newline at end of file diff --git a/ee/scripts/schema/db/init_dbs/postgresql/1.10.0/1.10.0.sql b/ee/scripts/schema/db/init_dbs/postgresql/1.10.0/1.10.0.sql index fa35caa33..a34074856 100644 --- a/ee/scripts/schema/db/init_dbs/postgresql/1.10.0/1.10.0.sql +++ b/ee/scripts/schema/db/init_dbs/postgresql/1.10.0/1.10.0.sql @@ -7,13 +7,13 @@ $$ LANGUAGE sql IMMUTABLE; CREATE TABLE IF NOT EXISTS frontend_signals ( - project_id bigint NOT NULL, - user_id text NOT NULL, - timestamp bigint NOT NULL, - action text NOT NULL, - source text NOT NULL, - category text NOT NULL, - data json + project_id bigint NOT NULL, + user_id integer NOT NULL references users (user_id) ON DELETE CASCADE, + timestamp bigint NOT NULL, + action text NOT NULL, + source text NOT NULL, + category text NOT NULL, + data jsonb ); CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);