# api/chalicelib/core/assist_stats.py
import logging
from datetime import datetime

from chalicelib.utils import pg_client, helper
from schemas import AssistStatsSessionsRequest, AssistStatsSessionsResponse, AssistStatsTopMembersResponse

logger = logging.getLogger(__name__)

# Whitelists for values that end up interpolated into SQL.
# Raw request input must NEVER be formatted into a query string.
_SORT_ORDERS = {"asc": "ASC", "desc": "DESC"}
_SESSION_SORT_FIELDS = {"timestamp", "assist_duration", "call_duration", "control_duration"}


def insert_aggregated_data():
    """Cron entry point: roll raw `assist_events` rows up into hourly
    per-project/per-agent aggregates (`assist_events_aggregates`), then record
    a watermark in `assist_events_aggregates_logs` so the next run resumes
    where this one stopped.

    Best-effort: any failure is logged and swallowed (cron must not crash).
    """
    try:
        logger.info("Assist Stats: Inserting aggregated data")
        end_timestamp = int(datetime.timestamp(datetime.now())) * 1000  # now, ms epoch
        start_timestamp = __last_run_end_timestamp_from_aggregates()

        if start_timestamp is None:  # first run ever: backfill the last 7 days
            logger.info("Assist Stats: First run, inserting data for last 7 days")
            start_timestamp = end_timestamp - (7 * 24 * 60 * 60 * 1000)

        chunk_size = 1000
        offset = 0
        constraints = ["timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s"]
        # Kept separate from the INSERT parameters: the previous version
        # reassigned `params` to a positional tuple inside the loop, so the
        # watermark query below crashed (named placeholders + tuple) whenever
        # at least one chunk had been inserted.
        query_params = {
            "limit": chunk_size,
            "offset": offset,
            "start_timestamp": start_timestamp + 1,
            "end_timestamp": end_timestamp,
            "step_size": "60 seconds",
        }

        insert_sql = """
            INSERT INTO assist_events_aggregates
                (timestamp, project_id, agent_id, assist_avg, call_avg, control_avg,
                 assist_total, call_total, control_total)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        while True:
            query_params["offset"] = offset
            logger.info("Assist Stats: Fetching data from %s to %s", start_timestamp, end_timestamp)
            aggregated_data = __get_all_events_hourly_averages(constraints, query_params)

            if not aggregated_data:  # no more data to insert
                logger.info("Assist Stats: No more data to insert")
                break

            logger.info("Assist Stats: Inserting %s rows", len(aggregated_data))
            rows = [(d['time'], d['project_id'], d['agent_id'],
                     d['assist_avg'], d['call_avg'], d['control_avg'],
                     d['assist_total'], d['call_total'], d['control_total'])
                    for d in aggregated_data]
            # One batched round-trip per chunk instead of one per row.
            with pg_client.PostgresClient() as cur:
                cur.executemany(insert_sql, rows)

            offset += chunk_size

        # Watermark: newest raw-event timestamp processed in this window.
        # MAX() needs no GROUP BY / ORDER BY / LIMIT.
        watermark_sql = """
            SELECT MAX(timestamp) AS first_timestamp
            FROM assist_events
            WHERE timestamp > %(start_timestamp)s AND duration > 0
        """
        with pg_client.PostgresClient() as cur:
            cur.execute(watermark_sql, query_params)
            result = cur.fetchone()
            first_timestamp = result['first_timestamp'] if result else None

        if first_timestamp is not None:
            with pg_client.PostgresClient() as cur:
                cur.execute("INSERT INTO assist_events_aggregates_logs (time) VALUES (%s)",
                            (first_timestamp,))

    except Exception as e:
        # Deliberate best-effort swallow; log with stack trace for diagnosis.
        logger.exception("Error inserting aggregated data -: %s", e)


def __last_run_end_timestamp_from_aggregates():
    """Return the watermark of the last successful aggregation run, falling
    back to the oldest raw event timestamp; None means no data at all."""
    with pg_client.PostgresClient() as cur:
        cur.execute("SELECT MAX(time) as last_run_time FROM assist_events_aggregates_logs;")
        result = cur.fetchone()
    last_run_time = result['last_run_time'] if result else None

    if last_run_time is None:  # first run: start from the earliest raw event
        with pg_client.PostgresClient() as cur:
            cur.execute("SELECT MIN(timestamp) as last_timestamp FROM assist_events;")
            result = cur.fetchone()
        last_run_time = result['last_timestamp'] if result else None

    return last_run_time


def __get_all_events_hourly_averages(constraints, params):
    """Hourly avg/sum of event durations per project/agent over a generated
    time series (ms epoch buckets). `constraints` are trusted, module-internal
    SQL fragments bound via named `params`."""
    sql = f"""
        WITH time_series AS (
            SELECT EXTRACT(epoch FROM generate_series(
                       date_trunc('hour', to_timestamp(%(start_timestamp)s/1000)),
                       date_trunc('hour', to_timestamp(%(end_timestamp)s/1000)) + interval '1 hour',
                       interval %(step_size)s
                   ))::bigint as unix_time
        )
        SELECT
            time_series.unix_time * 1000 as time,
            project_id,
            agent_id,
            ROUND(AVG(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END))  as assist_avg,
            ROUND(AVG(CASE WHEN event_type = 'call' THEN duration ELSE 0 END))    as call_avg,
            ROUND(AVG(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_avg,
            ROUND(SUM(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END))  as assist_total,
            ROUND(SUM(CASE WHEN event_type = 'call' THEN duration ELSE 0 END))    as call_total,
            ROUND(SUM(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_total
        FROM time_series
                 LEFT JOIN assist_events
                           ON time_series.unix_time =
                              EXTRACT(epoch FROM DATE_TRUNC('hour', to_timestamp(assist_events.timestamp/1000)))
        WHERE {' AND '.join(constraints)}
        GROUP BY time, project_id, agent_id
        ORDER BY time
        LIMIT %(limit)s OFFSET %(offset)s;
    """
    # NOTE(review): the WHERE on the joined column turns the LEFT JOIN into an
    # effective INNER JOIN (empty buckets are dropped) — confirm intended.
    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
        return cur.fetchall()


def get_averages(
        project_id: int,
        start_timestamp: int,
        end_timestamp: int,
        user_id: int = None,
):
    """Current-period totals/averages, the same for the immediately preceding
    period of equal length, and the per-bucket chart rows."""
    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
    ]
    params = {
        "project_id": project_id,
        "limit": 5,
        "offset": 0,
        "start_timestamp": start_timestamp,
        "end_timestamp": end_timestamp,
        "step_size": "60 seconds",
    }
    if user_id is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = user_id

    totals = __get_all_events_totals(constraints, params)
    rows = __get_all_events_averages(constraints, params)

    # Shift the window back by its own length for the comparison period.
    params["start_timestamp"] = start_timestamp - (end_timestamp - start_timestamp)
    params["end_timestamp"] = start_timestamp
    previous_totals = __get_all_events_totals(constraints, params)

    return {
        "currentPeriod": totals[0],
        "previousPeriod": previous_totals[0],
        "list": helper.list_to_camel_case(rows),
    }


def __get_all_events_totals(constraints, params):
    """Single-row aggregate (SUM/AVG per event type) over the aggregates table."""
    sql = f"""
        SELECT
            ROUND(SUM(assist_total))  as assist_total,
            ROUND(AVG(assist_avg))    as assist_avg,
            ROUND(SUM(call_total))    as call_total,
            ROUND(AVG(call_avg))      as call_avg,
            ROUND(SUM(control_total)) as control_total,
            ROUND(AVG(control_avg))   as control_avg
        FROM assist_events_aggregates
        WHERE {' AND '.join(constraints)}
    """
    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
        return helper.list_to_camel_case(cur.fetchall())


def __get_all_events_averages(constraints, params):
    """Raw per-bucket rows from the aggregates table, ordered by time."""
    sql = f"""
        SELECT timestamp, assist_avg, call_avg, control_avg,
               assist_total, call_total, control_total
        FROM assist_events_aggregates
        WHERE {' AND '.join(constraints)}
        ORDER BY timestamp;
    """
    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
        return cur.fetchall()


def get_top_members(
        project_id: int,
        start_timestamp: int,
        end_timestamp: int,
        sort_by: str,
        sort_order: str,
        user_id: int = None,
) -> AssistStatsTopMembersResponse:
    """Rank agents by sessions assisted or by accumulated event duration.

    Raises ValueError for an unknown sort_by / sort_order.
    """
    event_type_mapping = {
        "sessionsAssisted": "assist",
        "assistDuration": "assist",
        "callDuration": "call",
        "controlDuration": "control",
    }

    event_type = event_type_mapping.get(sort_by)
    if event_type is None:
        raise ValueError("Invalid sortBy option")

    # Whitelist before interpolating into ORDER BY (never format raw input).
    order_sql = _SORT_ORDERS.get(sort_order.lower())
    if order_sql is None:
        raise ValueError("Invalid sortOrder option")

    # Rows are already restricted to the requested event_type, so the metric
    # is a plain COUNT or SUM.  (The previous CASE summed rows with
    # event_type <> 'assist', which made the 'assistDuration' ranking
    # always 0 under the event_type = 'assist' filter.)
    count_expr = "COUNT(*)" if sort_by == "sessionsAssisted" else "SUM(ae.duration)"

    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
        "duration > 0",
    ]
    params = {
        "project_id": project_id,
        "limit": 5,
        "offset": 0,
        "start_timestamp": start_timestamp,
        "end_timestamp": end_timestamp,
        "event_type": event_type,
    }
    if user_id is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = user_id

    sql = f"""
        SELECT COUNT(1) OVER () AS total,
               ae.agent_id,
               u.name AS name,
               {count_expr} AS count
        FROM assist_events ae
                 JOIN users u ON u.user_id = ae.agent_id
        WHERE {' AND '.join(f'ae.{c}' for c in constraints)}
          AND ae.event_type = %(event_type)s
        GROUP BY ae.agent_id, u.name
        ORDER BY count {order_sql}
        LIMIT %(limit)s OFFSET %(offset)s
    """

    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
        rows = cur.fetchall()

    if not rows:
        return AssistStatsTopMembersResponse(total=0, list=[])

    count = rows[0]["total"]
    rows = helper.list_to_camel_case(rows)
    for row in rows:
        row.pop("total")  # window total is metadata, not part of each item

    return AssistStatsTopMembersResponse(total=count, list=rows)


def get_sessions(
        project_id: int,
        data: AssistStatsSessionsRequest,
) -> AssistStatsSessionsResponse:
    """Paginated per-session duration breakdown with the agents involved.

    Raises ValueError for a sort/order value outside the whitelist (the
    request schema validates them too; this is defence in depth because both
    are interpolated into ORDER BY).
    """
    if data.sort not in _SESSION_SORT_FIELDS:
        raise ValueError("Invalid sort option")
    order_sql = _SORT_ORDERS.get(data.order.lower())
    if order_sql is None:
        raise ValueError("Invalid order option")

    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
    ]
    params = {
        "project_id": project_id,
        "limit": data.limit,
        "offset": (data.page - 1) * data.limit,
        "start_timestamp": data.startTimestamp,
        "end_timestamp": data.endTimestamp,
    }
    if data.userId is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = data.userId

    sql = f"""
        SELECT COUNT(1) OVER () AS count,
               ae.session_id,
               MIN(ae.timestamp) as timestamp,
               SUM(CASE WHEN ae.event_type = 'call' THEN ae.duration ELSE 0 END)    AS call_duration,
               SUM(CASE WHEN ae.event_type = 'control' THEN ae.duration ELSE 0 END) AS control_duration,
               SUM(CASE WHEN ae.event_type = 'assist' THEN ae.duration ELSE 0 END)  AS assist_duration,
               (SELECT json_agg(json_build_object('name', u.name, 'id', u.user_id))
                FROM users u
                WHERE u.user_id = ANY (array_agg(ae.agent_id)))                     AS team_members
        FROM assist_events ae
        WHERE {' AND '.join(f'ae.{c}' for c in constraints)}
        GROUP BY ae.session_id
        ORDER BY {data.sort} {order_sql}
        LIMIT %(limit)s OFFSET %(offset)s
    """

    with pg_client.PostgresClient() as cur:
        cur.execute(cur.mogrify(sql, params))
        rows = cur.fetchall()

    if not rows:
        return AssistStatsSessionsResponse(total=0, page=1, list=[])

    count = rows[0]["count"]
    rows = helper.list_to_camel_case(rows)
    for row in rows:
        row.pop("count")  # window total is metadata, not part of each item
    return AssistStatsSessionsResponse(total=count, page=data.page, list=rows)
# --- api/crons/core_dynamic_crons.py (additions) ---
from chalicelib.core import assist_stats


async def assist_events_aggregates_cron() -> None:
    """Hourly roll-up of assist events into the aggregates table."""
    # NOTE(review): insert_aggregated_data() is blocking DB work inside an
    # async def — confirm APScheduler runs these on an executor, as the
    # sibling cron functions follow the same pattern.
    assist_stats.insert_aggregated_data()


# Appended to cron_jobs.  misfire_grace_time / max_instances added for
# consistency with every other entry in the list; max_instances=1 prevents
# overlapping hourly runs from double-inserting aggregates.
cron_jobs_addition = [
    {"func": assist_events_aggregates_cron,
     "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10),
     "misfire_grace_time": 60 * 60, "max_instances": 1},
]


# --- api/routers/core.py (import changes) ---
# Only what the new endpoints actually use is imported; the patch also added
# json, datetime/timedelta, List/Dict, Query and FileResponse, none of which
# appear in the added code — dropped here.
from typing import Union

from decouple import config
from fastapi import Depends, Body
# --- api/routers/core.py (new endpoints) ---
# NOTE(review): these are registered on public_app, i.e. without the auth
# dependencies used elsewhere in this router — confirm they are meant to be
# unauthenticated.

@public_app.get('/{project_id}/assist-stats/avg', tags=["assist-stats"])
def get_assist_stats_avg(
        project_id: int,
        startTimestamp: int = None,
        endTimestamp: int = None,
        userId: int = None):
    """Period totals/averages plus chart rows for assist/call/control events.

    userId is an int agent id — the patch typed it `str`, which could never
    match the integer agent_id filter in assist_stats.get_averages.
    """
    return assist_stats.get_averages(
        project_id=project_id,
        start_timestamp=startTimestamp,
        end_timestamp=endTimestamp,
        user_id=userId)


@public_app.get(
    '/{project_id}/assist-stats/top-members',
    tags=["assist-stats"],
    response_model=schemas.AssistStatsTopMembersResponse
)
def get_assist_stats_top_members(
        project_id: int,
        startTimestamp: int = None,
        endTimestamp: int = None,
        sortBy: str = "sessionsAssisted",
        sortOrder: str = "desc"):
    """Top agents ranked by sessions assisted or accumulated duration.

    Query-parameter typos from the patch ("sortyBy", "sortOder") fixed;
    these endpoints are introduced by this change, so no client depends on
    the misspelled names yet.
    """
    return assist_stats.get_top_members(
        project_id=project_id,
        start_timestamp=startTimestamp,
        end_timestamp=endTimestamp,
        sort_by=sortBy,
        sort_order=sortOrder
    )


@public_app.post(
    '/{project_id}/assist-stats/sessions',
    tags=["assist-stats"],
    response_model=schemas.AssistStatsSessionsResponse
)
def get_assist_stats_sessions(
        project_id: int,
        data: schemas.AssistStatsSessionsRequest = Body(...)):
    """Paginated per-session breakdown; filters/sort come validated from the
    request schema."""
    return assist_stats.get_sessions(
        project_id=project_id,
        data=data
    )
# --- api/schemas/schemas.py (additions) ---
# The patch also added the deprecated pydantic-v1 `validator` to the import
# line; it is unused (the file already uses v2 `field_validator`) — dropped.

class AssistStatsAverage(BaseModel):
    key: str = Field(...)
    avg: float = Field(...)
    chartData: List[dict] = Field(...)


class AssistStatsMember(BaseModel):
    name: str
    count: int


class AssistStatsSessionAgent(BaseModel):
    name: str
    id: int


class AssistStatsTopMembersResponse(BaseModel):
    total: int
    list: List[AssistStatsMember]


class AssistStatsSessionRecording(BaseModel):
    recordId: int = Field(...)
    name: str = Field(...)
    duration: int = Field(...)


class AssistStatsSession(BaseModel):
    sessionId: str = Field(...)
    timestamp: int = Field(...)
    teamMembers: List[AssistStatsSessionAgent] = Field(...)
    assistDuration: Optional[int] = Field(default=0)
    callDuration: Optional[int] = Field(default=0)
    controlDuration: Optional[int] = Field(default=0)


# Allowed ORDER BY columns for the sessions endpoint.
assist_sort_options = ["timestamp", "assist_duration", "call_duration", "control_duration"]


class AssistStatsSessionsRequest(BaseModel):
    # `enum=`/`choices=` are not pydantic Field parameters (silently ignored
    # in v1, rejected as unknown in v2) — removed; the field_validators below
    # are what actually enforce the value sets.
    startTimestamp: int = Field(...)
    endTimestamp: int = Field(...)
    limit: Optional[int] = Field(default=10)
    page: Optional[int] = Field(default=1)
    sort: Optional[str] = Field(default="timestamp")
    order: Optional[str] = Field(default="desc")
    userId: Optional[int] = Field(default=None)

    @field_validator("sort")
    def validate_sort(cls, v):
        if v not in assist_sort_options:
            raise ValueError(f"Invalid sort option. Allowed options: {', '.join(assist_sort_options)}")
        return v

    @field_validator("order")
    def validate_order(cls, v):
        if v not in ["desc", "asc"]:
            raise ValueError("Invalid order option. Must be 'desc' or 'asc'.")
        return v


class AssistStatsSessionsResponse(BaseModel):
    total: int = Field(...)
    page: int = Field(...)
    list: List[AssistStatsSession] = Field(default=[])


# --- scripts/schema/db/init_dbs/postgresql/1.15.0/1.15.0.sql (additions) ---
# timestamp was declared `integer` (int4): the code stores millisecond epochs
# (~1.7e12), which overflow int4 — must be bigint.  `duration` is read by every
# aggregation query but was missing from the table.  Indexes added for the
# range scans the module performs.
"""
CREATE TABLE IF NOT EXISTS assist_events
(
    event_id    varchar NOT NULL PRIMARY KEY,
    project_id  integer NOT NULL,
    session_id  varchar NOT NULL,
    event_type  varchar NOT NULL,
    event_state varchar NOT NULL,
    timestamp   bigint  NOT NULL,
    duration    bigint  NULL,
    user_id     varchar,
    agent_id    varchar
);
CREATE INDEX IF NOT EXISTS assist_events_project_id_timestamp_idx
    ON assist_events (project_id, timestamp);

CREATE TABLE IF NOT EXISTS assist_events_aggregates
(
    timestamp     BIGINT  not null,
    project_id    integer not null,
    agent_id      integer not null,
    assist_avg    BIGINT,
    call_avg      BIGINT,
    control_avg   BIGINT,
    assist_total  BIGINT,
    call_total    BIGINT,
    control_total BIGINT
);
CREATE INDEX IF NOT EXISTS assist_events_aggregates_project_id_timestamp_idx
    ON assist_events_aggregates (project_id, timestamp);

CREATE TABLE IF NOT EXISTS assist_events_aggregates_logs
(
    time BIGINT not null
);
"""