diff --git a/api/app.py b/api/app.py index d7e5215a5..491a2f533 100644 --- a/api/app.py +++ b/api/app.py @@ -16,7 +16,7 @@ from chalicelib.utils import helper from chalicelib.utils import pg_client, ch_client from crons import core_crons, core_dynamic_crons from routers import core, core_dynamic -from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics +from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_analytics loglevel = config("LOGLEVEL", default=logging.WARNING) print(f">Loglevel set to: {loglevel}") @@ -129,6 +129,6 @@ app.include_router(spot.public_app) app.include_router(spot.app) app.include_router(spot.app_apikey) -app.include_router(product_anaytics.public_app) -app.include_router(product_anaytics.app) -app.include_router(product_anaytics.app_apikey) +app.include_router(product_analytics.public_app, prefix="/pa") +app.include_router(product_analytics.app, prefix="/pa") +app.include_router(product_analytics.app_apikey, prefix="/pa") diff --git a/api/chalicelib/core/metrics/product_anaytics2.py b/api/chalicelib/core/metrics/product_anaytics2.py deleted file mode 100644 index 9e32e088d..000000000 --- a/api/chalicelib/core/metrics/product_anaytics2.py +++ /dev/null @@ -1,14 +0,0 @@ -from chalicelib.utils.ch_client import ClickHouseClient - - -def search_events(project_id: int, data: dict): - with ClickHouseClient() as ch_client: - r = ch_client.format( - """SELECT * - FROM taha.events - WHERE project_id=%(project_id)s - ORDER BY created_at;""", - params={"project_id": project_id}) - x = ch_client.execute(r) - - return x diff --git a/api/chalicelib/core/product_analytics/__init__.py b/api/chalicelib/core/product_analytics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/chalicelib/core/product_analytics/events.py b/api/chalicelib/core/product_analytics/events.py new file mode 100644 index 000000000..6e1e2e2cc --- /dev/null +++ b/api/chalicelib/core/product_analytics/events.py @@ -0,0 +1,28 @@ +from chalicelib.utils import helper +from chalicelib.utils.ch_client import ClickHouseClient + + +def get_events(project_id: int): + with ClickHouseClient() as ch_client: + r = ch_client.format( + """SELECT event_name, display_name + FROM product_analytics.all_events + WHERE project_id=%(project_id)s + ORDER BY display_name;""", + parameters={"project_id": project_id}) + x = ch_client.execute(r) + + return helper.list_to_camel_case(x) + + +def search_events(project_id: int, data: dict): + with ClickHouseClient() as ch_client: + r = ch_client.format( + """SELECT * + FROM product_analytics.events + WHERE project_id=%(project_id)s + ORDER BY created_at;""", + parameters={"project_id": project_id}) + x = ch_client.execute(r) + + return helper.list_to_camel_case(x) diff --git a/api/chalicelib/core/product_analytics/properties.py b/api/chalicelib/core/product_analytics/properties.py new file mode 100644 index 000000000..cb9d082ec --- /dev/null +++ b/api/chalicelib/core/product_analytics/properties.py @@ -0,0 +1,19 @@ +from chalicelib.utils import helper +from chalicelib.utils.ch_client import ClickHouseClient + + +def get_properties(project_id: int, event_name): + with ClickHouseClient() as ch_client: + r = ch_client.format( + """SELECT all_properties.property_name, + all_properties.display_name + FROM product_analytics.event_properties + INNER JOIN product_analytics.all_properties USING (property_name) + WHERE event_properties.project_id=%(project_id)s + AND all_properties.project_id=%(project_id)s + AND event_properties.event_name=%(event_name)s + ORDER BY created_at;""", + parameters={"project_id": project_id,"event_name": event_name}) + properties = ch_client.execute(r) + + return helper.list_to_camel_case(properties) diff --git a/api/chalicelib/core/sessions/sessions_ch.py b/api/chalicelib/core/sessions/sessions_ch.py index 04503edfe..c060e06cd 100644 --- a/api/chalicelib/core/sessions/sessions_ch.py +++ b/api/chalicelib/core/sessions/sessions_ch.py @@ -671,24 +671,36 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu events_conditions.append({"type": event_where[-1]}) if not is_any: if schemas.ClickEventExtraOperator.has_value(event.operator): - event_where.append(json_condition( - "main", - "$properties", - "selector", op, event.value, e_k) + # event_where.append(json_condition( + # "main", + # "$properties", + # "selector", op, event.value, e_k) + # ) + event_where.append( + sh.multi_conditions(f"main.`$properties`.selector {op} %({e_k})s", + event.value, value_key=e_k) ) events_conditions[-1]["condition"] = event_where[-1] else: if is_not: - event_where.append(json_condition( - "sub", "$properties", _column, op, event.value, e_k - )) + # event_where.append(json_condition( + # "sub", "$properties", _column, op, event.value, e_k + # )) + event_where.append( + sh.multi_conditions(f"sub.`$properties`.{_column} {op} %({e_k})s", + event.value, value_key=e_k) + ) events_conditions_not.append( { "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"}) events_conditions_not[-1]["condition"] = event_where[-1] else: + # event_where.append( + # json_condition("main", "$properties", _column, op, event.value, e_k) + # ) event_where.append( - json_condition("main", "$properties", _column, op, event.value, e_k) + sh.multi_conditions(f"main.`$properties`.{_column} {op} %({e_k})s", + event.value, value_key=e_k) ) events_conditions[-1]["condition"] = event_where[-1] else: @@ -870,12 +882,15 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu events_conditions[-1]["condition"] = [] if not is_any and event.value not in [None, "*", ""]: event_where.append( - sh.multi_conditions(f"(toString(main1.`$properties`.message) {op} %({e_k})s OR toString(main1.`$properties`.name) {op} %({e_k})s)", - event.value, value_key=e_k)) + sh.multi_conditions( + f"(toString(main1.`$properties`.message) {op} %({e_k})s OR toString(main1.`$properties`.name) {op} %({e_k})s)", + event.value, value_key=e_k)) events_conditions[-1]["condition"].append(event_where[-1]) events_extra_join += f" AND {event_where[-1]}" if len(event.source) > 0 and event.source[0] not in [None, "*", ""]: - event_where.append(sh.multi_conditions(f"toString(main1.`$properties`.source) = %({s_k})s", event.source, value_key=s_k)) + event_where.append( + sh.multi_conditions(f"toString(main1.`$properties`.source) = %({s_k})s", event.source, + value_key=s_k)) events_conditions[-1]["condition"].append(event_where[-1]) events_extra_join += f" AND {event_where[-1]}" @@ -1193,6 +1208,28 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"]) else: continue + if event.properties is not None and len(event.properties.filters) > 0: + event_fiters = [] + for l, property in enumerate(event.properties.filters): + a_k = f"{e_k}_att_{l}" + full_args = {**full_args, + **sh.multi_values(property.value, value_key=a_k)} + op = sh.get_sql_operator(property.operator) + condition = f"main.properties.{property.name} {op} %({a_k})s" + if property.is_predefined: + condition = f"main.{property.name} {op} %({a_k})s" + event_where.append( + sh.multi_conditions(condition, property.value, value_key=a_k) + ) + event_fiters.append(event_where[-1]) + if len(event_fiters) > 0: + events_conditions[-1]["condition"] += " AND (" + for l, e_f in enumerate(event_fiters): + if l > 0: + events_conditions[-1]["condition"] += event.properties.operators[l - 1] + e_f + else: + events_conditions[-1]["condition"] += e_f + events_conditions[-1]["condition"] += ")" if event_index == 0 or or_events: event_where += ss_constraints if is_not: diff --git a/api/chalicelib/core/sessions/sessions_search_ch.py b/api/chalicelib/core/sessions/sessions_search_ch.py index 234d0fd26..fa7994963 100644 --- a/api/chalicelib/core/sessions/sessions_search_ch.py +++ b/api/chalicelib/core/sessions/sessions_search_ch.py @@ -175,11 +175,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project: schemas. ORDER BY sort_key {data.order} LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""", parameters=full_args) - logging.debug("--------------------") - logging.debug(main_query) - logging.debug("--------------------") + try: + logging.debug("--------------------") sessions_list = cur.execute(main_query) + logging.debug("--------------------") except Exception as err: logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------") logging.warning(main_query) diff --git a/api/chalicelib/utils/__init__.py b/api/chalicelib/utils/__init__.py index 54e0b4c65..df64e4775 100644 --- a/api/chalicelib/utils/__init__.py +++ b/api/chalicelib/utils/__init__.py @@ -11,9 +11,3 @@ if smtp.has_smtp(): logger.info("valid SMTP configuration found") else: logger.info("no SMTP configuration found or SMTP validation failed") - -if config("EXP_CH_DRIVER", cast=bool, default=True): - logging.info(">>> Using new CH driver") - from . import ch_client_exp as ch_client -else: - from . import ch_client diff --git a/api/chalicelib/utils/ch_client.py b/api/chalicelib/utils/ch_client.py index 5fbaa5752..bfc62b919 100644 --- a/api/chalicelib/utils/ch_client.py +++ b/api/chalicelib/utils/ch_client.py @@ -1,73 +1,185 @@ import logging +import threading +import time +from functools import wraps +from queue import Queue, Empty -import clickhouse_driver +import clickhouse_connect +from clickhouse_connect.driver.query import QueryContext from decouple import config logger = logging.getLogger(__name__) +_CH_CONFIG = {"host": config("ch_host"), + "user": config("ch_user", default="default"), + "password": config("ch_password", default=""), + "port": config("ch_port_http", cast=int), + "client_name": config("APP_NAME", default="PY")} +CH_CONFIG = dict(_CH_CONFIG) + settings = {} if config('ch_timeout', cast=int, default=-1) > 0: - logger.info(f"CH-max_execution_time set to {config('ch_timeout')}s") + logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s") settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)} if config('ch_receive_timeout', cast=int, default=-1) > 0: - logger.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s") + logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s") settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)} +extra_args = {} +if config("CH_COMPRESSION", cast=bool, default=True): + extra_args["compression"] = "lz4" + + +def transform_result(self, original_function): + @wraps(original_function) + def wrapper(*args, **kwargs): + if kwargs.get("parameters"): + if config("LOCAL_DEV", cast=bool, default=False): + logger.debug(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))) + else: + logger.debug( + str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters")))) + elif len(args) > 0: + if config("LOCAL_DEV", cast=bool, default=False): + logger.debug(args[0]) + else: + logger.debug(str.encode(args[0])) + result = original_function(*args, **kwargs) + if isinstance(result, clickhouse_connect.driver.query.QueryResult): + column_names = result.column_names + result = result.result_rows + result = [dict(zip(column_names, row)) for row in result] + + return result + + return wrapper + + +class ClickHouseConnectionPool: + def __init__(self, min_size, max_size): + self.min_size = min_size + self.max_size = max_size + self.pool = Queue() + self.lock = threading.Lock() + self.total_connections = 0 + + # Initialize the pool with min_size connections + for _ in range(self.min_size): + client = clickhouse_connect.get_client(**CH_CONFIG, + database=config("ch_database", default="default"), + settings=settings, + **extra_args) + self.pool.put(client) + self.total_connections += 1 + + def get_connection(self): + try: + # Try to get a connection without blocking + client = self.pool.get_nowait() + return client + except Empty: + with self.lock: + if self.total_connections < self.max_size: + client = clickhouse_connect.get_client(**CH_CONFIG, + database=config("ch_database", default="default"), + settings=settings, + **extra_args) + self.total_connections += 1 + return client + # If max_size reached, wait until a connection is available + client = self.pool.get() + return client + + def release_connection(self, client): + self.pool.put(client) + + def close_all(self): + with self.lock: + while not self.pool.empty(): + client = self.pool.get() + client.close() + self.total_connections = 0 + + +CH_pool: ClickHouseConnectionPool = None + +RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50) +RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2) +RETRY = 0 + + +def make_pool(): + if not config('CH_POOL', cast=bool, default=True): + return + global CH_pool + global RETRY + if CH_pool is not None: + try: + CH_pool.close_all() + except Exception as error: + logger.error("Error while closing all connexions to CH", exc_info=error) + try: + CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4), + max_size=config("CH_MAXCONN", cast=int, default=8)) + if CH_pool is not None: + logger.info("Connection pool created successfully for CH") + except ConnectionError as error: + logger.error("Error while connecting to CH", exc_info=error) + if RETRY < RETRY_MAX: + RETRY += 1 + logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}") + time.sleep(RETRY_INTERVAL) + make_pool() + else: + raise error + class ClickHouseClient: __client = None def __init__(self, database=None): - extra_args = {} - if config("CH_COMPRESSION", cast=bool, default=True): - extra_args["compression"] = "lz4" - self.__client = clickhouse_driver.Client(host=config("ch_host"), - database=database if database else config("ch_database", - default="default"), - user=config("ch_user", default="default"), - password=config("ch_password", default=""), - port=config("ch_port", cast=int), - settings=settings, - **extra_args) \ - if self.__client is None else self.__client + if self.__client is None: + if database is not None or not config('CH_POOL', cast=bool, default=True): + self.__client = clickhouse_connect.get_client(**CH_CONFIG, + database=database if database else config("ch_database", + default="default"), + settings=settings, + **extra_args) + + else: + self.__client = CH_pool.get_connection() + + self.__client.execute = transform_result(self, self.__client.query) + self.__client.format = self.format def __enter__(self): - return self - - def execute(self, query, parameters=None, **args): - try: - results = self.__client.execute(query=query, params=parameters, with_column_types=True, **args) - keys = tuple(x for x, y in results[1]) - return [dict(zip(keys, i)) for i in results[0]] - except Exception as err: - logger.error("--------- CH EXCEPTION -----------", exc_info=err) - logger.error("--------- CH QUERY EXCEPTION -----------") - logger.error(self.format(query=query, parameters=parameters) - .replace('\n', '\\n') - .replace(' ', ' ') - .replace(' ', ' ')) - logger.error("--------------------") - raise err - - def insert(self, query, params=None, **args): - return self.__client.execute(query=query, params=params, **args) - - def client(self): return self.__client - def format(self, query, parameters): - if parameters is None: - return query - return self.__client.substitute_params(query, parameters, self.__client.connection.context) + def format(self, query, parameters=None): + if parameters: + ctx = QueryContext(query=query, parameters=parameters) + return ctx.final_query + return query def __exit__(self, *args): - pass + if config('CH_POOL', cast=bool, default=True): + CH_pool.release_connection(self.__client) + else: + self.__client.close() async def init(): - logger.info(f">CH_POOL:not defined") + logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}") + if config('CH_POOL', cast=bool, default=True): + make_pool() async def terminate(): - pass + global CH_pool + if CH_pool is not None: + try: + CH_pool.close_all() + logger.info("Closed all connexions to CH") + except Exception as error: + logger.error("Error while closing all connexions to CH", exc_info=error) diff --git a/api/chalicelib/utils/ch_client_exp.py b/api/chalicelib/utils/ch_client_exp.py deleted file mode 100644 index afed93457..000000000 --- a/api/chalicelib/utils/ch_client_exp.py +++ /dev/null @@ -1,178 +0,0 @@ -import logging -import threading -import time -from functools import wraps -from queue import Queue, Empty - -import clickhouse_connect -from clickhouse_connect.driver.query import QueryContext -from decouple import config - -logger = logging.getLogger(__name__) - -_CH_CONFIG = {"host": config("ch_host"), - "user": config("ch_user", default="default"), - "password": config("ch_password", default=""), - "port": config("ch_port_http", cast=int), - "client_name": config("APP_NAME", default="PY")} -CH_CONFIG = dict(_CH_CONFIG) - -settings = {} -if config('ch_timeout', cast=int, default=-1) > 0: - logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s") - settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)} - -if config('ch_receive_timeout', cast=int, default=-1) > 0: - logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s") - settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)} - -extra_args = {} -if config("CH_COMPRESSION", cast=bool, default=True): - extra_args["compression"] = "lz4" - - -def transform_result(self, original_function): - @wraps(original_function) - def wrapper(*args, **kwargs): - if kwargs.get("parameters"): - logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters")))) - elif len(args) > 0: - logger.debug(str.encode(args[0])) - result = original_function(*args, **kwargs) - if isinstance(result, clickhouse_connect.driver.query.QueryResult): - column_names = result.column_names - result = result.result_rows - result = [dict(zip(column_names, row)) for row in result] - - return result - - return wrapper - - -class ClickHouseConnectionPool: - def __init__(self, min_size, max_size): - self.min_size = min_size - self.max_size = max_size - self.pool = Queue() - self.lock = threading.Lock() - self.total_connections = 0 - - # Initialize the pool with min_size connections - for _ in range(self.min_size): - client = clickhouse_connect.get_client(**CH_CONFIG, - database=config("ch_database", default="default"), - settings=settings, - **extra_args) - self.pool.put(client) - self.total_connections += 1 - - def get_connection(self): - try: - # Try to get a connection without blocking - client = self.pool.get_nowait() - return client - except Empty: - with self.lock: - if self.total_connections < self.max_size: - client = clickhouse_connect.get_client(**CH_CONFIG, - database=config("ch_database", default="default"), - settings=settings, - **extra_args) - self.total_connections += 1 - return client - # If max_size reached, wait until a connection is available - client = self.pool.get() - return client - - def release_connection(self, client): - self.pool.put(client) - - def close_all(self): - with self.lock: - while not self.pool.empty(): - client = self.pool.get() - client.close() - self.total_connections = 0 - - -CH_pool: ClickHouseConnectionPool = None - -RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50) -RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2) -RETRY = 0 - - -def make_pool(): - if not config('CH_POOL', cast=bool, default=True): - return - global CH_pool - global RETRY - if CH_pool is not None: - try: - CH_pool.close_all() - except Exception as error: - logger.error("Error while closing all connexions to CH", exc_info=error) - try: - CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4), - max_size=config("CH_MAXCONN", cast=int, default=8)) - if CH_pool is not None: - logger.info("Connection pool created successfully for CH") - except ConnectionError as error: - logger.error("Error while connecting to CH", exc_info=error) - if RETRY < RETRY_MAX: - RETRY += 1 - logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}") - time.sleep(RETRY_INTERVAL) - make_pool() - else: - raise error - - -class ClickHouseClient: - __client = None - - def __init__(self, database=None): - if self.__client is None: - if database is not None or not config('CH_POOL', cast=bool, default=True): - self.__client = clickhouse_connect.get_client(**CH_CONFIG, - database=database if database else config("ch_database", - default="default"), - settings=settings, - **extra_args) - - else: - self.__client = CH_pool.get_connection() - - self.__client.execute = transform_result(self, self.__client.query) - self.__client.format = self.format - - def __enter__(self): - return self.__client - - def format(self, query, parameters=None): - if parameters: - ctx = QueryContext(query=query, parameters=parameters) - return ctx.final_query - return query - - def __exit__(self, *args): - if config('CH_POOL', cast=bool, default=True): - CH_pool.release_connection(self.__client) - else: - self.__client.close() - - -async def init(): - logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}") - if config('CH_POOL', cast=bool, default=True): - make_pool() - - -async def terminate(): - global CH_pool - if CH_pool is not None: - try: - CH_pool.close_all() - logger.info("Closed all connexions to CH") - except Exception as error: - logger.error("Error while closing all connexions to CH", exc_info=error) diff --git a/api/routers/subs/product_analytics.py b/api/routers/subs/product_analytics.py new file mode 100644 index 000000000..5bfe42de7 --- /dev/null +++ b/api/routers/subs/product_analytics.py @@ -0,0 +1,28 @@ +import schemas +from chalicelib.core.product_analytics import events, properties +from fastapi import Depends +from or_dependencies import OR_context +from routers.base import get_routers + +public_app, app, app_apikey = get_routers() + + +@app.get('/{projectId}/properties/search', tags=["product_analytics"]) +def get_event_properties(projectId: int, event_name: str = None, + context: schemas.CurrentContext = Depends(OR_context)): + if not event_name or len(event_name) == 0: + return {"data": []} + return {"data": properties.get_properties(project_id=projectId, event_name=event_name)} + + +@app.get('/{projectId}/events/names', tags=["dashboard"]) +def get_all_events(projectId: int, + context: schemas.CurrentContext = Depends(OR_context)): + return {"data": events.get_events(project_id=projectId)} + + +@app.post('/{projectId}/events/search', tags=["dashboard"]) +def search_events(projectId: int, + # data: schemas.CreateDashboardSchema = Body(...), + context: schemas.CurrentContext = Depends(OR_context)): + return {"data": events.search_events(project_id=projectId, data={})} diff --git a/api/routers/subs/product_anaytics.py b/api/routers/subs/product_anaytics.py deleted file mode 100644 index 95851c253..000000000 --- a/api/routers/subs/product_anaytics.py +++ /dev/null @@ -1,15 +0,0 @@ -import schemas -from chalicelib.core.metrics import product_anaytics2 -from fastapi import Depends -from or_dependencies import OR_context -from routers.base import get_routers - - -public_app, app, app_apikey = get_routers() - - -@app.post('/{projectId}/events/search', tags=["dashboard"]) -def search_events(projectId: int, - # data: schemas.CreateDashboardSchema = Body(...), - context: schemas.CurrentContext = Depends(OR_context)): - return product_anaytics2.search_events(project_id=projectId, data={}) diff --git a/api/schemas/schemas.py b/api/schemas/schemas.py index 067a00c74..bdc7bcb52 100644 --- a/api/schemas/schemas.py +++ b/api/schemas/schemas.py @@ -545,6 +545,70 @@ class RequestGraphqlFilterSchema(BaseModel): return values +class EventPredefinedPropertyType(str, Enum): + TIME = "$time" + SOURCE = "$source" + DURATION_S = "$duration_s" + DESCRIPTION = "description" + AUTO_CAPTURED = "$auto_captured" + SDK_EDITION = "$sdk_edition" + SDK_VERSION = "$sdk_version" + DEVICE_ID = "$device_id" + OS = "$os" + OS_VERSION = "$os_version" + BROWSER = "$browser" + BROWSER_VERSION = "$browser_version" + DEVICE = "$device" + SCREEN_HEIGHT = "$screen_height" + SCREEN_WIDTH = "$screen_width" + CURRENT_URL = "$current_url" + INITIAL_REFERRER = "$initial_referrer" + REFERRING_DOMAIN = "$referring_domain" + REFERRER = "$referrer" + INITIAL_REFERRING_DOMAIN = "$initial_referring_domain" + SEARCH_ENGINE = "$search_engine" + SEARCH_ENGINE_KEYWORD = "$search_engine_keyword" + UTM_SOURCE = "utm_source" + UTM_MEDIUM = "utm_medium" + UTM_CAMPAIGN = "utm_campaign" + COUNTRY = "$country" + STATE = "$state" + CITY = "$city" + ISSUE_TYPE = "issue_type" + TAGS = "$tags" + IMPORT = "$import" + + +class PropertyFilterSchema(BaseModel): + name: Union[EventPredefinedPropertyType, str] = Field(...) + operator: Union[SearchEventOperator, MathOperator] = Field(...) + value: List[Union[int, str]] = Field(...) + property_type: Optional[Literal["string", "number", "date"]] = Field(default=None) + + @computed_field + @property + def is_predefined(self) -> bool: + return EventPredefinedPropertyType.has_value(self.name) + + @model_validator(mode="after") + def transform_name(self): + if isinstance(self.name, Enum): + self.name = self.name.value + return self + + +class EventPropertiesSchema(BaseModel): + operators: List[Literal["and", "or"]] = Field(...) + filters: List[PropertyFilterSchema] = Field(...) + + @model_validator(mode="after") + def event_filter_validator(self): + assert len(self.filters) == 0 \ + or len(self.operators) == len(self.filters) - 1, \ + "Number of operators must match the number of filter-1" + return self + + class SessionSearchEventSchema2(BaseModel): is_event: Literal[True] = True value: List[Union[str, int]] = Field(...) @@ -553,6 +617,7 @@ class SessionSearchEventSchema2(BaseModel): source: Optional[List[Union[ErrorSource, int, str]]] = Field(default=None) sourceOperator: Optional[MathOperator] = Field(default=None) filters: Optional[List[RequestGraphqlFilterSchema]] = Field(default_factory=list) + properties: Optional[EventPropertiesSchema] = Field(default=None) _remove_duplicate_values = field_validator('value', mode='before')(remove_duplicate_values) _single_to_list_values = field_validator('value', mode='before')(single_to_list) diff --git a/ee/api/.gitignore b/ee/api/.gitignore index 9317c3d12..d46a28ff0 100644 --- a/ee/api/.gitignore +++ b/ee/api/.gitignore @@ -230,6 +230,7 @@ Pipfile.lock /chalicelib/core/socket_ios.py /chalicelib/core/sourcemaps /chalicelib/core/tags.py +/chalicelib/core/product_analytics /chalicelib/saml /chalicelib/utils/__init__.py /chalicelib/utils/args_transformer.py diff --git a/ee/api/app.py b/ee/api/app.py index 7ad085882..e1dece2dd 100644 --- a/ee/api/app.py +++ b/ee/api/app.py @@ -150,9 +150,9 @@ app.include_router(spot.public_app) app.include_router(spot.app) app.include_router(spot.app_apikey) -app.include_router(product_anaytics.public_app) -app.include_router(product_anaytics.app) -app.include_router(product_anaytics.app_apikey) +app.include_router(product_anaytics.public_app, prefix="/ap") +app.include_router(product_anaytics.app, prefix="/ap") +app.include_router(product_anaytics.app_apikey, prefix="/ap") if config("ENABLE_SSO", cast=bool, default=True): app.include_router(saml.public_app) diff --git a/ee/api/clean-dev.sh b/ee/api/clean-dev.sh index bfd7a7a90..abfa59369 100755 --- a/ee/api/clean-dev.sh +++ b/ee/api/clean-dev.sh @@ -52,6 +52,7 @@ rm -rf ./chalicelib/core/socket_ios.py rm -rf ./chalicelib/core/sourcemaps rm -rf ./chalicelib/core/user_testing.py rm -rf ./chalicelib/core/tags.py +rm -rf ./chalicelib/core/product_analytics rm -rf ./chalicelib/saml rm -rf ./chalicelib/utils/__init__.py rm -rf ./chalicelib/utils/args_transformer.py