diff --git a/ee/connectors/Readme.md b/ee/connectors/Readme.md new file mode 100644 index 000000000..833eb8246 --- /dev/null +++ b/ee/connectors/Readme.md @@ -0,0 +1,3 @@ +## Build + +docker build -f deploy/Dockerfile_redshift -t {tag} . diff --git a/ee/connectors/consumer.py b/ee/connectors/consumer.py index 1c3488642..75979ec43 100644 --- a/ee/connectors/consumer.py +++ b/ee/connectors/consumer.py @@ -1,19 +1,28 @@ -import os +from decouple import config from confluent_kafka import Consumer from datetime import datetime from collections import defaultdict +import json +from time import time +#from msgcodec.codec import MessageCodec from msgcodec.msgcodec import MessageCodec from msgcodec.messages import SessionEnd +print('[INFO] Importing DBConnection...') from db.api import DBConnection +print('[INFO] Importing from models..') from db.models import events_detailed_table_name, events_table_name, sessions_table_name +print('[INFO] Importing from writer..') from db.writer import insert_batch +print('[INFO] Importing from handler..') from handler import handle_message, handle_normal_message, handle_session -DATABASE = os.environ['DATABASE_NAME'] -LEVEL = os.environ['level'] +DATABASE = config('DATABASE_NAME') +LEVEL = config('LEVEL') +print(f'[INFO] Connecting to database {DATABASE}') db = DBConnection(DATABASE) +print('Connected successfully') if LEVEL == 'detailed': table_name = events_detailed_table_name @@ -22,30 +31,34 @@ elif LEVEL == 'normal': def main(): - batch_size = 4000 - sessions_batch_size = 400 + batch_size = config('events_batch_size', default=4000, cast=int) + sessions_batch_size = config('sessions_batch_size', default=400, cast=int) batch = [] sessions = defaultdict(lambda: None) sessions_batch = [] codec = MessageCodec() - consumer = Consumer({ - "security.protocol": "SSL", - "bootstrap.servers": ",".join([os.environ['KAFKA_SERVER_1'], - os.environ['KAFKA_SERVER_2']]), + ssl_protocol = config('SSL_ENABLED', default=True, cast=bool) + 
consumer_settings = { + "bootstrap.servers": config('KAFKA_SERVER'), "group.id": f"connector_{DATABASE}", "auto.offset.reset": "earliest", "enable.auto.commit": False - }) + } + if ssl_protocol: + consumer_settings['security.protocol'] = 'SSL' + consumer = Consumer(consumer_settings) - consumer.subscribe(["raw", "raw_ios"]) + consumer.subscribe([config("topic", default="saas-raw")]) print("Kafka consumer subscribed") + t_ = time() while True: - msg.consumer.poll(1.0) + msg = consumer.poll(1.0) if msg is None: continue - messages = codec.decode_detailed(msg.value) - session_id = codec.decode_key(msg.key) + #value = json.loads(msg.value().decode('utf-8')) + messages = codec.decode_detailed(msg.value()) + session_id = codec.decode_key(msg.key()) if messages is None: print('-') continue @@ -68,7 +81,11 @@ def main(): # try to insert sessions if len(sessions_batch) >= sessions_batch_size: + t2 = time() attempt_session_insert(sessions_batch) + t2_ = time() + print(f'[INFO] Inserted sessions into Redshift - time spent: {t2_-t2}') + t_ += t2_-t2 for s in sessions_batch: try: del sessions[s.sessionid] @@ -86,10 +103,15 @@ def main(): # insert a batch of events if len(batch) >= batch_size: + t1 = time() + print(f'[INFO] Spent time filling ({batch_size})-batch: {t1-t_}') attempt_batch_insert(batch) + t1_ = time() + t_ = t1_ + print(f'[INFO] Inserted events into Redshift - time spent: {t1_-t1}') batch = [] consumer.commit() - print("sessions in cache:", len(sessions)) + print("[INFO] sessions in cache:", len(sessions)) def attempt_session_insert(sess_batch): @@ -134,4 +156,6 @@ def decode_key(b) -> int: return decoded if __name__ == '__main__': + print('[INFO] Setup complete') + print('[INFO] Starting script') main() diff --git a/ee/connectors/consumer_async.py b/ee/connectors/consumer_async.py new file mode 100644 index 000000000..5e0d57610 --- /dev/null +++ b/ee/connectors/consumer_async.py @@ -0,0 +1,229 @@ +from numpy._typing import _16Bit +from decouple import config, Csv 
+from confluent_kafka import Consumer +from datetime import datetime +from collections import defaultdict +import json +import asyncio +from time import time, sleep +from copy import deepcopy + +from msgcodec.msgcodec import MessageCodec +from msgcodec.messages import SessionStart, SessionEnd +from db.api import DBConnection +from db.models import events_detailed_table_name, events_table_name, sessions_table_name +from db.writer import insert_batch, update_batch +from handler import handle_message, handle_normal_message, handle_session +from utils.cache import ProjectFilter as PF +from utils import pg_client + +from psycopg2 import InterfaceError + +def process_message(msg, codec, sessions, batch, sessions_batch, interesting_sessions, interesting_events, EVENT_TYPE, projectFilter): + if msg is None: + return + messages = codec.decode_detailed(msg.value()) + session_id = codec.decode_key(msg.key()) + if messages is None: + print('-') + return + elif not projectFilter.is_valid(session_id): + # We check using projectFilter if session_id is from the selected projects + return + + for message in messages: + if message.__id__ in interesting_events: + if EVENT_TYPE == 'detailed': + n = handle_message(message) + elif EVENT_TYPE == 'normal': + n = handle_normal_message(message) + if message.__id__ in interesting_sessions: + + # Here we create the session if not exists or append message event if session exists + sessions[session_id] = handle_session(sessions[session_id], message) + if sessions[session_id]: + sessions[session_id].sessionid = session_id + projectFilter.cached_sessions.add(session_id) + + if isinstance(message, SessionEnd): + # Here only if session exists and we get sessionend we start cleanup + if sessions[session_id].session_start_timestamp: + projectFilter.handle_clean() + old_status = projectFilter.cached_sessions.close(session_id) + sessions_batch.append((old_status, deepcopy(sessions[session_id]))) + sessions_to_delete = 
projectFilter.cached_sessions.clear_sessions() + for sess_id in sessions_to_delete: + try: + del sessions[sess_id] + except KeyError: + print('[INFO] Session already deleted') + else: + print('[WARN] Session not started received SessionEnd message') + del sessions[session_id] + + if message.__id__ in interesting_events: + if n: + n.sessionid = session_id + n.received_at = int(datetime.now().timestamp() * 1000) + n.batch_order_number = len(batch) + batch.append(n) + else: + continue + + +def attempt_session_insert(sess_batch, db, sessions_table_name, try_=0): + if sess_batch: + try: + print("inserting sessions...") + insert_batch(db, sess_batch, table=sessions_table_name, level='sessions') + print("inserted sessions succesfully") + except TypeError as e: + print("Type conversion error") + print(repr(e)) + except ValueError as e: + print("Message value could not be processed or inserted correctly") + print(repr(e)) + except InterfaceError as e: + if try_ < 3: + try_ += 1 + sleep(try_*2) + attempt_session_insert(sess_batch, db, sessions_table_name, try_) + except Exception as e: + print(repr(e)) + + +def attempt_session_update(sess_batch, db, sessions_table_name): + if sess_batch: + try: + print('updating sessions') + update_batch(db, sess_batch, table=sessions_table_name) + except TypeError as e: + print('Type conversion error') + print(repr(e)) + except ValueError as e: + print('Message value could not be processed or inserted correctly') + print(repr(e)) + except InterfaceError as e: + print('Error while trying to update session into datawarehouse') + print(repr(e)) + except Exception as e: + print(repr(e)) + + +def attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_=0): + # insert a batch + try: + print("inserting...") + insert_batch(db=db, batch=batch, table=table_name, level=EVENT_TYPE) + print("inserted succesfully") + except TypeError as e: + print("Type conversion error") + print(repr(e)) + except ValueError as e: + print("Message value could not be 
processed or inserted correctly") + print(repr(e)) + except InterfaceError as e: + if try_ < 3: + try_ += 1 + sleep(try_*2) + attempt_batch_insert(batch, db, table_name, EVENT_TYPE, try_) + else: + # TODO: Restart redshift + print(repr(e)) + except Exception as e: + print(repr(e)) + +def decode_key(b) -> int: + """ + Decode the message key (encoded with little endian) + """ + try: + decoded = int.from_bytes(b, "little", signed=False) + except Exception as e: + raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + return decoded + + +async def main(): + await pg_client.init() + DATABASE = config('CLOUD_SERVICE') + EVENT_TYPE = config('EVENT_TYPE') + + db = DBConnection(DATABASE) + upload_rate = config('upload_rate', default=30, cast=int) + + if EVENT_TYPE == 'detailed': + table_name = events_detailed_table_name + elif EVENT_TYPE == 'normal': + table_name = events_table_name + + batch = [] + sessions = defaultdict(lambda: None) + sessions_batch = [] + + sessions_events_selection = [1,25,28,29,30,31,32,54,56,62,69,78,125,126] + if EVENT_TYPE == 'normal': + selected_events = [21,22,25,27,64,78,125] + elif EVENT_TYPE == 'detailed': + selected_events = [1,4,21,22,25,27,31,32,39,48,59,64,69,78,125,126] + filter_events = list(set(sessions_events_selection+selected_events)) + + allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int)) + project_filter = PF(allowed_projects) + codec = MessageCodec(filter_events) + ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool) + consumer_settings = { + "bootstrap.servers": config('KAFKA_SERVERS'), + "group.id": f"connector_{DATABASE}", + "auto.offset.reset": "earliest", + "enable.auto.commit": False + } + if ssl_protocol: + consumer_settings['security.protocol'] = 'SSL' + consumer = Consumer(consumer_settings) + + consumer.subscribe(config("TOPICS", default="saas-raw").split(',')) + print("[INFO] Kafka consumer subscribed") + + c_time = time() + read_msgs = 0 + while True: + 
msg = consumer.poll(1.0) + process_message(msg, codec, sessions, batch, sessions_batch, sessions_events_selection, selected_events, EVENT_TYPE, project_filter) + read_msgs += 1 + if time() - c_time > upload_rate: + print(f'[INFO] {read_msgs} kafka messages read in {upload_rate} seconds') + await insertBatch(deepcopy(sessions_batch), deepcopy(batch), db, sessions_table_name, table_name, EVENT_TYPE) + consumer.commit() + sessions_batch = [] + batch = [] + read_msgs = 0 + c_time = time() + + + +async def insertBatch(sessions_batch, batch, db, sessions_table_name, table_name, EVENT_TYPE): + t1 = time() + print(f'[BG-INFO] Number of events to add {len(batch)}, number of sessions to add {len(sessions_batch)}') + new_sessions = list() + updated_sessions = list() + for old_status, session_in_batch in sessions_batch: + if old_status == 'UPDATE': + updated_sessions.append(session_in_batch) + else: + new_sessions.append(session_in_batch) + print(f'[DEBUG] Number of new sessions {len(new_sessions)}, number of sessions to update {len(updated_sessions)}') + if new_sessions != []: + attempt_session_insert(new_sessions, db, sessions_table_name) + + if updated_sessions != []: + attempt_session_update(updated_sessions, db, sessions_table_name) + + # insert a batch of events + if batch != []: + attempt_batch_insert(batch, db, table_name, EVENT_TYPE) + print(f'[BG-INFO] Uploaded into S3 in {time()-t1} seconds') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/ee/connectors/db/api.py b/ee/connectors/db/api.py index b3d7cb086..daeced903 100644 --- a/ee/connectors/db/api.py +++ b/ee/connectors/db/api.py @@ -3,10 +3,10 @@ from sqlalchemy import MetaData from sqlalchemy.orm import sessionmaker, session from contextlib import contextmanager import logging -import os +from decouple import config as _config from pathlib import Path -DATABASE = os.environ['DATABASE_NAME'] +DATABASE = _config('CLOUD_SERVICE') if DATABASE == 'redshift': import pandas_redshift as pr @@ -34,7 
+34,7 @@ class DBConnection: """ Initializes connection to a database To update models file use: - sqlacodegen --outfile models_universal.py mysql+pymysql://{user}:{pwd}@{address} + sqlacodegen --outfile models_universal.py mysql+pymysql://{USER}:{pwd}@{HOST} """ _sessions = sessionmaker() @@ -44,53 +44,67 @@ class DBConnection: if config == 'redshift': self.pdredshift = pr - self.pdredshift.connect_to_redshift(dbname=os.environ['schema'], - host=os.environ['address'], - port=os.environ['port'], - user=os.environ['user'], - password=os.environ['password']) + ci = _config('cluster_info', default='') + cluster_info = dict() + if ci == '': + cluster_info['USER'] = _config('USER') + cluster_info['HOST'] = _config('HOST') + cluster_info['PORT'] = _config('PORT') + cluster_info['PASSWORD'] = _config('PASSWORD') + cluster_info['DBNAME'] = _config('DBNAME') + else: + ci = ci.split(' ') + cluster_info = dict() + for _d in ci: + k,v = _d.split('=') + cluster_info[k]=v + self.pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'], + host=cluster_info['HOST'], + port=cluster_info['PORT'], + user=cluster_info['USER'], + password=cluster_info['PASSWORD']) - self.pdredshift.connect_to_s3(aws_access_key_id=os.environ['aws_access_key_id'], - aws_secret_access_key=os.environ['aws_secret_access_key'], - bucket=os.environ['bucket'], - subdirectory=os.environ['subdirectory']) + self.pdredshift.connect_to_s3(aws_access_key_id=_config('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=_config('AWS_SECRET_ACCESS_KEY'), + bucket=_config('BUCKET'), + subdirectory=_config('SUBDIRECTORY', default=None)) - self.connect_str = os.environ['connect_str'].format( - user=os.environ['user'], - password=os.environ['password'], - address=os.environ['address'], - port=os.environ['port'], - schema=os.environ['schema'] + self.CONNECTION_STRING = _config('CONNECTION_STRING').format( + USER=cluster_info['USER'], + PASSWORD=cluster_info['PASSWORD'], + HOST=cluster_info['HOST'], + PORT=cluster_info['PORT'], 
+ DBNAME=cluster_info['DBNAME'] ) - self.engine = create_engine(self.connect_str) + self.engine = create_engine(self.CONNECTION_STRING) elif config == 'clickhouse': - self.connect_str = os.environ['connect_str'].format( - address=os.environ['address'], - database=os.environ['database'] + self.CONNECTION_STRING = _config('CONNECTION_STRING').format( + HOST=_config('HOST'), + DATABASE=_config('DATABASE') ) - self.engine = create_engine(self.connect_str) + self.engine = create_engine(self.CONNECTION_STRING) elif config == 'pg': - self.connect_str = os.environ['connect_str'].format( - user=os.environ['user'], - password=os.environ['password'], - address=os.environ['address'], - port=os.environ['port'], - database=os.environ['database'] + self.CONNECTION_STRING = _config('CONNECTION_STRING').format( + USER=_config('USER'), + PASSWORD=_config('PASSWORD'), + HOST=_config('HOST'), + PORT=_config('PORT'), + DATABASE=_config('DATABASE') ) - self.engine = create_engine(self.connect_str) + self.engine = create_engine(self.CONNECTION_STRING) elif config == 'bigquery': pass elif config == 'snowflake': - self.connect_str = os.environ['connect_str'].format( - user=os.environ['user'], - password=os.environ['password'], - account=os.environ['account'], - database=os.environ['database'], - schema = os.environ['schema'], - warehouse = os.environ['warehouse'] + self.CONNECTION_STRING = _config('CONNECTION_STRING').format( + USER=_config('USER'), + PASSWORD=_config('PASSWORD'), + ACCOUNT=_config('ACCOUNT'), + DATABASE=_config('DATABASE'), + DBNAME = _config('DBNAME'), + WAREHOUSE = _config('WAREHOUSE') ) - self.engine = create_engine(self.connect_str) + self.engine = create_engine(self.CONNECTION_STRING) else: raise ValueError("This db configuration doesn't exist. 
Add into keys file.") @@ -128,3 +142,10 @@ class DBConnection: my_session.close() connection.close() + def restart(self): + self.close() + self.__init__(config=self.config) + + def close(self): + if self.config == 'redshift': + self.pdredshift.close_up_shop() diff --git a/ee/connectors/db/models.py b/ee/connectors/db/models.py index 74cd23230..7e80e1313 100644 --- a/ee/connectors/db/models.py +++ b/ee/connectors/db/models.py @@ -2,9 +2,9 @@ from sqlalchemy import BigInteger, Boolean, Column, Integer, ARRAY, VARCHAR, text, VARCHAR from sqlalchemy.ext.declarative import declarative_base from pathlib import Path -import os +from decouple import config -DATABASE = os.environ['DATABASE_NAME'] +DATABASE = config('CLOUD_SERVICE') Base = declarative_base() metadata = Base.metadata @@ -13,16 +13,16 @@ base_path = Path(__file__).parent.parent # Get a table name from a configuration file try: - events_table_name = os.environ['events_table_name'] + events_table_name = config('EVENTS_TABLE_NAME', default='connector_events') except KeyError as e: events_table_name = None print(repr(e)) try: - events_detailed_table_name = os.environ['events_detailed_table_name'] + events_detailed_table_name = config('EVENTS_DETAILED_TABLE_NAME', default='connector_events_detailed') except KeyError as e: print(repr(e)) events_detailed_table_name = None -sessions_table_name = os.environ['sessions_table'] +sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions') class Session(Base): @@ -69,65 +69,42 @@ class Session(Base): # ISSUES AND EVENTS js_exceptions_count = Column(BigInteger) - long_tasks_total_duration = Column(BigInteger) - long_tasks_max_duration = Column(BigInteger) - long_tasks_count = Column(BigInteger) + #long_tasks_total_duration = Column(BigInteger) + #long_tasks_max_duration = Column(BigInteger) + #long_tasks_count = Column(BigInteger) inputs_count = Column(BigInteger) clicks_count = Column(BigInteger) issues_count = Column(BigInteger) - issues = 
ARRAY(VARCHAR(5000)) urls_count = Column(BigInteger) - urls = ARRAY(VARCHAR(5000)) class Event(Base): __tablename__ = events_table_name sessionid = Column(BigInteger, primary_key=True) - connectioninformation_downlink = Column(BigInteger) - connectioninformation_type = Column(VARCHAR(5000)) consolelog_level = Column(VARCHAR(5000)) consolelog_value = Column(VARCHAR(5000)) - customevent_messageid = Column(BigInteger) customevent_name = Column(VARCHAR(5000)) customevent_payload = Column(VARCHAR(5000)) - customevent_timestamp = Column(BigInteger) - errorevent_message = Column(VARCHAR(5000)) - errorevent_messageid = Column(BigInteger) - errorevent_name = Column(VARCHAR(5000)) - errorevent_payload = Column(VARCHAR(5000)) - errorevent_source = Column(VARCHAR(5000)) - errorevent_timestamp = Column(BigInteger) jsexception_message = Column(VARCHAR(5000)) jsexception_name = Column(VARCHAR(5000)) jsexception_payload = Column(VARCHAR(5000)) - metadata_key = Column(VARCHAR(5000)) - metadata_value = Column(VARCHAR(5000)) - mouseclick_id = Column(BigInteger) - mouseclick_hesitationtime = Column(BigInteger) - mouseclick_label = Column(VARCHAR(5000)) - pageevent_firstcontentfulpaint = Column(BigInteger) - pageevent_firstpaint = Column(BigInteger) - pageevent_messageid = Column(BigInteger) - pageevent_referrer = Column(VARCHAR(5000)) - pageevent_speedindex = Column(BigInteger) - pageevent_timestamp = Column(BigInteger) - pageevent_url = Column(VARCHAR(5000)) - pagerendertiming_timetointeractive = Column(BigInteger) - pagerendertiming_visuallycomplete = Column(BigInteger) - rawcustomevent_name = Column(VARCHAR(5000)) - rawcustomevent_payload = Column(VARCHAR(5000)) - setviewportsize_height = Column(BigInteger) - setviewportsize_width = Column(BigInteger) - timestamp_timestamp = Column(BigInteger) - user_anonymous_id = Column(VARCHAR(5000)) - user_id = Column(VARCHAR(5000)) - issueevent_messageid = Column(BigInteger) + jsexception_metadata = Column(VARCHAR(5000)) + networkrequest_type 
= Column(VARCHAR(5000)) + networkrequest_method = Column(VARCHAR(5000)) + networkrequest_url = Column(VARCHAR(5000)) + networkrequest_request = Column(VARCHAR(5000)) + networkrequest_response = Column(VARCHAR(5000)) + networkrequest_status = Column(BigInteger) + networkrequest_timestamp = Column(BigInteger) + networkrequest_duration = Column(BigInteger) + issueevent_message_id = Column(BigInteger) issueevent_timestamp = Column(BigInteger) issueevent_type = Column(VARCHAR(5000)) - issueevent_contextstring = Column(VARCHAR(5000)) + issueevent_context_string = Column(VARCHAR(5000)) issueevent_context = Column(VARCHAR(5000)) issueevent_payload = Column(VARCHAR(5000)) + issueevent_url = Column(VARCHAR(5000)) customissue_name = Column(VARCHAR(5000)) customissue_payload = Column(VARCHAR(5000)) received_at = Column(BigInteger) @@ -137,7 +114,6 @@ class Event(Base): class DetailedEvent(Base): __tablename__ = events_detailed_table_name - # id = Column(Integer, primary_key=True, server_default=text("\"identity\"(119029, 0, '0,1'::text)")) sessionid = Column(BigInteger, primary_key=True) clickevent_hesitationtime = Column(BigInteger) clickevent_label = Column(VARCHAR(5000)) @@ -147,28 +123,8 @@ class DetailedEvent(Base): connectioninformation_type = Column(VARCHAR(5000)) consolelog_level = Column(VARCHAR(5000)) consolelog_value = Column(VARCHAR(5000)) - cpuissue_duration = Column(BigInteger) - cpuissue_rate = Column(BigInteger) - cpuissue_timestamp = Column(BigInteger) - createdocument = Column(Boolean) - createelementnode_id = Column(BigInteger) - createelementnode_parentid = Column(BigInteger) - cssdeleterule_index = Column(BigInteger) - cssdeleterule_stylesheetid = Column(BigInteger) - cssinsertrule_index = Column(BigInteger) - cssinsertrule_rule = Column(VARCHAR(5000)) - cssinsertrule_stylesheetid = Column(BigInteger) - customevent_messageid = Column(BigInteger) customevent_name = Column(VARCHAR(5000)) customevent_payload = Column(VARCHAR(5000)) - customevent_timestamp = 
Column(BigInteger) - domdrop_timestamp = Column(BigInteger) - errorevent_message = Column(VARCHAR(5000)) - errorevent_messageid = Column(BigInteger) - errorevent_name = Column(VARCHAR(5000)) - errorevent_payload = Column(VARCHAR(5000)) - errorevent_source = Column(VARCHAR(5000)) - errorevent_timestamp = Column(BigInteger) fetch_duration = Column(BigInteger) fetch_method = Column(VARCHAR(5000)) fetch_request = Column(VARCHAR(5000)) @@ -180,9 +136,6 @@ class DetailedEvent(Base): graphql_operationname = Column(VARCHAR(5000)) graphql_response = Column(VARCHAR(5000)) graphql_variables = Column(VARCHAR(5000)) - graphqlevent_messageid = Column(BigInteger) - graphqlevent_name = Column(VARCHAR(5000)) - graphqlevent_timestamp = Column(BigInteger) inputevent_label = Column(VARCHAR(5000)) inputevent_messageid = Column(BigInteger) inputevent_timestamp = Column(BigInteger) @@ -191,26 +144,18 @@ class DetailedEvent(Base): jsexception_message = Column(VARCHAR(5000)) jsexception_name = Column(VARCHAR(5000)) jsexception_payload = Column(VARCHAR(5000)) - memoryissue_duration = Column(BigInteger) - memoryissue_rate = Column(BigInteger) - memoryissue_timestamp = Column(BigInteger) - metadata_key = Column(VARCHAR(5000)) - metadata_value = Column(VARCHAR(5000)) - mobx_payload = Column(VARCHAR(5000)) - mobx_type = Column(VARCHAR(5000)) + jsexception_metadata = Column(VARCHAR(5000)) mouseclick_id = Column(BigInteger) mouseclick_hesitationtime = Column(BigInteger) mouseclick_label = Column(VARCHAR(5000)) - mousemove_x = Column(BigInteger) - mousemove_y = Column(BigInteger) - movenode_id = Column(BigInteger) - movenode_index = Column(BigInteger) - movenode_parentid = Column(BigInteger) - ngrx_action = Column(VARCHAR(5000)) - ngrx_duration = Column(BigInteger) - ngrx_state = Column(VARCHAR(5000)) - otable_key = Column(VARCHAR(5000)) - otable_value = Column(VARCHAR(5000)) + networkrequest_type = Column(VARCHAR(5000)) + networkrequest_method = Column(VARCHAR(5000)) + networkrequest_url = 
Column(VARCHAR(5000)) + networkrequest_request = Column(VARCHAR(5000)) + networkrequest_response = Column(VARCHAR(5000)) + networkrequest_status = Column(BigInteger) + networkrequest_timestamp = Column(BigInteger) + networkrequest_duration = Column(BigInteger) pageevent_domcontentloadedeventend = Column(BigInteger) pageevent_domcontentloadedeventstart = Column(BigInteger) pageevent_firstcontentfulpaint = Column(BigInteger) @@ -226,77 +171,8 @@ class DetailedEvent(Base): pageevent_speedindex = Column(BigInteger) pageevent_timestamp = Column(BigInteger) pageevent_url = Column(VARCHAR(5000)) - pageloadtiming_domcontentloadedeventend = Column(BigInteger) - pageloadtiming_domcontentloadedeventstart = Column(BigInteger) - pageloadtiming_firstcontentfulpaint = Column(BigInteger) - pageloadtiming_firstpaint = Column(BigInteger) - pageloadtiming_loadeventend = Column(BigInteger) - pageloadtiming_loadeventstart = Column(BigInteger) - pageloadtiming_requeststart = Column(BigInteger) - pageloadtiming_responseend = Column(BigInteger) - pageloadtiming_responsestart = Column(BigInteger) - pagerendertiming_speedindex = Column(BigInteger) - pagerendertiming_timetointeractive = Column(BigInteger) - pagerendertiming_visuallycomplete = Column(BigInteger) - performancetrack_frames = Column(BigInteger) - performancetrack_ticks = Column(BigInteger) - performancetrack_totaljsheapsize = Column(BigInteger) - performancetrack_usedjsheapsize = Column(BigInteger) - performancetrackaggr_avgcpu = Column(BigInteger) - performancetrackaggr_avgfps = Column(BigInteger) - performancetrackaggr_avgtotaljsheapsize = Column(BigInteger) - performancetrackaggr_avgusedjsheapsize = Column(BigInteger) - performancetrackaggr_maxcpu = Column(BigInteger) - performancetrackaggr_maxfps = Column(BigInteger) - performancetrackaggr_maxtotaljsheapsize = Column(BigInteger) - performancetrackaggr_maxusedjsheapsize = Column(BigInteger) - performancetrackaggr_mincpu = Column(BigInteger) - performancetrackaggr_minfps = 
Column(BigInteger) - performancetrackaggr_mintotaljsheapsize = Column(BigInteger) - performancetrackaggr_minusedjsheapsize = Column(BigInteger) - performancetrackaggr_timestampend = Column(BigInteger) - performancetrackaggr_timestampstart = Column(BigInteger) - profiler_args = Column(VARCHAR(5000)) - profiler_duration = Column(BigInteger) - profiler_name = Column(VARCHAR(5000)) - profiler_result = Column(VARCHAR(5000)) - rawcustomevent_name = Column(VARCHAR(5000)) - rawcustomevent_payload = Column(VARCHAR(5000)) - rawerrorevent_message = Column(VARCHAR(5000)) - rawerrorevent_name = Column(VARCHAR(5000)) - rawerrorevent_payload = Column(VARCHAR(5000)) - rawerrorevent_source = Column(VARCHAR(5000)) - rawerrorevent_timestamp = Column(BigInteger) - redux_action = Column(VARCHAR(5000)) - redux_duration = Column(BigInteger) - redux_state = Column(VARCHAR(5000)) - removenode_id = Column(BigInteger) - removenodeattribute_id = Column(BigInteger) - removenodeattribute_name = Column(VARCHAR(5000)) - resourceevent_decodedbodysize = Column(BigInteger) - resourceevent_duration = Column(BigInteger) - resourceevent_encodedbodysize = Column(BigInteger) - resourceevent_headersize = Column(BigInteger) - resourceevent_messageid = Column(BigInteger) - resourceevent_method = Column(VARCHAR(5000)) - resourceevent_status = Column(BigInteger) - resourceevent_success = Column(Boolean) - resourceevent_timestamp = Column(BigInteger) - resourceevent_ttfb = Column(BigInteger) - resourceevent_type = Column(VARCHAR(5000)) - resourceevent_url = Column(VARCHAR(5000)) - resourcetiming_decodedbodysize = Column(BigInteger) - resourcetiming_duration = Column(BigInteger) - resourcetiming_encodedbodysize = Column(BigInteger) - resourcetiming_headersize = Column(BigInteger) - resourcetiming_initiator = Column(VARCHAR(5000)) - resourcetiming_timestamp = Column(BigInteger) - resourcetiming_ttfb = Column(BigInteger) - resourcetiming_url = Column(VARCHAR(5000)) - sessiondisconnect = Column(Boolean) - 
sessiondisconnect_timestamp = Column(BigInteger) - sessionend = Column(Boolean) sessionend_timestamp = Column(BigInteger) + sessionend_encryption_key = Column(VARCHAR(5000)) sessionstart_projectid = Column(BigInteger) sessionstart_revid = Column(VARCHAR(5000)) sessionstart_timestamp = Column(BigInteger) @@ -312,65 +188,18 @@ class DetailedEvent(Base): sessionstart_useros = Column(VARCHAR(5000)) sessionstart_userosversion = Column(VARCHAR(5000)) sessionstart_useruuid = Column(VARCHAR(5000)) - setcssdata_data = Column(BigInteger) - setcssdata_id = Column(BigInteger) - setinputchecked_checked = Column(BigInteger) - setinputchecked_id = Column(BigInteger) - setinputtarget_id = Column(BigInteger) - setinputtarget_label = Column(BigInteger) - setinputvalue_id = Column(BigInteger) - setinputvalue_mask = Column(BigInteger) - setinputvalue_value = Column(BigInteger) - setnodeattribute_id = Column(BigInteger) - setnodeattribute_name = Column(BigInteger) - setnodeattribute_value = Column(BigInteger) - setnodedata_data = Column(BigInteger) - setnodedata_id = Column(BigInteger) - setnodescroll_id = Column(BigInteger) - setnodescroll_x = Column(BigInteger) - setnodescroll_y = Column(BigInteger) setpagelocation_navigationstart = Column(BigInteger) setpagelocation_referrer = Column(VARCHAR(5000)) setpagelocation_url = Column(VARCHAR(5000)) - setpagevisibility_hidden = Column(Boolean) - setviewportscroll_x = Column(BigInteger) - setviewportscroll_y = Column(BigInteger) - setviewportsize_height = Column(BigInteger) - setviewportsize_width = Column(BigInteger) - stateaction_type = Column(VARCHAR(5000)) - stateactionevent_messageid = Column(BigInteger) - stateactionevent_timestamp = Column(BigInteger) - stateactionevent_type = Column(VARCHAR(5000)) - timestamp_timestamp = Column(BigInteger) - useranonymousid_id = Column(VARCHAR(5000)) - userid_id = Column(VARCHAR(5000)) - vuex_mutation = Column(VARCHAR(5000)) - vuex_state = Column(VARCHAR(5000)) - longtasks_timestamp = 
Column(BigInteger) - longtasks_duration = Column(BigInteger) - longtasks_context = Column(BigInteger) - longtasks_containertype = Column(BigInteger) - longtasks_containersrc = Column(VARCHAR(5000)) - longtasks_containerid = Column(VARCHAR(5000)) - longtasks_containername = Column(VARCHAR(5000)) - setnodeurlbasedattribute_id = Column(BigInteger) - setnodeurlbasedattribute_name = Column(VARCHAR(5000)) - setnodeurlbasedattribute_value = Column(VARCHAR(5000)) - setnodeurlbasedattribute_baseurl = Column(VARCHAR(5000)) - setstyledata_id = Column(BigInteger) - setstyledata_data = Column(VARCHAR(5000)) - setstyledata_baseurl = Column(VARCHAR(5000)) - issueevent_messageid = Column(BigInteger) + issueevent_message_id = Column(BigInteger) issueevent_timestamp = Column(BigInteger) issueevent_type = Column(VARCHAR(5000)) - issueevent_contextstring = Column(VARCHAR(5000)) + issueevent_context_string = Column(VARCHAR(5000)) issueevent_context = Column(VARCHAR(5000)) issueevent_payload = Column(VARCHAR(5000)) - technicalinfo_type = Column(VARCHAR(5000)) - technicalinfo_value = Column(VARCHAR(5000)) + issueevent_url = Column(VARCHAR(5000)) customissue_name = Column(VARCHAR(5000)) customissue_payload = Column(VARCHAR(5000)) - pageclose = Column(Boolean) received_at = Column(BigInteger) batch_order_number = Column(BigInteger) diff --git a/ee/connectors/db/tables.py b/ee/connectors/db/tables.py index f9e6eaaef..df502cab1 100644 --- a/ee/connectors/db/tables.py +++ b/ee/connectors/db/tables.py @@ -1,86 +1,107 @@ from pathlib import Path +from decouple import config base_path = Path(__file__).parent.parent - +EVENT_TYPE = config('EVENT_TYPE', default='normal') def create_tables_clickhouse(db): - with open(base_path / 'sql' / 'clickhouse_events.sql') as f: - q = f.read() - db.engine.execute(q) - print(f"`connector_user_events` table created succesfully.") + if EVENT_TYPE == 'normal': + with open(base_path / 'sql' / 'clickhouse_events.sql', 'r') as f: + q = f.read() + with 
db.get_live_session() as conn: + conn.execute(q) + print(f"`connector_user_events` table created succesfully.") - with open(base_path / 'sql' / 'clickhouse_events_buffer.sql') as f: - q = f.read() - db.engine.execute(q) - print(f"`connector_user_events_buffer` table created succesfully.") + with open(base_path / 'sql' / 'clickhouse_events_buffer.sql', 'r') as f: + q = f.read() + with db.get_live_session() as conn: + conn.execute(q) + print(f"`connector_user_events_buffer` table created succesfully.") - with open(base_path / 'sql' / 'clickhouse_sessions.sql') as f: + with open(base_path / 'sql' / 'clickhouse_sessions.sql', 'r') as f: q = f.read() - db.engine.execute(q) + with db.get_live_session() as conn: + conn.execute(q) print(f"`connector_sessions` table created succesfully.") - with open(base_path / 'sql' / 'clickhouse_sessions_buffer.sql') as f: + with open(base_path / 'sql' / 'clickhouse_sessions_buffer.sql', 'r') as f: q = f.read() - db.engine.execute(q) + with db.get_live_session() as conn: + conn.execute(q) print(f"`connector_sessions_buffer` table created succesfully.") - #with open(base_path / 'sql' / 'clickhouse_events_detailed.sql') as f: - # q = f.read() - #db.engine.execute(q) - #print(f"`connector_user_events_detailed` table created succesfully.") - - #with open(base_path / 'sql' / 'clickhouse_events_detailed_buffer.sql') as f: - # q = f.read() - #db.engine.execute(q) - #print(f"`connector_user_events_detailed_buffer` table created succesfully.") + if EVENT_TYPE == 'detailed': + with open(base_path / 'sql' / 'clickhouse_events_detailed.sql') as f: + q = f.read() + with db.get_live_session() as conn: conn.execute(q) + print(f"`connector_user_events_detailed` table created succesfully.") + + with open(base_path / 'sql' / 'clickhouse_events_detailed_buffer.sql') as f: + q = f.read() + with db.get_live_session() as conn: conn.execute(q) + print(f"`connector_user_events_detailed_buffer` table created succesfully.") def create_tables_postgres(db): - with 
open(base_path / 'sql' / 'postgres_events.sql') as f: - q = f.read() - db.engine.execute(q) - print(f"`connector_user_events` table created succesfully.") + if EVENT_TYPE == 'normal': + with open(base_path / 'sql' / 'postgres_events.sql', 'r') as f: + q = f.read() + with db.get_live_session() as conn: + conn.execute(q) + print(f"`connector_user_events` table created succesfully.") - with open(base_path / 'sql' / 'postgres_sessions.sql') as f: + with open(base_path / 'sql' / 'postgres_sessions.sql', 'r') as f: q = f.read() - db.engine.execute(q) + with db.get_live_session() as conn: + conn.execute(q) print(f"`connector_sessions` table created succesfully.") - #with open(base_path / 'sql' / 'postgres_events_detailed.sql') as f: - # q = f.read() - #db.engine.execute(q) - #print(f"`connector_user_events_detailed` table created succesfully.") + if EVENT_TYPE == 'detailed': + with open(base_path / 'sql' / 'postgres_events_detailed.sql') as f: + q = f.read() + with db.get_live_session() as conn: conn.execute(q) + print(f"`connector_user_events_detailed` table created succesfully.") def create_tables_snowflake(db): - with open(base_path / 'sql' / 'snowflake_events.sql') as f: - q = f.read() - db.engine.execute(q) - print(f"`connector_user_events` table created succesfully.") - with open(base_path / 'sql' / 'snowflake_sessions.sql') as f: + if EVENT_TYPE == 'normal': + with open(base_path / 'sql' / 'snowflake_events.sql', 'r') as f: + q = f.read() + with db.get_live_session() as conn: + conn.execute(q) + print(f"`connector_user_events` table created succesfully.") + + with open(base_path / 'sql' / 'snowflake_sessions.sql', 'r') as f: q = f.read() - db.engine.execute(q) + with db.get_live_session() as conn: + conn.execute(q) print(f"`connector_sessions` table created succesfully.") - #with open(base_path / 'sql' / 'snowflake_events_detailed.sql') as f: - # q = f.read() - #db.engine.execute(q) - #print(f"`connector_user_events_detailed` table created succesfully.") + if 
EVENT_TYPE == 'detailed': + with open(base_path / 'sql' / 'snowflake_events_detailed.sql') as f: + q = f.read() + with db.get_live_session() as conn: conn.execute(q) + print(f"`connector_user_events_detailed` table created succesfully.") def create_tables_redshift(db): - with open(base_path / 'sql' / 'redshift_events.sql') as f: - q = f.read() - db.engine.execute(q) - print(f"`connector_user_events` table created succesfully.") + if EVENT_TYPE == 'normal': + with open(base_path / 'sql' / 'redshift_events.sql', 'r') as f: + q = f.read() + with db.get_live_session() as conn: + conn.execute(q) + print(f"`connector_user_events` table created succesfully.") - with open(base_path / 'sql' / 'redshift_sessions.sql') as f: + with open(base_path / 'sql' / 'redshift_sessions.sql', 'r') as f: q = f.read() - db.engine.execute(q) + with db.get_live_session() as conn: + conn.execute(q) print(f"`connector_sessions` table created succesfully.") - #with open(base_path / 'sql' / 'redshift_events_detailed.sql') as f: - # q = f.read() - #db.engine.execute(q) - #print(f"`connector_user_events_detailed` table created succesfully.") + if EVENT_TYPE == 'detailed': + with open(base_path / 'sql' / 'redshift_events_detailed.sql') as f: + q = f.read() + with db.get_live_session() as conn: conn.execute(q) + print(f"`connector_user_events_detailed` table created succesfully.") + diff --git a/ee/connectors/db/utils.py b/ee/connectors/db/utils.py index 698a6043a..e19399340 100644 --- a/ee/connectors/db/utils.py +++ b/ee/connectors/db/utils.py @@ -1,334 +1,167 @@ import pandas as pd from db.models import DetailedEvent, Event, Session, DATABASE -dtypes_events = {'sessionid': "Int64", - 'connectioninformation_downlink': "Int64", - 'connectioninformation_type': "string", - 'consolelog_level': "string", - 'consolelog_value': "string", - 'customevent_messageid': "Int64", - 'customevent_name': "string", - 'customevent_payload': "string", - 'customevent_timestamp': "Int64", - 'errorevent_message': 
"string", - 'errorevent_messageid': "Int64", - 'errorevent_name': "string", - 'errorevent_payload': "string", - 'errorevent_source': "string", - 'errorevent_timestamp': "Int64", - 'jsexception_message': "string", - 'jsexception_name': "string", - 'jsexception_payload': "string", - 'metadata_key': "string", - 'metadata_value': "string", - 'mouseclick_id': "Int64", - 'mouseclick_hesitationtime': "Int64", - 'mouseclick_label': "string", - 'pageevent_firstcontentfulpaint': "Int64", - 'pageevent_firstpaint': "Int64", - 'pageevent_messageid': "Int64", - 'pageevent_referrer': "string", - 'pageevent_speedindex': "Int64", - 'pageevent_timestamp': "Int64", - 'pageevent_url': "string", - 'pagerendertiming_timetointeractive': "Int64", - 'pagerendertiming_visuallycomplete': "Int64", - 'rawcustomevent_name': "string", - 'rawcustomevent_payload': "string", - 'setviewportsize_height': "Int64", - 'setviewportsize_width': "Int64", - 'timestamp_timestamp': "Int64", - 'user_anonymous_id': "string", - 'user_id': "string", - 'issueevent_messageid': "Int64", - 'issueevent_timestamp': "Int64", - 'issueevent_type': "string", - 'issueevent_contextstring': "string", - 'issueevent_context': "string", - 'issueevent_payload': "string", - 'customissue_name': "string", - 'customissue_payload': "string", - 'received_at': "Int64", - 'batch_order_number': "Int64"} +dtypes_events = { + 'sessionid': "Int64", + 'consolelog_level': "string", + 'consolelog_value': "string", + 'customevent_name': "string", + 'customevent_payload': "string", + 'jsexception_message': "string", + 'jsexception_name': "string", + 'jsexception_payload': "string", + 'jsexception_metadata': "string", + 'networkrequest_type': "string", + 'networkrequest_method': "string", + 'networkrequest_url': "string", + 'networkrequest_request': "string", + 'networkrequest_response': "string", + 'networkrequest_status': "Int64", + 'networkrequest_timestamp': "Int64", + 'networkrequest_duration': "Int64", + 'issueevent_message_id': "Int64", + 
'issueevent_timestamp': "Int64", + 'issueevent_type': "string", + 'issueevent_context_string': "string", + 'issueevent_context': "string", + 'issueevent_url': "string", + 'issueevent_payload': "string", + 'customissue_name': "string", + 'customissue_payload': "string", + 'received_at': "Int64", + 'batch_order_number': "Int64"} dtypes_detailed_events = { - "sessionid": "Int64", - "clickevent_hesitationtime": "Int64", - "clickevent_label": "object", - "clickevent_messageid": "Int64", - "clickevent_timestamp": "Int64", - "connectioninformation_downlink": "Int64", - "connectioninformation_type": "object", - "consolelog_level": "object", - "consolelog_value": "object", - "cpuissue_duration": "Int64", - "cpuissue_rate": "Int64", - "cpuissue_timestamp": "Int64", - "createdocument": "boolean", - "createelementnode_id": "Int64", - "createelementnode_parentid": "Int64", - "cssdeleterule_index": "Int64", - "cssdeleterule_stylesheetid": "Int64", - "cssinsertrule_index": "Int64", - "cssinsertrule_rule": "object", - "cssinsertrule_stylesheetid": "Int64", - "customevent_messageid": "Int64", - "customevent_name": "object", - "customevent_payload": "object", - "customevent_timestamp": "Int64", - "domdrop_timestamp": "Int64", - "errorevent_message": "object", - "errorevent_messageid": "Int64", - "errorevent_name": "object", - "errorevent_payload": "object", - "errorevent_source": "object", - "errorevent_timestamp": "Int64", - "fetch_duration": "Int64", - "fetch_method": "object", - "fetch_request": "object", - "fetch_response": "object", - "fetch_status": "Int64", - "fetch_timestamp": "Int64", - "fetch_url": "object", - "graphql_operationkind": "object", - "graphql_operationname": "object", - "graphql_response": "object", - "graphql_variables": "object", - "graphqlevent_messageid": "Int64", - "graphqlevent_name": "object", - "graphqlevent_timestamp": "Int64", - "inputevent_label": "object", - "inputevent_messageid": "Int64", - "inputevent_timestamp": "Int64", - "inputevent_value": 
"object", - "inputevent_valuemasked": "boolean", - "jsexception_message": "object", - "jsexception_name": "object", - "jsexception_payload": "object", - "longtasks_timestamp": "Int64", - "longtasks_duration": "Int64", - "longtasks_containerid": "object", - "longtasks_containersrc": "object", - "memoryissue_duration": "Int64", - "memoryissue_rate": "Int64", - "memoryissue_timestamp": "Int64", - "metadata_key": "object", - "metadata_value": "object", - "mobx_payload": "object", - "mobx_type": "object", - "mouseclick_id": "Int64", - "mouseclick_hesitationtime": "Int64", - "mouseclick_label": "object", - "mousemove_x": "Int64", - "mousemove_y": "Int64", - "movenode_id": "Int64", - "movenode_index": "Int64", - "movenode_parentid": "Int64", - "ngrx_action": "object", - "ngrx_duration": "Int64", - "ngrx_state": "object", - "otable_key": "object", - "otable_value": "object", - "pageevent_domcontentloadedeventend": "Int64", - "pageevent_domcontentloadedeventstart": "Int64", - "pageevent_firstcontentfulpaint": "Int64", - "pageevent_firstpaint": "Int64", - "pageevent_loaded": "boolean", - "pageevent_loadeventend": "Int64", - "pageevent_loadeventstart": "Int64", - "pageevent_messageid": "Int64", - "pageevent_referrer": "object", - "pageevent_requeststart": "Int64", - "pageevent_responseend": "Int64", - "pageevent_responsestart": "Int64", - "pageevent_speedindex": "Int64", - "pageevent_timestamp": "Int64", - "pageevent_url": "object", - "pageloadtiming_domcontentloadedeventend": "Int64", - "pageloadtiming_domcontentloadedeventstart": "Int64", - "pageloadtiming_firstcontentfulpaint": "Int64", - "pageloadtiming_firstpaint": "Int64", - "pageloadtiming_loadeventend": "Int64", - "pageloadtiming_loadeventstart": "Int64", - "pageloadtiming_requeststart": "Int64", - "pageloadtiming_responseend": "Int64", - "pageloadtiming_responsestart": "Int64", - "pagerendertiming_speedindex": "Int64", - "pagerendertiming_timetointeractive": "Int64", - "pagerendertiming_visuallycomplete": "Int64", - 
"performancetrack_frames": "Int64", - "performancetrack_ticks": "Int64", - "performancetrack_totaljsheapsize": "Int64", - "performancetrack_usedjsheapsize": "Int64", - "performancetrackaggr_avgcpu": "Int64", - "performancetrackaggr_avgfps": "Int64", - "performancetrackaggr_avgtotaljsheapsize": "Int64", - "performancetrackaggr_avgusedjsheapsize": "Int64", - "performancetrackaggr_maxcpu": "Int64", - "performancetrackaggr_maxfps": "Int64", - "performancetrackaggr_maxtotaljsheapsize": "Int64", - "performancetrackaggr_maxusedjsheapsize": "Int64", - "performancetrackaggr_mincpu": "Int64", - "performancetrackaggr_minfps": "Int64", - "performancetrackaggr_mintotaljsheapsize": "Int64", - "performancetrackaggr_minusedjsheapsize": "Int64", - "performancetrackaggr_timestampend": "Int64", - "performancetrackaggr_timestampstart": "Int64", - "profiler_args": "object", - "profiler_duration": "Int64", - "profiler_name": "object", - "profiler_result": "object", - "rawcustomevent_name": "object", - "rawcustomevent_payload": "object", - "rawerrorevent_message": "object", - "rawerrorevent_name": "object", - "rawerrorevent_payload": "object", - "rawerrorevent_source": "object", - "rawerrorevent_timestamp": "Int64", - "redux_action": "object", - "redux_duration": "Int64", - "redux_state": "object", - "removenode_id": "Int64", - "removenodeattribute_id": "Int64", - "removenodeattribute_name": "object", - "resourceevent_decodedbodysize": "Int64", - "resourceevent_duration": "Int64", - "resourceevent_encodedbodysize": "Int64", - "resourceevent_headersize": "Int64", - "resourceevent_messageid": "Int64", - "resourceevent_method": "object", - "resourceevent_status": "Int64", - "resourceevent_success": "boolean", - "resourceevent_timestamp": "Int64", - "resourceevent_ttfb": "Int64", - "resourceevent_type": "object", - "resourceevent_url": "object", - "resourcetiming_decodedbodysize": "Int64", - "resourcetiming_duration": "Int64", - "resourcetiming_encodedbodysize": "Int64", - 
"resourcetiming_headersize": "Int64", - "resourcetiming_initiator": "object", - "resourcetiming_timestamp": "Int64", - "resourcetiming_ttfb": "Int64", - "resourcetiming_url": "object", - "sessiondisconnect": "boolean", - "sessiondisconnect_timestamp": "Int64", - "sessionend": "boolean", - "sessionend_timestamp": "Int64", - "sessionstart_projectid": "Int64", - "sessionstart_revid": "object", - "sessionstart_timestamp": "Int64", - "sessionstart_trackerversion": "object", - "sessionstart_useragent": "object", - "sessionstart_userbrowser": "object", - "sessionstart_userbrowserversion": "object", - "sessionstart_usercountry": "object", - "sessionstart_userdevice": "object", - "sessionstart_userdeviceheapsize": "Int64", - "sessionstart_userdevicememorysize": "Int64", - "sessionstart_userdevicetype": "object", - "sessionstart_useros": "object", - "sessionstart_userosversion": "object", - "sessionstart_useruuid": "object", - "setcssdata_data": "Int64", - "setcssdata_id": "Int64", - "setinputchecked_checked": "Int64", - "setinputchecked_id": "Int64", - "setinputtarget_id": "Int64", - "setinputtarget_label": "Int64", - "setinputvalue_id": "Int64", - "setinputvalue_mask": "Int64", - "setinputvalue_value": "Int64", - "setnodeattribute_id": "Int64", - "setnodeattribute_name": "Int64", - "setnodeattribute_value": "Int64", - "setnodedata_data": "Int64", - "setnodedata_id": "Int64", - "setnodescroll_id": "Int64", - "setnodescroll_x": "Int64", - "setnodescroll_y": "Int64", - "setpagelocation_navigationstart": "Int64", - "setpagelocation_referrer": "object", - "setpagelocation_url": "object", - "setpagevisibility_hidden": "boolean", - "setviewportscroll_x": "Int64", - "setviewportscroll_y": "Int64", - "setviewportsize_height": "Int64", - "setviewportsize_width": "Int64", - "stateaction_type": "object", - "stateactionevent_messageid": "Int64", - "stateactionevent_timestamp": "Int64", - "stateactionevent_type": "object", - "timestamp_timestamp": "Int64", - "useranonymousid_id": 
"object", - "userid_id": "object", - "vuex_mutation": "object", - "vuex_state": "string", - "received_at": "Int64", - "batch_order_number": "Int64", - - #NEW - 'setnodeurlbasedattribute_id': 'Int64', - 'setnodeurlbasedattribute_name': 'string', - 'setnodeurlbasedattribute_value': 'string', - 'setnodeurlbasedattribute_baseurl': 'string', - 'setstyledata_id': 'Int64', - 'setstyledata_data': 'string', - 'setstyledata_baseurl': 'string', - 'customissue_payload': 'string', - 'customissue_name': 'string', - 'technicalinfo_value': 'string', - 'technicalinfo_type': 'string', - 'issueevent_payload': 'string', - 'issueevent_context': 'string', - 'issueevent_contextstring': 'string', - 'issueevent_type': 'string' +"sessionid": "Int64", +"clickevent_hesitationtime": "Int64", +"clickevent_label": "string", +"clickevent_messageid": "Int64", +"clickevent_timestamp": "Int64", +"connectioninformation_downlink": "Int64", +"connectioninformation_type": "string", +"consolelog_level": "string", +"consolelog_value": "string", +"customevent_name": "string", +"customevent_payload": "string", +"fetch_duration": "Int64", +"fetch_method": "string", +"fetch_request": "string", +"fetch_response": "string", +"fetch_status": "Int64", +"fetch_timestamp": "Int64", +"fetch_url": "string", +"graphql_operationkind": "string", +"graphql_operationname": "string", +"graphql_response": "string", +"graphql_variables": "string", +"inputevent_label": "string", +"inputevent_messageid": "Int64", +"inputevent_timestamp": "Int64", +"inputevent_value": "string", +"inputevent_valuemasked": "boolean", +"jsexception_message": "string", +"jsexception_name": "string", +"jsexception_payload": "string", +"jsexception_metadata": "string", +"mouseclick_id": "Int64", +"mouseclick_hesitationtime": "Int64", +"mouseclick_label": "string", +"networkrequest_type": "string", +"networkrequest_method": "string", +"networkrequest_url": "string", +"networkrequest_request": "string", +"networkrequest_response": "string", 
+"networkrequest_status": "Int64", +"networkrequest_timestamp": "Int64", +"networkrequest_duration": "Int64", +"pageevent_domcontentloadedeventend": "Int64", +"pageevent_domcontentloadedeventstart": "Int64", +"pageevent_firstcontentfulpaint": "Int64", +"pageevent_firstpaint": "Int64", +"pageevent_loaded": "boolean", +"pageevent_loadeventend": "Int64", +"pageevent_loadeventstart": "Int64", +"pageevent_messageid": "Int64", +"pageevent_referrer": "string", +"pageevent_requeststart": "Int64", +"pageevent_responseend": "Int64", +"pageevent_responsestart": "Int64", +"pageevent_speedindex": "Int64", +"pageevent_timestamp": "Int64", +"pageevent_url": "string", +"sessionend_timestamp": "Int64", +"sessionend_encryption_key": "string", +"sessionstart_projectid": "Int64", +"sessionstart_revid": "string", +"sessionstart_timestamp": "Int64", +"sessionstart_trackerversion": "string", +"sessionstart_useragent": "string", +"sessionstart_userbrowser": "string", +"sessionstart_userbrowserversion": "string", +"sessionstart_usercountry": "string", +"sessionstart_userdevice": "string", +"sessionstart_userdeviceheapsize": "Int64", +"sessionstart_userdevicememorysize": "Int64", +"sessionstart_userdevicetype": "string", +"sessionstart_useros": "string", +"sessionstart_userosversion": "string", +"sessionstart_useruuid": "string", +"setpagelocation_navigationstart": "Int64", +"setpagelocation_referrer": "string", +"setpagelocation_url": "string", +"issueevent_message_id": "Int64", +"issueevent_timestamp": "Int64", +"issueevent_type": "string", +"issueevent_context_string": "string", +"issueevent_context": "string", +"issueevent_payload": "string", +"issueevent_url": "string", +"customissue_name": "string", +"customissue_payload": "string", +"received_at": "Int64", +"batch_order_number": "Int64", } -dtypes_sessions = {'sessionid': 'Int64', - 'user_agent': 'string', - 'user_browser': 'string', - 'user_browser_version': 'string', - 'user_country': 'string', - 'user_device': 'string', - 
'user_device_heap_size': 'Int64', - 'user_device_memory_size': 'Int64', - 'user_device_type': 'string', - 'user_os': 'string', - 'user_os_version': 'string', - 'user_uuid': 'string', - 'connection_effective_bandwidth': 'Int64', - 'connection_type': 'string', - 'metadata_key': 'string', - 'metadata_value': 'string', - 'referrer': 'string', - 'user_anonymous_id': 'string', - 'user_id': 'string', - 'session_start_timestamp': 'Int64', - 'session_end_timestamp': 'Int64', - 'session_duration': 'Int64', - 'first_contentful_paint': 'Int64', - 'speed_index': 'Int64', - 'visually_complete': 'Int64', - 'timing_time_to_interactive': 'Int64', - 'avg_cpu': 'Int64', - 'avg_fps': 'Int64', - 'max_cpu': 'Int64', - 'max_fps': 'Int64', - 'max_total_js_heap_size': 'Int64', - 'max_used_js_heap_size': 'Int64', - 'js_exceptions_count': 'Int64', - 'long_tasks_total_duration': 'Int64', - 'long_tasks_max_duration': 'Int64', - 'long_tasks_count': 'Int64', - 'inputs_count': 'Int64', - 'clicks_count': 'Int64', - 'issues_count': 'Int64', - 'issues': 'object', - 'urls_count': 'Int64', - 'urls': 'object'} +dtypes_sessions = {'sessionid': "Int64", + 'user_agent': "string", + 'user_browser': "string", + 'user_browser_version': "string", + 'user_country': "string", + 'user_device': "string", + 'user_device_heap_size': "Int64", + 'user_device_memory_size': "Int64", + 'user_device_type': "string", + 'user_os': "string", + 'user_os_version': "string", + 'user_uuid': "string", + 'connection_effective_bandwidth': "Int64", + 'connection_type': "string", + 'metadata_key': "string", + 'metadata_value': "string", + 'referrer': "string", + 'user_anonymous_id': "string", + 'user_id': "string", + 'session_start_timestamp': "Int64", + 'session_end_timestamp': "Int64", + 'session_duration': "Int64", + 'first_contentful_paint': "Int64", + 'speed_index': "Int64", + 'visually_complete': "Int64", + 'timing_time_to_interactive': "Int64", + 'avg_cpu': "Int64", + 'avg_fps': "Int64", + 'max_cpu': "Int64", + 'max_fps': 
"Int64", + 'max_total_js_heap_size': "Int64", + 'max_used_js_heap_size': "Int64", + 'js_exceptions_count': "Int64", + 'inputs_count': "Int64", + 'clicks_count': "Int64", + 'issues_count': "Int64", + 'urls_count': "Int64", + } if DATABASE == 'bigquery': - dtypes_sessions['urls'] = 'string' - dtypes_sessions['issues'] = 'string' + dtypes_sessions['urls'] = "string" + dtypes_sessions['issues'] = "string" detailed_events_col = [] for col in DetailedEvent.__dict__: @@ -360,13 +193,19 @@ def get_df_from_batch(batch, level): pass if level == 'normal': - df = df.astype(dtypes_events) + current_types = dtypes_events if level == 'detailed': + current_types = dtypes_detailed_events df['inputevent_value'] = None df['customevent_payload'] = None - df = df.astype(dtypes_detailed_events) if level == 'sessions': - df = df.astype(dtypes_sessions) + current_types = dtypes_sessions + df['js_exceptions_count'] = df['js_exceptions_count'].fillna(0) + df['inputs_count'] = df['inputs_count'].fillna(0) + df['clicks_count'] = df['clicks_count'].fillna(0) + df['issues_count'] = df['issues_count'].fillna(0) + df['urls_count'] = df['urls_count'].fillna(0) + df = df.astype(current_types) if DATABASE == 'clickhouse' and level == 'sessions': df['issues'] = df['issues'].fillna('') @@ -374,7 +213,8 @@ def get_df_from_batch(batch, level): for x in df.columns: try: - if df[x].dtype == 'string': + if df[x].dtype == "string" or current_types[x] == "string": + df[x] = df[x].fillna('NULL') df[x] = df[x].str.slice(0, 255) df[x] = df[x].str.replace("|", "") except TypeError as e: diff --git a/ee/connectors/db/writer.py b/ee/connectors/db/writer.py index 42ebc6f39..265bc5012 100644 --- a/ee/connectors/db/writer.py +++ b/ee/connectors/db/writer.py @@ -1,13 +1,14 @@ -import os +from decouple import config -DATABASE = os.environ['DATABASE_NAME'] +DATABASE = config('CLOUD_SERVICE') from db.api import DBConnection -from db.utils import get_df_from_batch +from db.utils import get_df_from_batch, dtypes_sessions 
from db.tables import * if DATABASE == 'redshift': from db.loaders.redshift_loader import transit_insert_to_redshift + import pandas as pd elif DATABASE == 'clickhouse': from db.loaders.clickhouse_loader import insert_to_clickhouse elif DATABASE == 'pg': @@ -21,23 +22,25 @@ else: raise Exception(f"{DATABASE}-database not supported") # create tables if don't exist -try: - db = DBConnection(DATABASE) - if DATABASE == 'pg': - create_tables_postgres(db) - if DATABASE == 'clickhouse': - create_tables_clickhouse(db) - if DATABASE == 'snowflake': - create_tables_snowflake(db) - if DATABASE == 'bigquery': - create_tables_bigquery() - if DATABASE == 'redshift': - create_tables_redshift(db) - db.engine.dispose() - db = None -except Exception as e: - print(repr(e)) - print("Please create the tables with scripts provided in " + +_build_tables = config('build_tables', default=False, cast=bool) +if _build_tables: + try: + db = DBConnection(DATABASE) + if DATABASE == 'pg': + create_tables_postgres(db) + if DATABASE == 'clickhouse': + create_tables_clickhouse(db) + if DATABASE == 'snowflake': + create_tables_snowflake(db) + if DATABASE == 'bigquery': + create_tables_bigquery() + if DATABASE == 'redshift': + create_tables_redshift(db) + db.engine.dispose() + db = None + except Exception as e: + print(repr(e)) + print("Please create the tables with scripts provided in " + f"'/sql/{DATABASE}_sessions.sql' and '/sql/{DATABASE}_events.sql'") @@ -61,3 +64,29 @@ def insert_batch(db: DBConnection, batch, table, level='normal'): if db.config == 'snowflake': insert_to_snowflake(db=db, df=df, table=table) + + +def update_batch(db: DBConnection, batch, table): + if len(batch) == 0: + return + df = get_df_from_batch(batch, level='sessions') + base_query = f"UPDATE {table} SET" + for column_name, column_type in dtypes_sessions.items(): + if column_name == 'sessionid': + continue + elif column_type == 'string': + df[column_name] = df[column_name].fillna('NULL') + base_query += f" {column_name} = 
" + "'{" + f"{column_name}" + "}'," + else: + df[column_name] = df[column_name].fillna(0) + base_query += f" {column_name} = " + "{" + f"{column_name}" + "}," + base_query = base_query[:-1] + " WHERE sessionid = {sessionid};" + for i in range(len(df)): + if db.config == 'redshift': + params = dict(df.iloc[i]) + query = base_query.format(**params) + try: + db.pdredshift.exec_commit(query) + except Exception as e: + print('[ERROR] Error while executing query') + print(repr(e)) diff --git a/ee/connectors/deploy/Dockerfile_bigquery b/ee/connectors/deploy/Dockerfile_bigquery index 515914e15..d838909cc 100644 --- a/ee/connectors/deploy/Dockerfile_bigquery +++ b/ee/connectors/deploy/Dockerfile_bigquery @@ -1,15 +1,10 @@ -FROM python:3.11 +FROM python:3.11-alpine WORKDIR /usr/src/app - +ENV LIBRD_VER=2.0.2 +RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base +RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps COPY . . 
-RUN apt update -RUN apt-get install -y libc-dev libffi-dev gcc -RUN apt update && apt -y install software-properties-common gcc -RUN git clone https://github.com/edenhill/librdkafka -RUN cd librdkafka && ./configure && make && make install && ldconfig - -RUN pip install -r ./deploy/requirements_bigquery.txt - -CMD ["python", "consumer.py"] +RUN pip install -r deploy/requirements_bigquery.txt +ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/Dockerfile_clickhouse b/ee/connectors/deploy/Dockerfile_clickhouse index f2dad8f65..695dab174 100644 --- a/ee/connectors/deploy/Dockerfile_clickhouse +++ b/ee/connectors/deploy/Dockerfile_clickhouse @@ -1,15 +1,10 @@ -FROM python:3.11 +FROM python:3.11-alpine WORKDIR /usr/src/app - +ENV LIBRD_VER=2.0.2 +RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base +RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps COPY . . 
-RUN apt update -RUN apt-get install -y libc-dev libffi-dev gcc -RUN apt update && apt -y install software-properties-common gcc -RUN git clone https://github.com/edenhill/librdkafka -RUN cd librdkafka && ./configure && make && make install && ldconfig - -RUN pip install -r ./deploy/requirements_clickhouse.txt - -CMD ["python", "consumer.py"] +RUN pip install -r deploy/requirements_clickhouse.txt +ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/Dockerfile_pg b/ee/connectors/deploy/Dockerfile_pg index a8b1c0f01..ef4a8d1ac 100644 --- a/ee/connectors/deploy/Dockerfile_pg +++ b/ee/connectors/deploy/Dockerfile_pg @@ -1,15 +1,10 @@ -FROM python:3.11 +FROM python:3.11-alpine WORKDIR /usr/src/app - +ENV LIBRD_VER=2.0.2 +RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base +RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps COPY . . 
-RUN apt update -RUN apt-get install -y libc-dev libffi-dev gcc -RUN apt update && apt -y install software-properties-common gcc -RUN git clone https://github.com/edenhill/librdkafka -RUN cd librdkafka && ./configure && make && make install && ldconfig - -RUN pip install -r ./deploy/requirements_pg.txt - -CMD ["python", "consumer.py"] +RUN pip install -r deploy/requirements_pg.txt +ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/Dockerfile_redshift b/ee/connectors/deploy/Dockerfile_redshift index c69739623..0cefbc553 100644 --- a/ee/connectors/deploy/Dockerfile_redshift +++ b/ee/connectors/deploy/Dockerfile_redshift @@ -1,15 +1,15 @@ -FROM python:3.11 - -WORKDIR /usr/src/app +FROM public.ecr.aws/p1t3u8a3/connectors/redshift:base +ENV CLOUD_SERVICE=redshift \ + CONNECTION_STRING=postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DBNAME} \ + # Keep postgres connection + PG_MINCONN=3 \ + PG_MAXCONN=10 +RUN apk add --no-cache postgresql-libs lz4-libs zstd-libs +COPY deploy/requirements_redshift.txt . +RUN apk add --no-cache --virtual .build-deps gcc g++ musl-dev postgresql-dev && \ + python3 -m pip install -r requirements_redshift.txt --no-cache-dir && \ + apk --purge del .build-deps COPY . . 
-RUN apt update -RUN apt-get install -y libc-dev libffi-dev gcc -RUN apt update && apt -y install software-properties-common gcc -RUN git clone https://github.com/edenhill/librdkafka -RUN cd librdkafka && ./configure && make && make install && ldconfig - -RUN pip install -r ./deploy/requirements_redshift.txt - -CMD ["python", "consumer.py"] +ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/Dockerfile_redshift_base b/ee/connectors/deploy/Dockerfile_redshift_base new file mode 100644 index 000000000..57100d8f2 --- /dev/null +++ b/ee/connectors/deploy/Dockerfile_redshift_base @@ -0,0 +1,13 @@ +FROM amancevice/pandas:2.0.0-alpine + +WORKDIR /usr/src/app +ENV LIBRD_VER=2.0.2 +WORKDIR /work +RUN apk add --no-cache --virtual .make-deps postgresql-dev gcc python3-dev \ + musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base \ + bash make wget git gcc g++ musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && \ + wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && \ + tar -xvf v${LIBRD_VER}.tar.gz && cd librdkafka-${LIBRD_VER} && \ + ./configure --prefix /usr && make && make install && make clean && \ + cd /work && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz \ + && apk del .make-deps diff --git a/ee/connectors/deploy/Dockerfile_snowflake b/ee/connectors/deploy/Dockerfile_snowflake index 1d4b926a8..5c026ca1c 100644 --- a/ee/connectors/deploy/Dockerfile_snowflake +++ b/ee/connectors/deploy/Dockerfile_snowflake @@ -1,15 +1,10 @@ -FROM python:3.11 +FROM python:3.11-alpine WORKDIR /usr/src/app - +ENV LIBRD_VER=2.0.2 +RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev linux-headers g++ libc-dev libffi-dev make cmake py-pip build-base +RUN apk add --no-cache --virtual .make-deps bash make wget git gcc g++ && apk add --no-cache musl-dev zlib-dev openssl zstd-dev pkgconfig libc-dev && wget https://github.com/edenhill/librdkafka/archive/v${LIBRD_VER}.tar.gz && tar -xvf v${LIBRD_VER}.tar.gz && 
cd librdkafka-${LIBRD_VER} && ./configure --prefix /usr && make && make install && make clean && rm -rf librdkafka-${LIBRD_VER} && rm -rf v${LIBRD_VER}.tar.gz && apk del .make-deps COPY . . -RUN apt update -RUN apt-get install -y libc-dev libffi-dev gcc -RUN apt update && apt -y install software-properties-common gcc -RUN git clone https://github.com/edenhill/librdkafka -RUN cd librdkafka && ./configure && make && make install && ldconfig - -RUN pip install -r ./deploy/requirements_snowflake.txt - -CMD ["python", "consumer.py"] +RUN pip install -r deploy/requirements_snowflake.txt +ENTRYPOINT ./entrypoint.sh diff --git a/ee/connectors/deploy/requirements_redshift.txt b/ee/connectors/deploy/requirements_redshift.txt index 7bd5e3f08..b32f4efcb 100644 --- a/ee/connectors/deploy/requirements_redshift.txt +++ b/ee/connectors/deploy/requirements_redshift.txt @@ -1,14 +1,14 @@ -certifi==2022.09.24 -chardet==5.0.0 -clickhouse-driver==0.2.4 -clickhouse-sqlalchemy==0.2.2 +chardet==5.1.0 idna==3.4 -confluent-kafka -psycopg2-binary==2.9.3 +confluent-kafka==2.0.2 +psycopg2-binary==2.9.6 +python-decouple==3.8 pytz==2022.6 requests==2.28.1 -SQLAlchemy==1.4.43 +SQLAlchemy==2.0.8 tzlocal -urllib3==1.26.12 +urllib3==1.26.15 +sqlalchemy-redshift +redshift-connector pandas-redshift PyYAML diff --git a/ee/connectors/entrypoint.sh b/ee/connectors/entrypoint.sh new file mode 100755 index 000000000..4ffc08864 --- /dev/null +++ b/ee/connectors/entrypoint.sh @@ -0,0 +1,2 @@ +echo "[INFO] Starting service" +python -u consumer_async.py diff --git a/ee/connectors/handler.py b/ee/connectors/handler.py index 02f393091..9843eb085 100644 --- a/ee/connectors/handler.py +++ b/ee/connectors/handler.py @@ -19,27 +19,10 @@ def handle_normal_message(message: Message) -> Optional[Event]: return n if isinstance(message, CustomEvent): - n.customevent_messageid = message.message_id n.customevent_name = message.name - n.customevent_timestamp = message.timestamp n.customevent_payload = message.payload return 
n - if isinstance(message, ErrorEvent): - n.errorevent_message = message.message - n.errorevent_messageid = message.message_id - n.errorevent_name = message.name - n.errorevent_payload = message.payload - n.errorevent_source = message.source - n.errorevent_timestamp = message.timestamp - return n - - if isinstance(message, JSException): - n.jsexception_name = message.name - n.jsexception_payload = message.payload - n.jsexception_message = message.message - return n - if isinstance(message, Metadata): n.metadata_key = message.key n.metadata_value = message.value @@ -52,11 +35,15 @@ def handle_normal_message(message: Message) -> Optional[Event]: n.mouseclick_selector = message.selector return n - if isinstance(message, MouseClickDepricated): - n.mouseclick_hesitationtime = message.hesitation_time - n.mouseclick_id = message.id - n.mouseclick_label = message.label - n.mouseclick_selector = '' + if isinstance(message, NetworkRequest): + n.networkrequest_type = message.type + n.networkrequest_method = message.method + n.networkrequest_url = message.url + n.networkrequest_request = message.request + n.networkrequest_response = message.response + n.networkrequest_status = message.status + n.networkrequest_timestamp = message.timestamp + n.networkrequest_duration = message.duration return n if isinstance(message, PageEvent): @@ -74,11 +61,6 @@ def handle_normal_message(message: Message) -> Optional[Event]: n.pagerendertiming_visuallycomplete = message.visually_complete return n - if isinstance(message, RawCustomEvent): - n.rawcustomevent_name = message.name - n.rawcustomevent_payload = message.payload - return n - if isinstance(message, SetViewportSize): n.setviewportsize_height = message.height n.setviewportsize_width = message.width @@ -100,9 +82,10 @@ def handle_normal_message(message: Message) -> Optional[Event]: n.issueevent_messageid = message.message_id n.issueevent_timestamp = message.timestamp n.issueevent_type = message.type - n.issueevent_contextstring = 
message.context_string + n.issueevent_context_string = message.context_string n.issueevent_context = message.context n.issueevent_payload = message.payload + n.issueevent_url = message.url return n if isinstance(message, CustomIssue): @@ -147,14 +130,18 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: return n if isinstance(message, BatchMetadata): - n.batchmeta_page_no = message.page_no - n.batchmeta_first_index = message.first_index - n.batchmeta_timestamp = message.timestamp + n.batchmetadata_version = message.version + n.batchmetadata_page_no = message.page_no + n.batchmetadata_first_index = message.first_index + n.batchmetadata_timestamp = message.timestamp + n.batchmetadata_location = message.location return n + if isinstance(message, PartitionedMessage): - n.part_no = message.part_no - n.part_total = message.part_total + n.partitionedmessage_part_no = message.part_no + n.partitionedmessage_part_total = message.part_total + return n # if isinstance(message, IOSBatchMeta): # n.iosbatchmeta_page_no = message.page_no @@ -182,10 +169,10 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: n.urls_count += 1 except TypeError: n.urls_count = 1 - try: - n.urls.append(message.url) - except AttributeError: - n.urls = [message.url] + #itry: + # n.urls.append(message.url) + #except AttributeError: + # n.urls = [message.url] return n if isinstance(message, PerformanceTrackAggr): @@ -205,30 +192,30 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: n.user_anonymous_id = message.id return n - if isinstance(message, JSException): + if isinstance(message, JSException) or isinstance(message, JSExceptionDeprecated): try: n.js_exceptions_count += 1 except TypeError: n.js_exceptions_count = 1 return n - if isinstance(message, LongTask): - try: - n.long_tasks_total_duration += message.duration - except TypeError: - n.long_tasks_total_duration = message.duration + #if isinstance(message, LongTask): + # try: + # 
n.long_tasks_total_duration += message.duration + # except TypeError: + # n.long_tasks_total_duration = message.duration - try: - if n.long_tasks_max_duration > message.duration: - n.long_tasks_max_duration = message.duration - except TypeError: - n.long_tasks_max_duration = message.duration + # try: + # if n.long_tasks_max_duration > message.duration: + # n.long_tasks_max_duration = message.duration + # except TypeError: + # n.long_tasks_max_duration = message.duration - try: - n.long_tasks_count += 1 - except TypeError: - n.long_tasks_count = 1 - return n + # try: + # n.long_tasks_count += 1 + # except TypeError: + # n.long_tasks_count = 1 + # return n if isinstance(message, InputEvent): try: @@ -239,58 +226,36 @@ def handle_session(n: Session, message: Message) -> Optional[Session]: if isinstance(message, MouseClick): try: - n.inputs_count += 1 + n.clicks_count += 1 except TypeError: - n.inputs_count = 1 + n.clicks_count = 1 return n - if isinstance(message, IssueEvent): + if isinstance(message, IssueEvent) or isinstance(message, IssueEventDeprecated): try: n.issues_count += 1 except TypeError: n.issues_count = 1 - - n.inputs_count = 1 - return n - - if isinstance(message, MouseClickDepricated): - try: - n.inputs_count += 1 - except TypeError: - n.inputs_count = 1 - return n - - if isinstance(message, IssueEvent): - try: - n.issues_count += 1 - except TypeError: - n.issues_count = 1 - - try: - n.issues.append(message.type) - except AttributeError: - n.issues = [message.type] + #try: + # n.issues.append(message.type) + #except AttributeError: + # n.issues = [message.type] return n def handle_message(message: Message) -> Optional[DetailedEvent]: n = DetailedEvent() - if isinstance(message, SessionEnd): - n.sessionend = True - n.sessionend_timestamp = message.timestamp - return n + # if isinstance(message, SessionEnd): + # n.sessionend = True + # n.sessionend_timestamp = message.timestamp + # return n if isinstance(message, Timestamp): n.timestamp_timestamp = 
message.timestamp return n - if isinstance(message, SessionDisconnect): - n.sessiondisconnect = True - n.sessiondisconnect_timestamp = message.timestamp - return n - if isinstance(message, SessionStart): n.sessionstart_trackerversion = message.tracker_version n.sessionstart_revid = message.rev_id @@ -352,35 +317,27 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.pagerendertiming_timetointeractive = message.time_to_interactive return n - if isinstance(message, ResourceTiming): - n.resourcetiming_timestamp = message.timestamp - n.resourcetiming_duration = message.duration - n.resourcetiming_ttfb = message.ttfb - n.resourcetiming_headersize = message.header_size - n.resourcetiming_encodedbodysize = message.encoded_body_size - n.resourcetiming_decodedbodysize = message.decoded_body_size - n.resourcetiming_url = message.url - n.resourcetiming_initiator = message.initiator + # if isinstance(message, ResourceTiming): + # n.resourcetiming_timestamp = message.timestamp + # n.resourcetiming_duration = message.duration + # n.resourcetiming_ttfb = message.ttfb + # n.resourcetiming_headersize = message.header_size + # n.resourcetiming_encodedbodysize = message.encoded_body_size + # n.resourcetiming_decodedbodysize = message.decoded_body_size + # n.resourcetiming_url = message.url + # n.resourcetiming_initiator = message.initiator + # return n + + + + if isinstance(message, IntegrationEvent): + n.integrationevent_timestamp = message.timestamp + n.integrationevent_source = message.source + n.integrationevent_name = message.name + n.integrationevent_message = message.message + n.integrationevent_payload = message.payload return n - if isinstance(message, JSException): - n.jsexception_name = message.name - n.jsexception_message = message.message - n.jsexception_payload = message.payload - return n - - if isinstance(message, RawErrorEvent): - n.rawerrorevent_timestamp = message.timestamp - n.rawerrorevent_source = message.source - n.rawerrorevent_name = 
message.name - n.rawerrorevent_message = message.message - n.rawerrorevent_payload = message.payload - return n - - if isinstance(message, RawCustomEvent): - n.rawcustomevent_name = message.name - n.rawcustomevent_payload = message.payload - return n if isinstance(message, UserID): n.userid_id = message.id @@ -402,14 +359,78 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: return n if isinstance(message, BatchMetadata): - n.batchmeta_page_no = message.page_no - n.batchmeta_first_index = message.first_index - n.batchmeta_timestamp = message.timestamp + n.batchmetadata_version = message.version + n.batchmetadata_page_no = message.page_no + n.batchmetadata_first_index = message.first_index + n.batchmetadata_timestamp = message.timestamp + n.batchmetadata_location = message.location return n if isinstance(message, PartitionedMessage): - n.part_no = message.part_no - n.part_total = message.part_total + n.partitionedmessage_part_no = message.part_no + n.partitionedmessage_part_total = message.part_total + return n + + if isinstance(message, InputChange): + n.inputchange_id=message.id + n.inputchange_value=message.value + n.inputchange_value_masked=message.value_masked + n.inputchange_label=message.label + n.inputchange_hesitation_time=message.hesitation_time + n.inputchange_input_duration=message.input_duration + return n + + + if isinstance(message, SelectionChange): + n.selectionchange_selection_start=message.selection_start + n.selectionchange_selection_end=message.selection_end + n.selectionchange_selection=message.selection + return n + + if isinstance(message, MouseThrashing): + n.mousethrashing_timestamp=message.timestamp + return n + + if isinstance(message, UnbindNodes): + n.unbindnodes_total_removed_percent=message.total_removed_percent + return n + + + if isinstance(message, ResourceTiming): + n.resourcetiming_timestamp=message.timestamp + n.resourcetiming_duration=message.duration + n.resourcetiming_ttfb=message.ttfb + 
n.resourcetiming_header_size=message.header_size + n.resourcetiming_encoded_body_size=message.encoded_body_size + n.resourcetiming_decoded_body_size=message.decoded_body_size + n.resourcetiming_url=message.url + n.resourcetiming_initiator=message.initiator + n.resourcetiming_transferred_size=message.transferred_size + n.resourcetiming_cached=message.cached + return n + + + if isinstance(message, IssueEvent): + n.issueevent_message_id=message.message_id + n.issueevent_timestamp=message.timestamp + n.issueevent_type=message.type + n.issueevent_context_string=message.context_string + n.issueevent_context=message.context + n.issueevent_payload=message.payload + n.issueevent_url=message.url + return n + + if isinstance(message, SessionEnd): + n.sessionend_timestamp=message.timestamp + n.sessionend_encryption_key=message.encryption_key + return n + + + if isinstance(message, SessionSearch): + n.sessionsearch_timestamp=message.timestamp + n.sessionsearch_partition=message.partition + return n + if isinstance(message, PerformanceTrack): n.performancetrack_frames = message.frames @@ -466,44 +487,73 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.inputevent_label = message.label return n - if isinstance(message, ClickEvent): - n.clickevent_messageid = message.message_id - n.clickevent_timestamp = message.timestamp - n.clickevent_hesitationtime = message.hesitation_time - n.clickevent_label = message.label - return n - - if isinstance(message, ErrorEvent): - n.errorevent_messageid = message.message_id - n.errorevent_timestamp = message.timestamp - n.errorevent_source = message.source - n.errorevent_name = message.name - n.errorevent_message = message.message - n.errorevent_payload = message.payload - return n - - if isinstance(message, ResourceEvent): - n.resourceevent_messageid = message.message_id - n.resourceevent_timestamp = message.timestamp - n.resourceevent_duration = message.duration - n.resourceevent_ttfb = message.ttfb - 
n.resourceevent_headersize = message.header_size - n.resourceevent_encodedbodysize = message.encoded_body_size - n.resourceevent_decodedbodysize = message.decoded_body_size - n.resourceevent_url = message.url - n.resourceevent_type = message.type - n.resourceevent_success = message.success - n.resourceevent_method = message.method - n.resourceevent_status = message.status - return n - if isinstance(message, CustomEvent): - n.customevent_messageid = message.message_id - n.customevent_timestamp = message.timestamp n.customevent_name = message.name n.customevent_payload = message.payload return n + if isinstance(message, LoadFontFace): + n.loadfontface_parent_id = message.parent_id + n.loadfontface_family = message.family + n.loadfontface_source = message.source + n.loadfontface_descriptors = message.descriptors + return n + + if isinstance(message, SetNodeFocus): + n.setnodefocus_id = message.id + return n + + if isinstance(message, AdoptedSSReplaceURLBased): + n.adoptedssreplaceurlbased_sheet_id = message.sheet_id + n.adoptedssreplaceurlbased_text = message.text + n.adoptedssreplaceurlbased_base_url = message.base_url + return n + + if isinstance(message, AdoptedSSReplace): + n.adoptedssreplace_sheet_id = message.sheet_id + n.adoptedssreplace_text = message.text + return n + + if isinstance(message, AdoptedSSInsertRuleURLBased): + n.adoptedssinsertruleurlbased_sheet_id = message.sheet_id + n.adoptedssinsertruleurlbased_rule = message.rule + n.adoptedssinsertruleurlbased_index = message.index + n.adoptedssinsertruleurlbased_base_url = message.base_url + return n + + if isinstance(message, AdoptedSSInsertRule): + n.adoptedssinsertrule_sheet_id = message.sheet_id + n.adoptedssinsertrule_rule = message.rule + n.adoptedssinsertrule_index = message.index + return n + + if isinstance(message, AdoptedSSDeleteRule): + n.adoptedssdeleterule_sheet_id = message.sheet_id + n.adoptedssdeleterule_index = message.index + return n + + if isinstance(message, AdoptedSSAddOwner): + 
n.adoptedssaddowner_sheet_id = message.sheet_id + n.adoptedssaddowner_id = message.id + return n + + if isinstance(message, AdoptedSSRemoveOwner): + n.adoptedssremoveowner_sheet_id = message.sheet_id + n.adoptedssremoveowner_id = message.id + return n + + if isinstance(message, JSException): + n.jsexception_name = message.name + n.jsexception_message = message.message + n.jsexception_payload = message.payload + n.jsexception_metadata = message.metadata + return n + + if isinstance(message, Zustand): + n.zustand_mutation = message.mutation + n.zustand_state = message.state + return n + # if isinstance(message, CreateDocument): # n.createdocument = True # return n @@ -542,15 +592,10 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.fetch_duration = message.duration return n - if isinstance(message, FetchEvent): - n.fetch_event_message_id = message.message_id - n.fetch_event_timestamp = message.timestamp - n.fetch_event_method = message.method - n.fetch_event_url = message.url - n.fetch_event_request = message.request - n.fetch_event_response = message.response - n.fetch_event_status = message.status - n.fetch_event_duration = message.duration + if isinstance(message, SetNodeAttributeDict): + n.setnodeattributedict_id = message.id, + n.setnodeattributedict_name_key = message.name_key + n.setnodeattributedict_value_key = message.value_key return n if isinstance(message, Profiler): @@ -567,16 +612,6 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.graphql_response = message.response return n - if isinstance(message, GraphQLEvent): - n.graphqlevent_messageid = message.message_id - n.graphqlevent_timestamp = message.timestamp - n.graphqlevent_name = message.name - return n - - if isinstance(message, DomDrop): - n.domdrop_timestamp = message.timestamp - return n - if isinstance(message, MouseClick): n.mouseclick_id = message.id n.mouseclick_hesitationtime = message.hesitation_time @@ -584,13 +619,6 @@ def handle_message(message: 
Message) -> Optional[DetailedEvent]: n.mouseclick_selector = message.selector return n - if isinstance(message, MouseClickDepricated): - n.mouseclick_id = message.id - n.mouseclick_hesitationtime = message.hesitation_time - n.mouseclick_label = message.label - n.mouseclick_selector = '' - return n - if isinstance(message, SetPageLocation): n.setpagelocation_url = message.url n.setpagelocation_referrer = message.referrer @@ -612,27 +640,15 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.longtasks_containername = message.container_name return n - if isinstance(message, SetNodeURLBasedAttribute): - n.setnodeurlbasedattribute_id = message.id - n.setnodeurlbasedattribute_name = message.name - n.setnodeurlbasedattribute_value = message.value - n.setnodeurlbasedattribute_baseurl = message.base_url - return n - - if isinstance(message, SetStyleData): - n.setstyledata_id = message.id - n.setstyledata_data = message.data - n.setstyledata_baseurl = message.base_url - return n - - if isinstance(message, IssueEvent): - n.issueevent_messageid = message.message_id - n.issueevent_timestamp = message.timestamp - n.issueevent_type = message.type - n.issueevent_contextstring = message.context_string - n.issueevent_context = message.context - n.issueevent_payload = message.payload - return n + #if isinstance(message, IssueEvent): + # n.issueevent_message_id = message.message_id + # n.issueevent_timestamp = message.timestamp + # n.issueevent_type = message.type + # n.issueevent_context_string = message.context_string + # n.issueevent_context = message.context + # n.issueevent_payload = message.payload + # n.issueevent_url = message.url + # return n if isinstance(message, TechnicalInfo): n.technicalinfo_type = message.type @@ -644,10 +660,6 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.customissue_payload = message.payload return n - if isinstance(message, PageClose): - n.pageclose = True - return n - if isinstance(message, AssetCache): 
n.asset_cache_url = message.url return n @@ -677,7 +689,7 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: return n if isinstance(message, IOSBatchMeta): - n.iosbatchmeta_page_no = message.page_no + n.iosbatchmeta_lenght = message.length n.iosbatchmeta_first_index = message.first_index n.iosbatchmeta_timestamp = message.timestamp return n diff --git a/ee/connectors/msgcodec/codec.py b/ee/connectors/msgcodec/codec.py index 577f02f48..33f1eaaca 100644 --- a/ee/connectors/msgcodec/codec.py +++ b/ee/connectors/msgcodec/codec.py @@ -34,6 +34,16 @@ class Codec: s += 7 i += 1 + @staticmethod + def read_size(reader: io.BytesIO): + size = 0 + for i in range(3): + b = reader.read(1) + num = int.from_bytes(b, "big", signed=False) + size += num << (8*i) + return size + + @staticmethod def read_int(reader: io.BytesIO) -> int: """ @@ -57,7 +67,11 @@ class Codec: @staticmethod def read_string(reader: io.BytesIO) -> str: length = Codec.read_uint(reader) - s = reader.read(length) + try: + s = reader.read(length) + except Exception as e: + print(f'Error while reading string of length {length}') + raise Exception(e) try: return s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD") except UnicodeDecodeError: diff --git a/ee/connectors/msgcodec/messages.py b/ee/connectors/msgcodec/messages.py index 663ba8f99..7e2f28152 100644 --- a/ee/connectors/msgcodec/messages.py +++ b/ee/connectors/msgcodec/messages.py @@ -71,7 +71,7 @@ class CreateDocument(Message): __id__ = 7 def __init__(self, ): - + pass class CreateElementNode(Message): diff --git a/ee/connectors/msgcodec/msgcodec.py b/ee/connectors/msgcodec/msgcodec.py index 9aef2b475..857ed303a 100644 --- a/ee/connectors/msgcodec/msgcodec.py +++ b/ee/connectors/msgcodec/msgcodec.py @@ -7,6 +7,9 @@ import io class MessageCodec(Codec): + def __init__(self, msg_selector: List[int] = list()): + self.msg_selector = msg_selector + def read_message_id(self, reader: io.BytesIO) -> int: """ Read and return the first byte 
where the message id is encoded @@ -46,27 +49,41 @@ class MessageCodec(Codec): def decode_detailed(self, b: bytes) -> List[Message]: reader = io.BytesIO(b) messages_list = list() - messages_list.append(self.handler(reader, 0)) + try: + messages_list.append(self.handler(reader, 0)) + except IndexError: + print('[WARN] Broken batch') + return list() if isinstance(messages_list[0], BatchMeta): # Old BatchMeta mode = 0 elif isinstance(messages_list[0], BatchMetadata): # New BatchMeta - mode = 1 + if messages_list[0].version == 0: + mode = 0 + else: + mode = 1 else: return messages_list while True: try: - messages_list.append(self.handler(reader, mode)) + msg_decoded = self.handler(reader, mode) + if msg_decoded is not None: + messages_list.append(msg_decoded) except IndexError: break return messages_list def handler(self, reader: io.BytesIO, mode=0) -> Message: message_id = self.read_message_id(reader) + #print(f'[INFO-context] Current mode {mode}') + #print(f'[INFO] Currently processing message type {message_id}') if mode == 1: - # We skip the three bytes representing the length of message. It can be used to skip unwanted messages - reader.read(3) + # We read the three bytes representing the length of message. 
It can be used to skip unwanted messages + r_size = self.read_size(reader) + if message_id not in self.msg_selector: + reader.read(r_size) + return None return self.read_head_message(reader, message_id) elif mode == 0: # Old format with no bytes for message length diff --git a/ee/connectors/sql/redshift_events.sql b/ee/connectors/sql/redshift_events.sql index 773444233..7b04e3d29 100644 --- a/ee/connectors/sql/redshift_events.sql +++ b/ee/connectors/sql/redshift_events.sql @@ -1,52 +1,31 @@ CREATE TABLE IF NOT EXISTS connector_events ( sessionid BIGINT, - connectioninformation_downlink BIGINT, - connectioninformation_type VARCHAR(300), - consolelog_level VARCHAR(300), - consolelog_value VARCHAR(300), - customevent_messageid BIGINT, - customevent_name VARCHAR(300), - customevent_payload VARCHAR(300), - customevent_timestamp BIGINT, - errorevent_message VARCHAR(300), - errorevent_messageid BIGINT, - errorevent_name VARCHAR(300), - errorevent_payload VARCHAR(300), - errorevent_source VARCHAR(300), - errorevent_timestamp BIGINT, - jsexception_message VARCHAR(300), - jsexception_name VARCHAR(300), - jsexception_payload VARCHAR(300), - metadata_key VARCHAR(300), - metadata_value VARCHAR(300), - mouseclick_id BIGINT, - mouseclick_hesitationtime BIGINT, - mouseclick_label VARCHAR(300), - pageevent_firstcontentfulpaint BIGINT, - pageevent_firstpaint BIGINT, - pageevent_messageid BIGINT, - pageevent_referrer VARCHAR(300), - pageevent_speedindex BIGINT, - pageevent_timestamp BIGINT, - pageevent_url VARCHAR(300), - pagerendertiming_timetointeractive BIGINT, - pagerendertiming_visuallycomplete BIGINT, - rawcustomevent_name VARCHAR(300), - rawcustomevent_payload VARCHAR(300), - setviewportsize_height BIGINT, - setviewportsize_width BIGINT, - timestamp_timestamp BIGINT, - user_anonymous_id VARCHAR(300), - user_id VARCHAR(300), - issueevent_messageid BIGINT, + consolelog_level VARCHAR(5000), + consolelog_value VARCHAR(5000), + customevent_name VARCHAR(5000), + customevent_payload 
VARCHAR(5000), + jsexception_message VARCHAR(5000), + jsexception_name VARCHAR(5000), + jsexception_payload VARCHAR(5000), + jsexception_metadata VARCHAR(5000), + networkrequest_type VARCHAR(5000), + networkrequest_method VARCHAR(5000), + networkrequest_url VARCHAR(5000), + networkrequest_request VARCHAR(5000), + networkrequest_response VARCHAR(5000), + networkrequest_status BIGINT, + networkrequest_timestamp BIGINT, + networkrequest_duration BIGINT, + issueevent_message_id BIGINT, issueevent_timestamp BIGINT, - issueevent_type VARCHAR(300), - issueevent_contextstring VARCHAR(300), - issueevent_context VARCHAR(300), - issueevent_payload VARCHAR(300), - customissue_name VARCHAR(300), - customissue_payload VARCHAR(300), + issueevent_type VARCHAR(5000), + issueevent_context_string VARCHAR(5000), + issueevent_context VARCHAR(5000), + issueevent_payload VARCHAR(5000), + issueevent_url VARCHAR(5000), + customissue_name VARCHAR(5000), + customissue_payload VARCHAR(5000), received_at BIGINT, batch_order_number BIGINT ); diff --git a/ee/connectors/sql/redshift_events_detailed.sql b/ee/connectors/sql/redshift_events_detailed.sql index f50078828..1c9449dad 100644 --- a/ee/connectors/sql/redshift_events_detailed.sql +++ b/ee/connectors/sql/redshift_events_detailed.sql @@ -1,238 +1,91 @@ CREATE TABLE IF NOT EXISTS connector_events_detailed ( - sessionid BIGINT, - clickevent_hesitationtime BIGINT, - clickevent_label VARCHAR(300), - clickevent_messageid BIGINT, - clickevent_timestamp BIGINT, - connectioninformation_downlink BIGINT, - connectioninformation_type VARCHAR(300), - consolelog_level VARCHAR(300), - consolelog_value VARCHAR(300), - cpuissue_duration BIGINT, - cpuissue_rate BIGINT, - cpuissue_timestamp BIGINT, - createdocument BOOLEAN, - createelementnode_id BIGINT, - createelementnode_parentid BIGINT, - cssdeleterule_index BIGINT, - cssdeleterule_stylesheetid BIGINT, - cssinsertrule_index BIGINT, - cssinsertrule_rule VARCHAR(300), - cssinsertrule_stylesheetid BIGINT, - 
customevent_messageid BIGINT, - customevent_name VARCHAR(300), - customevent_payload VARCHAR(300), - customevent_timestamp BIGINT, - domdrop_timestamp BIGINT, - errorevent_message VARCHAR(300), - errorevent_messageid BIGINT, - errorevent_name VARCHAR(300), - errorevent_payload VARCHAR(300), - errorevent_source VARCHAR(300), - errorevent_timestamp BIGINT, - fetch_duration BIGINT, - fetch_method VARCHAR(300), - fetch_request VARCHAR(300), - fetch_response VARCHAR(300), - fetch_status BIGINT, - fetch_timestamp BIGINT, - fetch_url VARCHAR(300), - graphql_operationkind VARCHAR(300), - graphql_operationname VARCHAR(300), - graphql_response VARCHAR(300), - graphql_variables VARCHAR(300), - graphqlevent_messageid BIGINT, - graphqlevent_name VARCHAR(300), - graphqlevent_timestamp BIGINT, - inputevent_label VARCHAR(300), - inputevent_messageid BIGINT, - inputevent_timestamp BIGINT, - inputevent_value VARCHAR(300), - inputevent_valuemasked BOOLEAN, - jsexception_message VARCHAR(300), - jsexception_name VARCHAR(300), - jsexception_payload VARCHAR(300), - memoryissue_duration BIGINT, - memoryissue_rate BIGINT, - memoryissue_timestamp BIGINT, - metadata_key VARCHAR(300), - metadata_value VARCHAR(300), - mobx_payload VARCHAR(300), - mobx_type VARCHAR(300), - mouseclick_id BIGINT, - mouseclick_hesitationtime BIGINT, - mouseclick_label VARCHAR(300), - mousemove_x BIGINT, - mousemove_y BIGINT, - movenode_id BIGINT, - movenode_index BIGINT, - movenode_parentid BIGINT, - ngrx_action VARCHAR(300), - ngrx_duration BIGINT, - ngrx_state VARCHAR(300), - otable_key VARCHAR(300), - otable_value VARCHAR(300), - pageevent_domcontentloadedeventend BIGINT, - pageevent_domcontentloadedeventstart BIGINT, - pageevent_firstcontentfulpaint BIGINT, - pageevent_firstpaint BIGINT, - pageevent_loaded BOOLEAN, - pageevent_loadeventend BIGINT, - pageevent_loadeventstart BIGINT, - pageevent_messageid BIGINT, - pageevent_referrer VARCHAR(300), - pageevent_requeststart BIGINT, - pageevent_responseend BIGINT, 
- pageevent_responsestart BIGINT, - pageevent_speedindex BIGINT, - pageevent_timestamp BIGINT, - pageevent_url VARCHAR(300), - pageloadtiming_domcontentloadedeventend BIGINT, - pageloadtiming_domcontentloadedeventstart BIGINT, - pageloadtiming_firstcontentfulpaint BIGINT, - pageloadtiming_firstpaint BIGINT, - pageloadtiming_loadeventend BIGINT, - pageloadtiming_loadeventstart BIGINT, - pageloadtiming_requeststart BIGINT, - pageloadtiming_responseend BIGINT, - pageloadtiming_responsestart BIGINT, - pagerendertiming_speedindex BIGINT, - pagerendertiming_timetointeractive BIGINT, - pagerendertiming_visuallycomplete BIGINT, - performancetrack_frames BIGINT, - performancetrack_ticks BIGINT, - performancetrack_totaljsheapsize BIGINT, - performancetrack_usedjsheapsize BIGINT, - performancetrackaggr_avgcpu BIGINT, - performancetrackaggr_avgfps BIGINT, - performancetrackaggr_avgtotaljsheapsize BIGINT, - performancetrackaggr_avgusedjsheapsize BIGINT, - performancetrackaggr_maxcpu BIGINT, - performancetrackaggr_maxfps BIGINT, - performancetrackaggr_maxtotaljsheapsize BIGINT, - performancetrackaggr_maxusedjsheapsize BIGINT, - performancetrackaggr_mincpu BIGINT, - performancetrackaggr_minfps BIGINT, - performancetrackaggr_mintotaljsheapsize BIGINT, - performancetrackaggr_minusedjsheapsize BIGINT, - performancetrackaggr_timestampend BIGINT, - performancetrackaggr_timestampstart BIGINT, - profiler_args VARCHAR(300), - profiler_duration BIGINT, - profiler_name VARCHAR(300), - profiler_result VARCHAR(300), - rawcustomevent_name VARCHAR(300), - rawcustomevent_payload VARCHAR(300), - rawerrorevent_message VARCHAR(300), - rawerrorevent_name VARCHAR(300), - rawerrorevent_payload VARCHAR(300), - rawerrorevent_source VARCHAR(300), - rawerrorevent_timestamp BIGINT, - redux_action VARCHAR(300), - redux_duration BIGINT, - redux_state VARCHAR(300), - removenode_id BIGINT, - removenodeattribute_id BIGINT, - removenodeattribute_name VARCHAR(300), - resourceevent_decodedbodysize BIGINT, - 
resourceevent_duration BIGINT, - resourceevent_encodedbodysize BIGINT, - resourceevent_headersize BIGINT, - resourceevent_messageid BIGINT, - resourceevent_method VARCHAR(300), - resourceevent_status BIGINT, - resourceevent_success BOOLEAN, - resourceevent_timestamp BIGINT, - resourceevent_ttfb BIGINT, - resourceevent_type VARCHAR(300), - resourceevent_url VARCHAR(300), - resourcetiming_decodedbodysize BIGINT, - resourcetiming_duration BIGINT, - resourcetiming_encodedbodysize BIGINT, - resourcetiming_headersize BIGINT, - resourcetiming_initiator VARCHAR(300), - resourcetiming_timestamp BIGINT, - resourcetiming_ttfb BIGINT, - resourcetiming_url VARCHAR(300), - sessiondisconnect BOOLEAN, - sessiondisconnect_timestamp BIGINT, - sessionend BOOLEAN, - sessionend_timestamp BIGINT, - sessionstart_projectid BIGINT, - sessionstart_revid VARCHAR(300), - sessionstart_timestamp BIGINT, - sessionstart_trackerversion VARCHAR(300), - sessionstart_useragent VARCHAR(300), - sessionstart_userbrowser VARCHAR(300), - sessionstart_userbrowserversion VARCHAR(300), - sessionstart_usercountry VARCHAR(300), - sessionstart_userdevice VARCHAR(300), - sessionstart_userdeviceheapsize BIGINT, - sessionstart_userdevicememorysize BIGINT, - sessionstart_userdevicetype VARCHAR(300), - sessionstart_useros VARCHAR(300), - sessionstart_userosversion VARCHAR(300), - sessionstart_useruuid VARCHAR(300), - setcssdata_data BIGINT, - setcssdata_id BIGINT, - setinputchecked_checked BIGINT, - setinputchecked_id BIGINT, - setinputtarget_id BIGINT, - setinputtarget_label BIGINT, - setinputvalue_id BIGINT, - setinputvalue_mask BIGINT, - setinputvalue_value BIGINT, - setnodeattribute_id BIGINT, - setnodeattribute_name BIGINT, - setnodeattribute_value BIGINT, - setnodedata_data BIGINT, - setnodedata_id BIGINT, - setnodescroll_id BIGINT, - setnodescroll_x BIGINT, - setnodescroll_y BIGINT, - setpagelocation_navigationstart BIGINT, - setpagelocation_referrer VARCHAR(300), - setpagelocation_url VARCHAR(300), - 
setpagevisibility_hidden BOOLEAN, - setviewportscroll_x BIGINT, - setviewportscroll_y BIGINT, - setviewportsize_height BIGINT, - setviewportsize_width BIGINT, - stateaction_type VARCHAR(300), - stateactionevent_messageid BIGINT, - stateactionevent_timestamp BIGINT, - stateactionevent_type VARCHAR(300), - timestamp_timestamp BIGINT, - useranonymousid_id VARCHAR(300), - userid_id VARCHAR(300), - vuex_mutation VARCHAR(300), - vuex_state VARCHAR(300), - longtasks_timestamp BIGINT, - longtasks_duration BIGINT, - longtasks_context BIGINT, - longtasks_containertype BIGINT, - longtasks_containersrc VARCHAR(300), - longtasks_containerid VARCHAR(300), - longtasks_containername BIGINT, - setnodeurlbasedattribute_id BIGINT, - setnodeurlbasedattribute_name VARCHAR(300), - setnodeurlbasedattribute_value VARCHAR(300), - setnodeurlbasedattribute_baseurl VARCHAR(300), - setstyledata_id BIGINT, - setstyledata_data VARCHAR(300), - setstyledata_baseurl VARCHAR(300), - issueevent_messageid BIGINT, - issueevent_timestamp BIGINT, - issueevent_type VARCHAR(300), - issueevent_contextstring VARCHAR(300), - issueevent_context VARCHAR(300), - issueevent_payload VARCHAR(300), - technicalinfo_type VARCHAR(300), - technicalinfo_value VARCHAR(300), - customissue_name VARCHAR(300), - customissue_payload VARCHAR(300), - pageclose BIGINT, - received_at BIGINT, - batch_order_number BIGINT +sessionid BIGINT, +clickevent_hesitationtime BIGINT, +clickevent_label VARCHAR(5000), +clickevent_messageid BIGINT, +clickevent_timestamp BIGINT, +connectioninformation_downlink BIGINT, +connectioninformation_type VARCHAR(5000), +consolelog_level VARCHAR(5000), +consolelog_value VARCHAR(5000), +customevent_name VARCHAR(5000), +customevent_payload VARCHAR(5000), +fetch_duration BIGINT, +fetch_method VARCHAR(5000), +fetch_request VARCHAR(5000), +fetch_response VARCHAR(5000), +fetch_status BIGINT, +fetch_timestamp BIGINT, +fetch_url VARCHAR(5000), +graphql_operationkind VARCHAR(5000), +graphql_operationname 
VARCHAR(5000), +graphql_response VARCHAR(5000), +graphql_variables VARCHAR(5000), +inputevent_label VARCHAR(5000), +inputevent_messageid BIGINT, +inputevent_timestamp BIGINT, +inputevent_value VARCHAR(5000), +inputevent_valuemasked BOOLEAN, +jsexception_message VARCHAR(5000), +jsexception_name VARCHAR(5000), +jsexception_payload VARCHAR(5000), +jsexception_metadata VARCHAR(5000), +mouseclick_id BIGINT, +mouseclick_hesitationtime BIGINT, +mouseclick_label VARCHAR(5000), +networkrequest_type VARCHAR(5000), +networkrequest_method VARCHAR(5000), +networkrequest_url VARCHAR(5000), +networkrequest_request VARCHAR(5000), +networkrequest_response VARCHAR(5000), +networkrequest_status BIGINT, +networkrequest_timestamp BIGINT, +networkrequest_duration BIGINT, +pageevent_domcontentloadedeventend BIGINT, +pageevent_domcontentloadedeventstart BIGINT, +pageevent_firstcontentfulpaint BIGINT, +pageevent_firstpaint BIGINT, +pageevent_loaded BOOLEAN, +pageevent_loadeventend BIGINT, +pageevent_loadeventstart BIGINT, +pageevent_messageid BIGINT, +pageevent_referrer VARCHAR(5000), +pageevent_requeststart BIGINT, +pageevent_responseend BIGINT, +pageevent_responsestart BIGINT, +pageevent_speedindex BIGINT, +pageevent_timestamp BIGINT, +pageevent_url VARCHAR(5000), +sessionend_timestamp BIGINT, +sessionend_encryption_key VARCHAR(5000), +sessionstart_projectid BIGINT, +sessionstart_revid VARCHAR(5000), +sessionstart_timestamp BIGINT, +sessionstart_trackerversion VARCHAR(5000), +sessionstart_useragent VARCHAR(5000), +sessionstart_userbrowser VARCHAR(5000), +sessionstart_userbrowserversion VARCHAR(5000), +sessionstart_usercountry VARCHAR(5000), +sessionstart_userdevice VARCHAR(5000), +sessionstart_userdeviceheapsize BIGINT, +sessionstart_userdevicememorysize BIGINT, +sessionstart_userdevicetype VARCHAR(5000), +sessionstart_useros VARCHAR(5000), +sessionstart_userosversion VARCHAR(5000), +sessionstart_useruuid VARCHAR(5000), +setpagelocation_navigationstart BIGINT, +setpagelocation_referrer 
from utils.pg_client import PostgresClient
from queue import Queue
from decouple import config
from time import time


def _project_from_session(sessionId):
    """Look up the projectId of the requested sessionId in the PG sessions table.

    Returns the project_id, or None when the session is unknown."""
    with PostgresClient() as conn:
        # Let psycopg2 bind the parameter itself; a separate mogrify() pass
        # before execute() is redundant.
        conn.execute("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1",
                     {'sessionId': sessionId})
        res = conn.fetchone()
    if res is None:
        print(f'[WARN] sessionid {sessionId} not found in sessions table')
        return None
    return res['project_id']


class CachedSessions:

    def __init__(self):
        """Cache of open and recently closed sessions with their current status.

        env:
            MAX_SESSION_LIFE: cache lifespan of a session (default 7200 seconds)"""
        # sessionid -> (insertion_time, status) with status in {'OPEN', 'UPDATE', 'CLOSE'}
        self.session_project = dict()
        self.max_alive_time = config('MAX_SESSION_LIFE', default=7200, cast=int)  # default 2 hours

    def create(self, sessionid):
        """Save a new session with status OPEN and record its insertion time."""
        self.session_project[sessionid] = (time(), 'OPEN')

    def add(self, sessionid):
        """Create a cached session, or flag it UPDATE if it was already closed."""
        if sessionid in self.session_project:
            # A message arriving after SessionEnd marks the session for re-processing.
            if self.session_project[sessionid][1] == 'CLOSE':
                created_at = self.session_project[sessionid][0]
                self.session_project[sessionid] = (created_at, 'UPDATE')
        else:
            self.create(sessionid)

    def close(self, sessionid):
        """Mark a session closed (SessionEnd received); return its previous status."""
        created_at, old_status = self.session_project[sessionid]
        self.session_project[sessionid] = (created_at, 'CLOSE')
        return old_status

    def clear_sessions(self):
        """Delete all sessions that reached max_alive_time; return the deleted ids.

        BUGFIX: the original deleted entries while iterating over the dict's
        items(), which raises RuntimeError in Python 3. Collect first, delete after."""
        current_time = time()
        to_clean_list = [sessionid for sessionid, values in self.session_project.items()
                         if current_time - values[0] > self.max_alive_time]
        for sessionid in to_clean_list:
            del self.session_project[sessionid]
        return to_clean_list


class ProjectFilter:

    def __init__(self, filter=None):
        """Filter sessions coming from the selected projects. Reads PG to resolve
        a projectId and caches the answer to avoid duplicated requests.

        env:
            max_cache_size: max allowed cache length - starts cleanup when oversized
            cache_lifespan: max lifetime of a cached entry - deleted in cleanup phase"""
        # BUGFIX: the original used a mutable default argument (filter=list())
        # which is shared across every instance; default to None and build a
        # fresh list per instance instead. An explicit filter=[...] still works.
        self.filter = filter if filter is not None else list()
        self.cache = dict()  # sessionId -> [lookup_time, is_valid]
        self.cached_sessions = CachedSessions()
        self.to_clean = list()
        self.count_bad = 0  # sessions whose project could not be resolved
        self.max_cache_size = config('max_cache_size', default=50, cast=int)
        self.cache_lifespan = config('cache_lifespan', default=900, cast=int)

    def is_valid(self, sessionId):
        """Return True when sessionId belongs to one of the selected projects."""
        if not self.filter:
            return True  # no filter configured: accept everything
        if sessionId in self.cache:
            return self.cache[sessionId][1]
        found_project_id = _project_from_session(sessionId)
        if found_project_id is None:
            self.count_bad += 1
            return False
        project_is_valid = found_project_id in self.filter
        self.cache[sessionId] = [time(), project_is_valid]
        return project_is_valid

    def cleanup(self):
        """Drop cache entries older than cache_lifespan."""
        current_time = time()
        self.cache = {sessionid: values for sessionid, values in self.cache.items()
                      if current_time - values[0] < self.cache_lifespan}

    def handle_clean(self):
        """Run cleanup when filtering is active and the cache outgrew max_cache_size."""
        if self.filter and len(self.cache) > self.max_cache_size:
            self.cleanup()
import logging
import time
from threading import Semaphore

import psycopg2
import psycopg2.extras
from decouple import config
from psycopg2 import pool

logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))

# Base connection settings, read once at import time.
_PG_CONFIG = {"host": config("pg_host"),
              "database": config("pg_dbname"),
              "user": config("pg_user"),
              "password": config("pg_password"),
              "port": config("pg_port", cast=int),
              "application_name": config("APP_NAME", default="PY")}
PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
    # PostgreSQL expresses statement_timeout in milliseconds.
    PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"


class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    """ThreadedConnectionPool that blocks on a semaphore when maxconn
    connections are already checked out, instead of raising immediately."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        self._semaphore.acquire()
        try:
            return super().getconn(*args, **kwargs)
        except psycopg2.pool.PoolError as e:
            # BUGFIX: release the semaphore slot when no connection was handed
            # out, otherwise every failed getconn permanently shrinks capacity.
            self._semaphore.release()
            if str(e) == "connection pool is closed":
                make_pool()
            raise e

    def putconn(self, *args, **kwargs):
        try:
            super().putconn(*args, **kwargs)
            self._semaphore.release()
        except psycopg2.pool.PoolError as e:
            if str(e) == "trying to put unkeyed connection":
                # Connection does not belong to this pool (e.g. it was taken
                # from a previous pool before make_pool() rebuilt it).
                # NOTE(review): the semaphore slot is not released on this
                # path - confirm this cannot slowly starve the pool.
                print("!!! trying to put unkeyed connection")
                print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
                return
            raise e


postgreSQL_pool: ORThreadedConnectionPool = None

RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0


def make_pool():
    """(Re)create the global connection pool, retrying up to RETRY_MAX times."""
    if not config('PG_POOL', cast=bool, default=True):
        return
    global postgreSQL_pool
    global RETRY
    if postgreSQL_pool is not None:
        try:
            postgreSQL_pool.closeall()
        except (Exception, psycopg2.DatabaseError) as error:
            # BUGFIX: pass the exception as a lazy %-argument; the original
            # passed it positionally with no placeholder, which makes the
            # logging module itself raise a formatting error and lose the text.
            logging.error("Error while closing all connexions to PostgreSQL: %s", error)
    try:
        postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
                                                   config("PG_MAXCONN", cast=int, default=80),
                                                   **PG_CONFIG)
        if postgreSQL_pool:
            logging.info("Connection pool created successfully")
    except (Exception, psycopg2.DatabaseError) as error:
        logging.error("Error while connecting to PostgreSQL: %s", error)
        if RETRY < RETRY_MAX:
            RETRY += 1
            logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
            time.sleep(RETRY_INTERVAL)
            make_pool()
        else:
            raise error


class PostgresClient:
    """Context manager yielding a RealDictCursor. Draws connections from the
    global pool, or opens a dedicated connection for long/unlimited queries."""
    connection = None
    cursor = None
    long_query = False
    unlimited_query = False

    def __init__(self, long_query=False, unlimited_query=False, use_pool=True):
        self.long_query = long_query
        self.unlimited_query = unlimited_query
        self.use_pool = use_pool
        if unlimited_query:
            # Dedicated connection with no statement_timeout at all.
            long_config = dict(_PG_CONFIG)
            long_config["application_name"] += "-UNLIMITED"
            self.connection = psycopg2.connect(**long_config)
        elif long_query:
            # Dedicated connection with an extended timeout.
            long_config = dict(_PG_CONFIG)
            long_config["application_name"] += "-LONG"
            long_config["options"] = f"-c statement_timeout=" \
                                     f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
            self.connection = psycopg2.connect(**long_config)
        elif not use_pool or not config('PG_POOL', cast=bool, default=True):
            single_config = dict(_PG_CONFIG)
            single_config["application_name"] += "-NOPOOL"
            single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
            self.connection = psycopg2.connect(**single_config)
        else:
            self.connection = postgreSQL_pool.getconn()

    def __enter__(self):
        if self.cursor is None:
            self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            # Wrap execute so failures roll the transaction back (see __execute).
            self.cursor.cursor_execute = self.cursor.execute
            self.cursor.execute = self.__execute
            self.cursor.recreate = self.recreate_cursor
        return self.cursor

    def __exit__(self, *args):
        try:
            self.connection.commit()
            self.cursor.close()
            if not self.use_pool or self.long_query or self.unlimited_query:
                # Dedicated (non-pooled) connections are closed outright.
                self.connection.close()
        except Exception as error:
            logging.error("Error while committing/closing PG-connection: %s", error)
            if str(error) == "connection already closed" \
                    and self.use_pool \
                    and not self.long_query \
                    and not self.unlimited_query \
                    and config('PG_POOL', cast=bool, default=True):
                logging.info("Recreating the connexion pool")
                make_pool()
            else:
                raise error
        finally:
            if config('PG_POOL', cast=bool, default=True) \
                    and self.use_pool \
                    and not self.long_query \
                    and not self.unlimited_query:
                # NOTE(review): after make_pool() above, this hands the old
                # connection to a freshly built pool, which putconn reports as
                # an unkeyed connection - confirm this is intended behaviour.
                postgreSQL_pool.putconn(self.connection)

    def __execute(self, query, vars=None):
        """Run the real cursor execute; on psycopg2 errors, log the query and
        roll back so the connection stays usable for future statements."""
        try:
            result = self.cursor.cursor_execute(query=query, vars=vars)
        except psycopg2.Error as error:
            logging.error(f"!!! Error of type:{type(error)} while executing query:")
            logging.error(query)
            logging.info("starting rollback to allow future execution")
            self.connection.rollback()
            raise error
        return result

    def recreate_cursor(self, rollback=False):
        """Close and rebuild the wrapped cursor, optionally rolling back first."""
        if rollback:
            try:
                self.connection.rollback()
            except Exception as error:
                logging.error("Error while rollbacking connection for recreation: %s", error)
        try:
            self.cursor.close()
        except Exception as error:
            logging.error("Error while closing cursor for recreation: %s", error)
        self.cursor = None
        return self.__enter__()


async def init():
    """Create the pool at service start-up (async to match app lifecycle hooks)."""
    logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
    if config('PG_POOL', cast=bool, default=True):
        make_pool()


async def terminate():
    """Close every pooled connection at service shutdown."""
    global postgreSQL_pool
    if postgreSQL_pool is not None:
        try:
            postgreSQL_pool.closeall()
            logging.info("Closed all connexions to PostgreSQL")
        except (Exception, psycopg2.DatabaseError) as error:
            logging.error("Error while closing all connexions to PostgreSQL: %s", error)