diff --git a/ee/connectors/deploy/Dockerfile_redshift b/ee/connectors/deploy/Dockerfile_redshift
index 23b093fad..75af55b15 100644
--- a/ee/connectors/deploy/Dockerfile_redshift
+++ b/ee/connectors/deploy/Dockerfile_redshift
@@ -19,5 +19,5 @@ COPY handler.py .
 COPY consumer_pool.py .
 COPY fill_from_db.py .
 COPY entrypoint.sh .
-
+ENV replace_interval=300
 ENTRYPOINT ./entrypoint.sh
diff --git a/ee/connectors/deploy/requirements_redshift.txt b/ee/connectors/deploy/requirements_redshift.txt
index e7151e9da..51696ca24 100644
--- a/ee/connectors/deploy/requirements_redshift.txt
+++ b/ee/connectors/deploy/requirements_redshift.txt
@@ -2,6 +2,7 @@ chardet==5.1.0
 idna==3.4
 confluent-kafka==2.1.1
 psycopg2-binary==2.9.6
+apscheduler==3.10.1
 python-decouple==3.8
 pytz==2022.6
 requests==2.28.1
diff --git a/ee/connectors/entrypoint.sh b/ee/connectors/entrypoint.sh
index 523704b97..2eb4b7ae7 100755
--- a/ee/connectors/entrypoint.sh
+++ b/ee/connectors/entrypoint.sh
@@ -1,2 +1,2 @@
 echo "[INFO] Service start"
-python -u consumer_pool.py
+python -u consumer_pool.py & python -u fill_from_db.py && fg
diff --git a/ee/connectors/fill_from_db.py b/ee/connectors/fill_from_db.py
index 9bd7fe18b..c5f60a9a7 100644
--- a/ee/connectors/fill_from_db.py
+++ b/ee/connectors/fill_from_db.py
@@ -1,7 +1,12 @@
 import pandas_redshift as pdredshift
+import pandas as pd
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.interval import IntervalTrigger
 from utils import pg_client
 from decouple import config, Choices
 import asyncio
+from time import time
+
 
 DATABASE = config('CLOUD_SERVICE')
 sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions')
@@ -31,30 +36,82 @@ pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'],
                                password=cluster_info['PASSWORD'],
                                sslmode=sslmode)
 
-async def main(limit = 100):
+
+async def main():
+    limit = config('FILL_QUERY_LIMIT', default=100, cast=int)
+    t = time()
     query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' LIMIT {limit}"
     try:
         res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
     except Exception as e:
         print(repr(e))
-    if len(res) == 0:
+        res = list()
+    if res is None:
+        return
+    elif len(res) == 0:
         return
     sessionids = list(map(lambda k: str(k), res['sessionid']))
-    await pg_client.init()
     with pg_client.PostgresClient() as conn:
         conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
            session_id_list = ','.join(sessionids))
         )
         pg_res = conn.fetchall()
+    df = pd.DataFrame(pg_res)
+    df.dropna(inplace=True)
+    df = df.groupby('user_id').agg({'session_id': lambda x: list(x)})
+    base_query = "UPDATE {table} SET user_id = CASE".format(table=table)
+    template = "\nWHEN sessionid IN ({session_ids}) THEN '{user_id}'"
+    all_ids = list()
+    for i in range(len(df)):
+        user = df.iloc[i].name
+        if user == '' or user == 'None' or user == 'NULL':
+            continue
+        aux = [str(sess) for sess in df.iloc[i].session_id]
+        all_ids += aux
+        if len(aux) == 0:
+            continue
+        base_query += template.format(user_id=user, session_ids=','.join(aux))
+    base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"
+    if len(all_ids) == 0:
+        return
+    print(base_query[:200])
+    try:
+        pdredshift.exec_commit(base_query)
+    except Exception as e:
+        print(repr(e))
+    print(f'[FILL-INFO] {time()-t} - for {len(sessionids)} elements')
+
+
+cron_jobs = [
+    {"func": main, "trigger": IntervalTrigger(seconds=15), "misfire_grace_time": 60, "max_instances": 1},
+]
+
+
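+# asyncio.get_event_loop() may raise a RuntimeError when the current thread
+# has no event loop set; in that case create a fresh loop and register it so
+# the scheduler always has a loop to run on.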
+def get_or_create_eventloop():
+    try:
+        return asyncio.get_event_loop()
+    except RuntimeError as ex:
+        if "There is no current event loop in thread" in str(ex):
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            return asyncio.get_event_loop()
 
-    base_query = "UPDATE {table} SET user_id = '{user_id}' WHERE sessionid = {session_id}"
-    for e in pg_res:
-        query = base_query.format(table=table, user_id=e['user_id'], session_id=e['session_id'])
-        try:
-            pdredshift.exec_commit(query)
-        except Exception as e:
-            print(repr(e))
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
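+    # Register every cron_jobs entry on an AsyncIOScheduler and block on the
+    # event loop until interrupted, closing the PG pool on the way out.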
+    scheduler = AsyncIOScheduler()
+    asyncio.run(pg_client.init())
+    for job in cron_jobs:
+        scheduler.add_job(id=job['func'].__name__, **job)
+    loop = get_or_create_eventloop()
+    scheduler.start()
+    try:
+        loop.run_forever()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+    asyncio.run(pg_client.terminate())
diff --git a/ee/connectors/msgcodec/setup-messages.py b/ee/connectors/msgcodec/setup-messages.py
new file mode 100644
index 000000000..567e9ec1f
--- /dev/null
+++ b/ee/connectors/msgcodec/setup-messages.py
@@ -0,0 +1,9 @@
+#from setuptools import setup
+from distutils.core import setup
+from Cython.Build import cythonize
+
+setup(
+    ext_modules = cythonize("messages.pyx"),
+    include_package_data=True,
+    package_data={"": ["*.pxd"]},
+)
diff --git a/ee/connectors/msgcodec/setup-msgcodec.py b/ee/connectors/msgcodec/setup-msgcodec.py
new file mode 100644
index 000000000..a1db3d5b7
--- /dev/null
+++ b/ee/connectors/msgcodec/setup-msgcodec.py
@@ -0,0 +1,9 @@
+#from setuptools import setup
+from distutils.core import setup
+from Cython.Build import cythonize
+
+setup(
+    ext_modules = cythonize("msgcodec.pyx"),
+    include_package_data=True,
+    package_data={"": ["*.pxd"]},
+)
diff --git a/ee/connectors/utils/cache.py b/ee/connectors/utils/cache.py
index b2e92ca51..ad9f3daa7 100644
--- a/ee/connectors/utils/cache.py
+++ b/ee/connectors/utils/cache.py
@@ -27,24 +27,27 @@ class CachedSessions:
         self.session_project = dict()
         self.max_alive_time = config('MAX_SESSION_LIFE', default=7800, cast=int)  # Default 2 hours
 
-    def create(self, sessionid):
+    def create(self, sessionid: int):
         """Saves a new session with status OPEN and set its insertion time"""
-        self.session_project[sessionid] = (time(), 'OPEN')
+        _sessionid = str(sessionid)
+        self.session_project[_sessionid] = (time(), 'OPEN')
 
-    def add(self, sessionid):
+    def add(self, sessionid: int):
         """Handle the creation of a cached session or update its status if already in cache"""
-        if sessionid in self.session_project.keys():
-            if self.session_project[sessionid][1] == 'CLOSE':
-                tmp = self.session_project[sessionid]
-                self.session_project[sessionid] = (tmp[0], 'UPDATE')
+        _sessionid = str(sessionid)
+        if _sessionid in self.session_project.keys():
+            if self.session_project[_sessionid][1] == 'CLOSE':
+                tmp = self.session_project[_sessionid]
+                self.session_project[_sessionid] = (tmp[0], 'UPDATE')
         else:
             self.create(sessionid)
 
-    def close(self, sessionid):
+    def close(self, sessionid: int):
         """Sets status of session to closed session (received sessionend message)"""
-        tmp = self.session_project[sessionid]
+        _sessionid = str(sessionid)
+        tmp = self.session_project[_sessionid]
         old_status = tmp[1]
-        self.session_project[sessionid] = (tmp[0], 'CLOSE')
+        self.session_project[_sessionid] = (tmp[0], 'CLOSE')
         return old_status
 
     def clear_sessions(self):
@@ -53,9 +56,9 @@ class CachedSessions:
         current_time = time()
         for sessionid, values in self.session_project.items():
             if current_time - values[0] > self.max_alive_time:
-                to_clean_list.append(sessionid)
+                to_clean_list.append(int(sessionid))
         for sessionid in to_clean_list:
-            del self.session_project[sessionid]
+            del self.session_project[str(sessionid)]
         return to_clean_list
 
 
diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py
index 9ae0633ee..f00b100eb 100644
--- a/ee/connectors/utils/worker.py
+++ b/ee/connectors/utils/worker.py
@@ -41,6 +41,7 @@ elif EVENT_TYPE == 'detailed':
     events_messages = [1, 4, 21, 22, 25, 27, 31, 32, 39, 48, 59, 64, 69, 78, 125, 126]
 allowed_messages = list(set(session_messages + events_messages))
 codec = MessageCodec(allowed_messages)
+max_kafka_read = config('MAX_KAFKA_READ', default=60000, cast=int)
 
 
 def init_consumer():
@@ -95,7 +96,7 @@ class ProjectFilter:
         self.sessions_lifespan = CachedSessions()
         self.non_valid_sessions_cache = dict()
 
-    def is_valid(self, sessionId):
+    def is_valid(self, sessionId: int):
         if len(self.project_filter) == 0:
             return True
         elif sessionId in self.sessions_lifespan.session_project.keys():
@@ -103,13 +104,36 @@ class ProjectFilter:
         elif sessionId in self.non_valid_sessions_cache.keys():
             return False
         else:
-            projetId = project_from_session(sessionId)
-            if projetId not in self.project_filter:
+            projectId = project_from_session(sessionId)
+            if projectId not in self.project_filter:
                 self.non_valid_sessions_cache[sessionId] = int(datetime.now().timestamp())
                 return False
             else:
                 return True
 
+    def already_checked(self, sessionId):
+        if len(self.project_filter) == 0:
+            return True, True
+        elif sessionId in self.sessions_lifespan.session_project.keys():
+            return True, True
+        elif sessionId in self.non_valid_sessions_cache.keys():
+            return True, False
+        else:
+            return False, None
+
+    def are_valid(self, sessionIds: list[int]):
+        valid_sessions = list()
+        if len(self.project_filter) == 0:
+            return sessionIds
+        projects_session = project_from_sessions(list(set(sessionIds)))
+        current_datetime = int(datetime.now().timestamp())
+        for projectId, sessionId in projects_session:
+            if projectId not in self.project_filter:
+                self.non_valid_sessions_cache[sessionId] = current_datetime
+            else:
+                valid_sessions.append(sessionId)
+        return valid_sessions
+
     def handle_clean(self):
         if len(self.project_filter) == 0:
             return
@@ -121,45 +145,70 @@ def read_from_kafka(pipe: Connection, params: dict):
-    global UPLOAD_RATE
-    try:
-        asyncio.run(pg_client.init())
-        kafka_consumer = init_consumer()
-        project_filter = params['project_filter']
-        while True:
-            to_decode = list()
-            sessionIds = list()
-            start_time = datetime.now().timestamp()
-            broken_batchs = 0
-            n_messages = 0
-            while datetime.now().timestamp() - start_time < UPLOAD_RATE:
+    global UPLOAD_RATE, max_kafka_read
+    # try:
+    asyncio.run(pg_client.init())
+    kafka_consumer = init_consumer()
+    project_filter = params['project_filter']
+    capture_messages = list()
+    capture_sessions = list()
+    while True:
+        to_decode = list()
+        sessionIds = list()
+        start_time = datetime.now().timestamp()
+        broken_batchs = 0
+        n_messages = 0
+        while datetime.now().timestamp() - start_time < UPLOAD_RATE and max_kafka_read > n_messages:
+            try:
                 msg = kafka_consumer.poll(5.0)
-                if msg is None:
-                    continue
-                n_messages += 1
-                try:
-                    sessionId = codec.decode_key(msg.key())
-                except Exception:
-                    broken_batchs += 1
-                    continue
-                if project_filter.is_valid(sessionId):
-                    to_decode.append(msg.value())
-                    sessionIds.append(sessionId)
-            if n_messages != 0:
-                print(
-                    f'[INFO] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
-            else:
-                print('[WARN] No messages read')
-            non_valid_updated = project_filter.non_valid_sessions_cache
-            pipe.send((non_valid_updated, sessionIds, to_decode))
-            continue_signal = pipe.recv()
-            if continue_signal == 'CLOSE':
-                break
-            kafka_consumer.commit()
-        close_consumer(kafka_consumer)
-        asyncio.run(pg_client.terminate())
-    except Exception as e:
-        print('[WARN]', repr(e))
+            except Exception as e:
+                print(e)
+            if msg is None:
+                continue
+            n_messages += 1
+            try:
+                sessionId = codec.decode_key(msg.key())
+            except Exception:
+                broken_batchs += 1
+                continue
+            checked, is_valid = project_filter.already_checked(sessionId)
+            if not checked:
+                capture_sessions.append(sessionId)
+                capture_messages.append(msg.value())
+            elif is_valid:
+                to_decode.append(msg.value())
+                sessionIds.append(sessionId)
+            # if project_filter.is_valid(sessionId):
+            #     to_decode.append(msg.value())
+            #     sessionIds.append(sessionId)
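+        # Sessions not found in either cache were buffered; validate them in
+        # one batched PG query, then keep only messages from valid sessions.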
+        valid_sessions = project_filter.are_valid(list(set(capture_sessions)))
+        while capture_sessions:
+            sessId = capture_sessions.pop()
+            msg = capture_messages.pop()
+            if sessId in valid_sessions:
+                sessionIds.append(sessId)
+                to_decode.append(msg)
+        if n_messages != 0:
+            print(
+                f'[INFO-bg] Found {broken_batchs} broken batches over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
+        else:
+            print('[WARN-bg] No messages read')
+        non_valid_updated = project_filter.non_valid_sessions_cache
+        pipe.send((non_valid_updated, sessionIds, to_decode))
+        continue_signal = pipe.recv()
+        if continue_signal == 'CLOSE':
+            print('[SHUTDOWN-reader] Reader shutting down')
+            break
+        kafka_consumer.commit()
+    print('[INFO] Closing consumer')
+    close_consumer(kafka_consumer)
+    print('[INFO] Closing pg connection')
+    asyncio.run(pg_client.terminate())
+    print('[INFO] Successfully closed reader task')
+    # except Exception as e:
+    #     print('[WARN]', repr(e))
 
 
 def into_batch(batch: list[Event | DetailedEvent], session_id: int, n: Session):
@@ -184,6 +233,34 @@ def project_from_session(sessionId: int):
     return res['project_id']
 
 
+def project_from_sessions(sessionIds: list[int]):
+    """Search projectIds of requested sessionIds in PG table sessions"""
+    response = list()
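+    # Query in chunks of at most 1000 ids, taken from the tail of the list,
+    # to keep the SQL IN (...) clause bounded.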
+    while sessionIds:
+        sessIds = sessionIds[-1000:]
+        try:
+            with pg_client.PostgresClient() as conn:
+                conn.execute(
+                    "SELECT session_id, project_id FROM sessions WHERE session_id IN ({sessionIds})".format(
+                        sessionIds=','.join([str(sessId) for sessId in sessIds])
+                    )
+                )
+                res = conn.fetchall()
+        except Exception as e:
+            print('[project_from_sessions]', repr(e))
+            raise e
+        if res is None:
+            print(f'[WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table')
+        else:
+            response += res
+        sessionIds = sessionIds[:-1000]
+    if not response:
+        return []
+    return [(e['project_id'], e['session_id']) for e in response]
+
+
 def decode_message(params: dict):
     global codec, session_messages, events_messages, EVENT_TYPE
     if len(params['message']) == 0:
@@ -301,6 +378,8 @@ class WorkerPool:
         while signal_handler.KEEP_PROCESSING:
             # Setup of parameters for workers
             if not kafka_reader_process.is_alive():
+                print('[INFO] Restarting reader task')
+                del kafka_reader_process
                 kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params))
                 kafka_reader_process.start()
             decoding_params = [{'flag': 'decoder',
@@ -333,14 +412,15 @@
                     raise e
             events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages = self._pool_response_handler(
                 pool_results=results)
-
             insertBatch(events_batch, sessions_insert_batch, sessions_update_batch, database_api,
                         sessions_table_name, table_name, EVENT_TYPE)
             self.save_snapshot(database_api)
             main_conn.send('CONTINUE')
+        print('[INFO] Sending close signal')
         main_conn.send('CLOSE')
         self.terminate()
         kafka_reader_process.terminate()
+        print('[SHUTDOWN] Process terminated')
 
     def load_checkpoint(self, database_api):
         file = database_api.load_binary(name='checkpoint')
@@ -349,7 +429,7 @@
         if 'version' not in checkpoint.keys():
             sessions_cache_list = checkpoint['cache']
             reload_default_time = datetime.now().timestamp()
-            self.project_filter_class.non_valid_sessions_cache = {sessId: reload_default_time for sessId, value in
+            self.project_filter_class.non_valid_sessions_cache = {int(sessId): reload_default_time for sessId, value in
                                                                   sessions_cache_list.items() if not value[1]}
             self.project_filter_class.sessions_lifespan.session_project = checkpoint['cached_sessions']
         elif checkpoint['version'] == 'v1.0':