From 3479060d02cd237723948cc634f680ee2f4908ce Mon Sep 17 00:00:00 2001 From: MauricioGarciaS <47052044+MauricioGarciaS@users.noreply.github.com> Date: Fri, 30 Jun 2023 12:41:36 +0200 Subject: [PATCH] style(connectors): different prints for each script (fill, worker) (#1388) --- ee/connectors/consumer_pool.py | 4 ++-- ee/connectors/fill_from_db.py | 5 ++--- ee/connectors/utils/worker.py | 30 +++++++++++++++--------------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/ee/connectors/consumer_pool.py b/ee/connectors/consumer_pool.py index 137910928..e145b96c0 100644 --- a/ee/connectors/consumer_pool.py +++ b/ee/connectors/consumer_pool.py @@ -15,7 +15,7 @@ def main(): try: w_pool.load_checkpoint(database_api) except Exception as e: - print('[WARN] Checkpoint not found') + print('[WORKER WARN] Checkpoint not found') print(repr(e)) # ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool) # consumer_settings = { @@ -29,7 +29,7 @@ def main(): # consumer = Consumer(consumer_settings) # consumer.subscribe(config("TOPICS", default="saas-raw").split(',')) - print("[INFO] Kafka consumer subscribed") + print("[WORKER INFO] Kafka consumer subscribed") # w_pool.run_workers(kafka_consumer=consumer, database_api=database_api) w_pool.run_workers(database_api=database_api) diff --git a/ee/connectors/fill_from_db.py b/ee/connectors/fill_from_db.py index c5f60a9a7..2070baccd 100644 --- a/ee/connectors/fill_from_db.py +++ b/ee/connectors/fill_from_db.py @@ -44,7 +44,7 @@ async def main(): try: res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit)) except Exception as e: - print(repr(e)) + print('[FILL Exception]',repr(e)) res = list() if res is None: return @@ -75,11 +75,10 @@ async def main(): base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})" if len(all_ids) == 0: return - print(base_query[:200]) try: pdredshift.exec_commit(base_query) except Exception as e: - print(repr(e)) + print('[FILL Exception]',repr(e)) print(f'[FILL-INFO] {time()-t} - for {len(sessionids)} elements') diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py index f00b100eb..af0cfe3d3 100644 --- a/ee/connectors/utils/worker.py +++ b/ee/connectors/utils/worker.py @@ -162,7 +162,7 @@ def read_from_kafka(pipe: Connection, params: dict): try: msg = kafka_consumer.poll(5.0) except Exception as e: - print(e) + print('[WORKER Exception]', e) if msg is None: continue n_messages += 1 @@ -190,21 +190,21 @@ def read_from_kafka(pipe: Connection, params: dict): to_decode.append(msg) if n_messages != 0: print( - f'[INFO-bg] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)') + f'[WORKER INFO-bg] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)') else: - print('[WARN-bg] No messages read') + print('[WORKER WARN-bg] No messages read') non_valid_updated = project_filter.non_valid_sessions_cache pipe.send((non_valid_updated, sessionIds, to_decode)) continue_signal = pipe.recv() if continue_signal == 'CLOSE': - print('[SHUTDOWN-reader] Reader shutting down') + print('[WORKER SHUTDOWN-reader] Reader shutting down') break kafka_consumer.commit() - print('[INFO] Closing consumer') + print('[WORKER INFO] Closing consumer') close_consumer(kafka_consumer) - print('[INFO] Closing pg connection') + print('[WORKER INFO] Closing pg connection') asyncio.run(pg_client.terminate()) - print('[INFO] Successfully closed reader task') + print('[WORKER INFO] Successfully closed reader task') # except Exception as e: # print('[WARN]', repr(e)) @@ -226,7 +226,7 @@ def project_from_session(sessionId: int): ) res = conn.fetchone() if res is None: - print(f'[WARN] sessionid {sessionId} not found in sessions table') + print(f'[WORKER WARN] sessionid {sessionId} not found in sessions table') return None return res['project_id'] @@ -245,10 +245,10 @@ def project_from_sessions(sessionIds: list[int]): ) res = conn.fetchall() except Exception as e: - print('[project_from_sessions]', repr(e)) + print('[WORKER project_from_sessions]', repr(e)) raise e if res is None: - print(f'[WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table') + print(f'[WORKER WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table') else: response += res sessionIds = sessionIds[:-1000] @@ -341,7 +341,7 @@ class WorkerPool: elif old_status == 'OPEN': sessions_insert_batch.append(deepcopy(self.sessions[session_id])) else: - print('[WARN] Closed session should not be closed again') + print('[WORKER-WARN] Closed session should not be closed again') elif flag == 'reader': count += 1 if count > 1: @@ -374,7 +374,7 @@ class WorkerPool: while signal_handler.KEEP_PROCESSING: # Setup of parameters for workers if not kafka_reader_process.is_alive(): - print('[INFO] Restarting reader task') + print('[WORKER-INFO] Restarting reader task') del kafka_reader_process kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params)) kafka_reader_process.start() @@ -404,7 +404,7 @@ class WorkerPool: try: results.append(async_result.get(timeout=32 * UPLOAD_RATE)) except TimeoutError as e: - print('[TimeoutError] Decoding of messages is taking longer than expected') + print('[WORKER-TimeoutError] Decoding of messages is taking longer than expected') raise e events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages = self._pool_response_handler( pool_results=results) @@ -412,11 +412,11 @@ class WorkerPool: table_name, EVENT_TYPE) self.save_snapshot(database_api) main_conn.send('CONTINUE') - print('[INFO] Sending close signal') + print('[WORKER-INFO] Sending close signal') main_conn.send('CLOSE') self.terminate() kafka_reader_process.terminate() - print('[SHUTDOWN] Process terminated') + print('[WORKER-SHUTDOWN] Process terminated') def load_checkpoint(self, database_api): file = database_api.load_binary(name='checkpoint')