diff --git a/api/app.py b/api/app.py index 34b78a2ca..4f9c92d72 100644 --- a/api/app.py +++ b/api/app.py @@ -10,7 +10,9 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from psycopg import AsyncConnection from starlette.responses import StreamingResponse +from psycopg.rows import dict_row +import orpy from chalicelib.utils import helper from chalicelib.utils import pg_client from crons import core_crons, core_dynamic_crons @@ -20,8 +22,6 @@ from routers.subs import insights, metrics, v1_api, health, usability_tests loglevel = config("LOGLEVEL", default=logging.WARNING) print(f">Loglevel set to: {loglevel}") logging.basicConfig(level=loglevel) -import orpy -from psycopg.rows import dict_row class ORPYAsyncConnection(AsyncConnection): diff --git a/api/chalicelib/core/metadata.py b/api/chalicelib/core/metadata.py index e1f00a88e..909832b56 100644 --- a/api/chalicelib/core/metadata.py +++ b/api/chalicelib/core/metadata.py @@ -284,8 +284,8 @@ def get_keys_by_projects(project_ids): # return {"data": get(project_id)} -def get_remaining_metadata_with_count(tenant_id): - all_projects = projects.get_projects(tenant_id=tenant_id) +async def get_remaining_metadata_with_count(tenant_id): + all_projects = await projects.get_projects(tenant_id=tenant_id) results = [] used_metas = get_batch([p["projectId"] for p in all_projects]) for p in all_projects: diff --git a/api/chalicelib/core/projects.py b/api/chalicelib/core/projects.py index 42877cf8b..656085314 100644 --- a/api/chalicelib/core/projects.py +++ b/api/chalicelib/core/projects.py @@ -7,7 +7,7 @@ import schemas from chalicelib.core import users from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC - +import orpy def __exists_by_name(name: str, exclude_id: Optional[int]) -> bool: with pg_client.PostgresClient() as cur: @@ -52,8 +52,9 @@ def __create(tenant_id, data): return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True) -def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): - with pg_client.PostgresClient() as cur: +async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): + + async def _get_projects(cnx): extra_projection = "" if gdpr: extra_projection += ',s.gdpr' @@ -67,16 +68,15 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): AND sessions.start_ts <= %(now)s )) AS first_recorded""" - query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} + query = f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at, s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform {extra_projection} FROM public.projects AS s WHERE s.deleted_at IS NULL - ORDER BY s.name {") AS raw" if recorded else ""};""", - {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4}) - cur.execute(query) - rows = cur.fetchall() + ORDER BY s.name {") AS raw" if recorded else ""};""" + rows = await cnx.execute(query, {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4}) + rows = await rows.fetchall() # if recorded is requested, check if it was saved or computed if recorded: u_values = [] @@ -94,11 +94,10 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): r.pop("first_recorded") r.pop("sessions_last_check_at") if len(u_values) > 0: - query = cur.mogrify(f"""UPDATE public.projects + await cnx.execute(f"""UPDATE public.projects SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded) WHERE projects.project_id=u.project_id;""", params) - cur.execute(query) else: for r in rows: r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"]) @@ -106,13 +105,18 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False): return helper.list_to_camel_case(rows) + async with orpy.get().database.connection() as cnx: + with cnx.transaction(): + out = await _get_projects(cnx) + return out + def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None): with pg_client.PostgresClient() as cur: extra_select = "" if include_last_session: - extra_select += """,(SELECT max(ss.start_ts) - FROM public.sessions AS ss + extra_select += """,(SELECT max(ss.start_ts) + FROM public.sessions AS ss WHERE ss.project_id = %(project_id)s) AS last_recorded_session_at""" if include_gdpr: extra_select += ",s.gdpr" diff --git a/api/chalicelib/core/sessions.py b/api/chalicelib/core/sessions.py index 2a16c3cc3..f4fad4701 100644 --- a/api/chalicelib/core/sessions.py +++ b/api/chalicelib/core/sessions.py @@ -1093,12 +1093,12 @@ def search_query_parts(data: schemas.SessionsSearchPayloadSchema, error_status, return full_args, query_part -def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): +async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): if project_id is None: - all_projects = projects.get_projects(tenant_id=tenant_id) + all_projects = await projects.get_projects(tenant_id=tenant_id) else: all_projects = [ - projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, include_gdpr=False)] all_projects = {int(p["projectId"]): p["name"] for p in all_projects} diff --git a/api/chalicelib/core/users.py b/api/chalicelib/core/users.py index 9dfbbbf14..a774f0ed8 100644 --- a/api/chalicelib/core/users.py +++ b/api/chalicelib/core/users.py @@ -444,7 +444,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password): } -def set_password_invitation(user_id, new_password): +async def set_password_invitation(user_id, new_password): changes = {"password": new_password, "invitationToken": None, "invitedAt": None, "changePwdExpireAt": None, "changePwdToken": None} @@ -455,11 +455,11 @@ def set_password_invitation(user_id, new_password): r["limits"] = { "teamMember": -1, "projects": -1, - "metadata": metadata.get_remaining_metadata_with_count(tenant_id)} + "metadata": await metadata.get_remaining_metadata_with_count(tenant_id)} c = tenants.get_by_tenant_id(tenant_id) c.pop("createdAt") - c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True) + c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True) c["smtp"] = smtp.has_smtp() c["iceServers"] = assist.get_ice_servers() return { diff --git a/api/routers/core_dynamic.py b/api/routers/core_dynamic.py index dfadd05c7..c8e10952a 100644 --- a/api/routers/core_dynamic.py +++ b/api/routers/core_dynamic.py @@ -175,7 +175,7 @@ def process_invitation_link(token: str): @public_app.post('/password/reset', tags=["users"]) -def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): +async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8: return {"errors": ["please provide a valid invitation & pass"]} user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase) @@ -184,7 +184,7 @@ def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = if user["expiredChange"]: return {"errors": ["expired change, please re-use the invitation link"]} - return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"]) + return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"]) @app.put('/client/members/{memberId}', tags=["client"], dependencies=[OR_role("owner", "admin")]) @@ -195,7 +195,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema, @app.get('/metadata/session_search', tags=["metadata"]) -def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, +async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, context: schemas.CurrentContext = Depends(OR_context)): if key is None or value is None or len(value) == 0 and len(key) == 0: return {"errors": ["please provide a key&value for search"]} @@ -204,13 +204,13 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = if len(key) == 0: return {"errors": ["please provide a key for search"]} return { - "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, + "data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, m_key=key, project_id=projectId)} @app.get('/projects', tags=['projects']) -def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)} +async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True)} # for backward compatibility diff --git a/api/routers/subs/health.py b/api/routers/subs/health.py index 245f039c7..9ef4f8545 100644 --- a/api/routers/subs/health.py +++ b/api/routers/subs/health.py @@ -19,5 +19,4 @@ if not tenants.tenants_exists_sync(use_pool=False): async def get_public_health_status(): if await tenants.tenants_exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found") - return {"data": health.get_health()} diff --git a/api/routers/subs/v1_api.py b/api/routers/subs/v1_api.py index b4e9c2aaa..6264f9c39 100644 --- a/api/routers/subs/v1_api.py +++ b/api/routers/subs/v1_api.py @@ -73,8 +73,8 @@ def cancel_job(projectKey: str, jobId: int, _=Body(None), context: schemas.Curre @app_apikey.get('/v1/projects', tags=["api"]) -def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - records = projects.get_projects(tenant_id=context.tenant_id) +async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + records = await projects.get_projects(tenant_id=context.tenant_id) for record in records: del record['projectId'] diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index 82825815c..f64a70f2a 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -372,6 +372,78 @@ func (e *Router) featureFlagsHandlerWeb(w http.ResponseWriter, r *http.Request) ResponseWithJSON(w, resp, startTime, r.URL.Path, bodySize) } +type ScreenshotMessage struct { + Name string + Data []byte +} + +func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + + sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) + if err != nil { // Should accept expired token? + ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0) + return + } + + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0) + return + } + r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit) + defer r.Body.Close() + + // Parse the multipart form + err = r.ParseMultipartForm(10 << 20) // Max upload size 10 MB + if err == http.ErrNotMultipart || err == http.ErrMissingBoundary { + ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0) + return + } else if err != nil { + ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging + return + } + + // Iterate over uploaded files + for _, fileHeaderList := range r.MultipartForm.File { + for _, fileHeader := range fileHeaderList { + file, err := fileHeader.Open() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Read the file content + fileBytes, err := ioutil.ReadAll(file) + if err != nil { + file.Close() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + file.Close() + + fileName := util.SafeString(fileHeader.Filename) + log.Printf("fileName: %s, fileSize: %d", fileName, len(fileBytes)) + + // Create a message to send to Kafka + msg := ScreenshotMessage{ + Name: fileName, + Data: fileBytes, + } + data, err := json.Marshal(&msg) + if err != nil { + log.Printf("can't marshal screenshot message, err: %s", err) + continue + } + + // Send the message to queue + if err := e.services.Producer.Produce(e.cfg.TopicCanvasImages, sessionData.ID, data); err != nil { + log.Printf("failed to produce canvas image message: %v", err) + } + } + } + ResponseOK(w, startTime, r.URL.Path, 0) +} + func (e *Router) getUXTestInfo(w http.ResponseWriter, r *http.Request) { startTime := time.Now() bodySize := 0 @@ -501,6 +573,7 @@ func (e *Router) getUXUploadUrl(w http.ResponseWriter, r *http.Request) { URL string `json:"url"` } ResponseWithJSON(w, &UrlResponse{URL: url}, startTime, r.URL.Path, bodySize) + } type ScreenshotMessage struct { diff --git a/ee/api/chalicelib/core/projects.py b/ee/api/chalicelib/core/projects.py index 3d33160bd..d38244c0b 100644 --- a/ee/api/chalicelib/core/projects.py +++ b/ee/api/chalicelib/core/projects.py @@ -7,7 +7,7 @@ import schemas from chalicelib.core import users from chalicelib.utils import pg_client, helper from chalicelib.utils.TimeUTC import TimeUTC - +import orpy def __exists_by_name(tenant_id: int, name: str, exclude_id: Optional[int]) -> bool: with pg_client.PostgresClient() as cur: @@ -53,8 +53,9 @@ def __create(tenant_id, data): return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True) -def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None): - with pg_client.PostgresClient() as cur: +async def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, user_id: int = None): + + async def _get_projects(cnx): role_query = """INNER JOIN LATERAL (SELECT 1 FROM users INNER JOIN roles USING (role_id) @@ -77,7 +78,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use AND sessions.start_ts <= %(now)s )) AS first_recorded""" - query = cur.mogrify(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} + rows = await cnx.execute(f"""{"SELECT *, first_recorded IS NOT NULL AS recorded FROM (" if recorded else ""} SELECT s.project_id, s.name, s.project_key, s.save_request_payloads, s.first_recorded_session_at, s.created_at, s.sessions_last_check_at, s.sample_rate, s.platform {extra_projection} @@ -88,8 +89,7 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use ORDER BY s.name {") AS raw" if recorded else ""};""", {"now": TimeUTC.now(), "check_delta": TimeUTC.MS_HOUR * 4, "tenant_id": tenant_id, "user_id": user_id}) - cur.execute(query) - rows = cur.fetchall() + rows = await rows.fetchall() # if recorded is requested, check if it was saved or computed if recorded: u_values = [] @@ -107,11 +107,10 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use r.pop("first_recorded") r.pop("sessions_last_check_at") if len(u_values) > 0: - query = cur.mogrify(f"""UPDATE public.projects + cnx.execute(f"""UPDATE public.projects SET sessions_last_check_at=(now() at time zone 'utc'), first_recorded_session_at=u.first_recorded FROM (VALUES {",".join(u_values)}) AS u(project_id,first_recorded) WHERE projects.project_id=u.project_id;""", params) - cur.execute(query) else: for r in rows: r["created_at"] = TimeUTC.datetime_to_timestamp(r["created_at"]) @@ -119,6 +118,11 @@ def get_projects(tenant_id: int, gdpr: bool = False, recorded: bool = False, use return helper.list_to_camel_case(rows) + async with orpy.get().database.connection() as cnx: + async with cnx.transaction(): + out = await _get_projects(cnx) + return out + def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None): with pg_client.PostgresClient() as cur: diff --git a/ee/api/chalicelib/core/sessions_exp.py b/ee/api/chalicelib/core/sessions_exp.py index 9f8876312..eee2c41fa 100644 --- a/ee/api/chalicelib/core/sessions_exp.py +++ b/ee/api/chalicelib/core/sessions_exp.py @@ -1497,12 +1497,12 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu return full_args, query_part -def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): +async def search_by_metadata(tenant_id, user_id, m_key, m_value, project_id=None): if project_id is None: - all_projects = projects.get_projects(tenant_id=tenant_id) + all_projects = await projects.get_projects(tenant_id=tenant_id) else: all_projects = [ - projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, + await projects.get_project(tenant_id=tenant_id, project_id=int(project_id), include_last_session=False, include_gdpr=False)] all_projects = {int(p["projectId"]): p["name"] for p in all_projects} diff --git a/ee/api/chalicelib/core/users.py b/ee/api/chalicelib/core/users.py index 06dfb485f..22bf2b2b9 100644 --- a/ee/api/chalicelib/core/users.py +++ b/ee/api/chalicelib/core/users.py @@ -523,7 +523,7 @@ def change_password(tenant_id, user_id, email, old_password, new_password): } -def set_password_invitation(tenant_id, user_id, new_password): +async def set_password_invitation(tenant_id, user_id, new_password): changes = {"password": new_password, "invitationToken": None, "invitedAt": None, "changePwdExpireAt": None, "changePwdToken": None} @@ -534,11 +534,11 @@ def set_password_invitation(tenant_id, user_id, new_password): r["limits"] = { "teamMember": -1, "projects": -1, - "metadata": metadata.get_remaining_metadata_with_count(tenant_id)} + "metadata": await metadata.get_remaining_metadata_with_count(tenant_id)} c = tenants.get_by_tenant_id(tenant_id) c.pop("createdAt") - c["projects"] = projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id) + c["projects"] = await projects.get_projects(tenant_id=tenant_id, recorded=True, user_id=user_id) c["smtp"] = smtp.has_smtp() c["iceServers"] = assist.get_ice_servers() return { diff --git a/ee/api/routers/core_dynamic.py b/ee/api/routers/core_dynamic.py index bc51a803d..57f62150b 100644 --- a/ee/api/routers/core_dynamic.py +++ b/ee/api/routers/core_dynamic.py @@ -38,7 +38,7 @@ async def get_all_signup(): if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exists_sync(use_pool=False): @public_app.post('/signup', tags=['signup']) @public_app.put('/signup', tags=['signup']) - async def signup_handler(data: schemas.UserSignupSchema = Body(...)): + async def signup_handler(data: schemas.UserSignupSchema): content = await signup.create_tenant(data) if "errors" in content: return content @@ -51,7 +51,7 @@ if config("MULTI_TENANTS", cast=bool, default=False) or not tenants.tenants_exis @public_app.post('/login', tags=["authentication"]) -def login_user(response: JSONResponse, data: schemas.UserLoginSchema = Body(...)): +def login_user(response: JSONResponse, data: schemas.UserLoginSchema): if helper.allow_captcha() and not captcha.is_valid(data.g_recaptcha_response): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -182,7 +182,7 @@ async def process_invitation_link(token: str, request: Request): @public_app.post('/password/reset', tags=["users"]) -def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): +async def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = Body(...)): if data is None or len(data.invitation) < 64 or len(data.passphrase) < 8: return {"errors": ["please provide a valid invitation & pass"]} user = users.get_by_invitation_token(token=data.invitation, pass_token=data.passphrase) @@ -191,7 +191,7 @@ def change_password_by_invitation(data: schemas.EditPasswordByInvitationSchema = if user["expiredChange"]: return {"errors": ["expired change, please re-use the invitation link"]} - return users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"], + return await users.set_password_invitation(new_password=data.password.get_secret_value(), user_id=user["userId"], tenant_id=user["tenantId"]) @@ -203,7 +203,7 @@ def edit_member(memberId: int, data: schemas.EditMemberSchema, @app.get('/metadata/session_search', tags=["metadata"]) -def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, +async def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = None, context: schemas.CurrentContext = Depends(OR_context)): if key is None or value is None or len(value) == 0 and len(key) == 0: return {"errors": ["please provide a key&value for search"]} @@ -216,13 +216,13 @@ def search_sessions_by_metadata(key: str, value: str, projectId: Optional[int] = if len(key) == 0: return {"errors": ["please provide a key for search"]} return { - "data": sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, + "data": await sessions.search_by_metadata(tenant_id=context.tenant_id, user_id=context.user_id, m_value=value, m_key=key, project_id=projectId)} @app.get('/projects', tags=['projects']) -def get_projects(context: schemas.CurrentContext = Depends(OR_context)): - return {"data": projects.get_projects(tenant_id=context.tenant_id, gdpr=True, +async def get_projects(context: schemas.CurrentContext = Depends(OR_context)): + return {"data": await projects.get_projects(tenant_id=context.tenant_id, gdpr=True, recorded=True, user_id=context.user_id)}