openreplay/ee/api/routers/scim.py
2025-05-30 14:18:48 +02:00

1041 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
import uuid
from typing import Optional
from decouple import config
from fastapi import Depends, HTTPException, Header, Query, Response, Request
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, Field
import schemas
from chalicelib.core import users, roles, tenants
from chalicelib.utils.scim_auth import auth_required, create_tokens, verify_refresh_token
from routers.base import get_routers
logger = logging.getLogger(__name__)
public_app, app, app_apikey = get_routers(prefix="/sso/scim/v2")
"""Authentication endpoints"""
class RefreshRequest(BaseModel):
refresh_token: str
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Login endpoint to generate tokens
@public_app.post("/token")
async def login(host: str = Header(..., alias="Host"), form_data: OAuth2PasswordRequestForm = Depends()):
subdomain = host.split(".")[0]
# Missing authentication part, to add
if form_data.username != config("SCIM_USER") or form_data.password != config("SCIM_PASSWORD"):
raise HTTPException(status_code=401, detail="Invalid credentials")
tenant = tenants.get_by_name(subdomain)
access_token, refresh_token = create_tokens(tenant_id=tenant["tenantId"])
return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
# Refresh token endpoint
@public_app.post("/refresh")
async def refresh_token(r: RefreshRequest):
payload = verify_refresh_token(r.refresh_token)
new_access_token, _ = create_tokens(tenant_id=payload["tenant_id"])
return {"access_token": new_access_token, "token_type": "bearer"}
RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS = {
"User": {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:ResourceType"],
"id": "User",
"name": "User",
"endpoint": "/Users",
"description": "User account",
"schema": "urn:ietf:params:scim:schemas:core:2.0:User",
}
}
def _not_found_error_response(resource_id: str):
return JSONResponse(
status_code=404,
content={
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"detail": f"Resource {resource_id} not found",
"status": "404",
}
)
@public_app.get("/ResourceTypes", dependencies=[Depends(auth_required)])
async def get_resource_types(r: Request):
return JSONResponse(
status_code=200,
content={
"totalResults": len(RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS),
"itemsPerPage": len(RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS),
"startIndex": 1,
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"Resources": sorted(
[
{
**value,
"meta": {
"location": str(r.url_for("get_resource_type", resource_id=value["id"])),
"resourceType": "ResourceType",
}
}
for value in RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS.values()
]
),
},
)
@public_app.get("/ResourceTypes/{resource_id}", dependencies=[Depends(auth_required)])
async def get_resource_type(r: Request, resource_id: str):
if resource_id not in RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS:
return _not_found_error_response(resource_id)
content = {
**RESOURCE_TYPE_IDS_TO_RESOURCE_TYPE_DETAILS[resource_id],
"meta": {
"location": str(r.url),
"resourceType": "ResourceType",
}
}
return JSONResponse(
status_code=200,
content=content,
)
SCHEMA_IDS_TO_SCHEMA_DETAILS = {
"urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig": {
"id": "urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig",
"name": "ServiceProviderConfig",
"description": "Defines the configuration options for the SCIM service provider.",
"attributes": [
{
"name": "documentationUri",
"type": "string",
"multiValued": False,
"description": "The base or canonical URL of the service provider's documentation.",
"required": False,
"caseExact": False,
"mutability": "readWrite",
"returned": "default",
"uniqueness": "none"
},
{
"name": "patch",
"type": "complex",
"multiValued": False,
"description": "A complex attribute indicating the service provider's support for PATCH requests.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether PATCH is supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "bulk",
"type": "complex",
"multiValued": False,
"description": "A complex attribute that indicates the service provider's support for bulk operations.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether bulk operations are supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "maxOperations",
"type": "integer",
"multiValued": False,
"description": "The maximum number of operations that can be performed in a bulk request.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "maxPayloadSize",
"type": "integer",
"multiValued": False,
"description": "The maximum payload size in bytes for a bulk request.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "filter",
"type": "complex",
"multiValued": False,
"description": "A complex attribute that indicates the service provider's support for filtering.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether filtering is supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "maxResults",
"type": "integer",
"multiValued": False,
"description": "The maximum number of resources returned in a search response.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "changePassword",
"type": "complex",
"multiValued": False,
"description": "A complex attribute that indicates the service provider's support for change password requests.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether the change password operation is supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "sort",
"type": "complex",
"multiValued": False,
"description": "A complex attribute that indicates the service provider's support for sorting.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether sorting is supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "etag",
"type": "complex",
"multiValued": False,
"description": "A complex attribute that indicates the service provider's support for ETag in responses.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "supported",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether ETag is supported.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
},
{
"name": "authenticationSchemes",
"type": "complex",
"multiValued": True,
"description": "A multi-valued complex attribute that defines the authentication schemes supported by the service provider.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "type",
"type": "string",
"multiValued": False,
"description": "The type of the authentication scheme.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "The common name of the authentication scheme.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": "A description of the authentication scheme.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "specUri",
"type": "string",
"multiValued": False,
"description": "A URI that points to the authentication scheme's specification.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "documentationUri",
"type": "string",
"multiValued": False,
"description": "A URI that points to the documentation for this scheme.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
}
]
}
]
},
"urn:ietf:params:scim:schemas:core:2.0:ResourceType": {
"id": "urn:ietf:params:scim:schemas:core:2.0:ResourceType",
"name": "ResourceType",
"description": "Represents the configuration of a SCIM resource type.",
"attributes": [
{
"name": "id",
"type": "string",
"multiValued": False,
"description": "The resource type's unique identifier.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "global"
},
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "The resource type's humanreadable name.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": "A brief description of the resource type.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "endpoint",
"type": "string",
"multiValued": False,
"description": "The HTTP endpoint where the resource type is exposed.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "schema",
"type": "string",
"multiValued": False,
"description": "The primary schema URN for the resource type.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "schemaExtensions",
"type": "complex",
"multiValued": True,
"description": "A list of schema extensions for the resource type.",
"required": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "schema",
"type": "string",
"multiValued": False,
"description": "The URN of the extension schema.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "required",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value indicating whether the extension is required.",
"required": True,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
}
]
}
]
},
"urn:ietf:params:scim:schemas:core:2.0:Schema": {
"id": "urn:ietf:params:scim:schemas:core:2.0:Schema",
"name": "Schema",
"description": "Defines the attributes and metadata for a SCIM resource.",
"attributes": [
{
"name": "id",
"type": "string",
"multiValued": False,
"description": "The unique identifier of the schema.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "global"
},
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "The humanreadable name of the schema.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "description",
"type": "string",
"multiValued": False,
"description": "A description of the schema.",
"required": False,
"caseExact": False,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none"
},
{
"name": "attributes",
"type": "complex",
"multiValued": True,
"description": "The list of attributes defined by the schema.",
"required": True,
"mutability": "readOnly",
"returned": "default",
"uniqueness": "none",
"subAttributes": [
{
"name": "name",
"type": "string",
"multiValued": False,
"description": "Attribute name.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "type",
"type": "string",
"multiValued": False,
"description": "Data type of the attribute.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "multiValued",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether the attribute is multi-valued.",
"required": True,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "required",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether the attribute is required.",
"required": True,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "caseExact",
"type": "boolean",
"multiValued": False,
"description": "A Boolean value that indicates whether string comparisons are case sensitive.",
"required": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "mutability",
"type": "string",
"multiValued": False,
"description": "Defines whether the attribute is readOnly, readWrite, immutable, or writeOnly.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "returned",
"type": "string",
"multiValued": False,
"description": "Specifies when the attribute is returned in a response.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
},
{
"name": "uniqueness",
"type": "string",
"multiValued": False,
"description": "Indicates whether the attribute must be unique.",
"required": True,
"caseExact": False,
"mutability": "readOnly",
"returned": "always",
"uniqueness": "none"
}
]
}
]
}
}
@public_app.get("/Schemas", dependencies=[Depends(auth_required)])
async def get_schemas():
return JSONResponse(
status_code=200,
content={
"totalResults": len(SCHEMA_IDS_TO_SCHEMA_DETAILS),
"itemsPerPage": len(SCHEMA_IDS_TO_SCHEMA_DETAILS),
"startIndex": 1,
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"Resources": [
value
for _, value in sorted(SCHEMA_IDS_TO_SCHEMA_DETAILS.items())
]
},
)
@public_app.get("/Schemas/{schema_id}", dependencies=[Depends(auth_required)])
async def get_schema(schema_id: str):
if schema_id not in SCHEMA_IDS_TO_SCHEMA_DETAILS:
return _not_found_error_response(schema_id)
return JSONResponse(
status_code=200,
content=SCHEMA_IDS_TO_SCHEMA_DETAILS[schema_id],
)
"""
User endpoints
"""
class Name(BaseModel):
givenName: str
familyName: str
class Email(BaseModel):
primary: bool
value: str
type: str
class UserRequest(BaseModel):
schemas: list[str]
userName: str
name: Name
emails: list[Email]
displayName: str
locale: str
externalId: str
groups: list[dict]
password: str = Field(default=None)
active: bool
class UserResponse(BaseModel):
schemas: list[str]
id: str
userName: str
name: Name
emails: list[Email] # ignore for now
displayName: str
locale: str
externalId: str
active: bool
groups: list[dict]
meta: dict = Field(default={"resourceType": "User"})
class PatchUserRequest(BaseModel):
schemas: list[str]
Operations: list[dict]
@public_app.get("/Users", dependencies=[Depends(auth_required)])
async def get_users(
start_index: int = Query(1, alias="startIndex"),
count: Optional[int] = Query(None, alias="count"),
email: Optional[str] = Query(None, alias="filter"),
):
"""Get SCIM Users"""
if email:
email = email.split(" ")[2].strip('"')
result_users = users.get_users_paginated(start_index, count, email)
serialized_users = []
for user in result_users:
serialized_users.append(
UserResponse(
schemas = ["urn:ietf:params:scim:schemas:core:2.0:User"],
id = user["data"]["userId"],
userName = user["email"],
name = Name.model_validate(user["data"]["name"]),
emails = [Email.model_validate(user["data"]["emails"])],
displayName = user["name"],
locale = user["data"]["locale"],
externalId = user["internalId"],
active = True, # ignore for now, since, can't insert actual timestamp
groups = [], # ignore
).model_dump(mode='json')
)
return JSONResponse(
status_code=200,
content={
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"totalResults": len(serialized_users),
"startIndex": start_index,
"itemsPerPage": len(serialized_users),
"Resources": serialized_users,
},
)
@public_app.get("/Users/{user_id}", dependencies=[Depends(auth_required)])
def get_user(user_id: str):
"""Get SCIM User"""
tenant_id = 1
user = users.get_by_uuid(user_id, tenant_id)
if not user:
return JSONResponse(
status_code=404,
content={
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"detail": "User not found",
"status": 404,
}
)
res = UserResponse(
schemas = ["urn:ietf:params:scim:schemas:core:2.0:User"],
id = user["data"]["userId"],
userName = user["email"],
name = Name.model_validate(user["data"]["name"]),
emails = [Email.model_validate(user["data"]["emails"])],
displayName = user["name"],
locale = user["data"]["locale"],
externalId = user["internalId"],
active = True, # ignore for now, since, can't insert actual timestamp
groups = [], # ignore
)
return JSONResponse(status_code=201, content=res.model_dump(mode='json'))
@public_app.post("/Users", dependencies=[Depends(auth_required)])
async def create_user(r: UserRequest):
"""Create SCIM User"""
tenant_id = 1
existing_user = users.get_by_email_only(r.userName)
deleted_user = users.get_deleted_user_by_email(r.userName)
if existing_user:
return JSONResponse(
status_code = 409,
content = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"detail": "User already exists in the database.",
"status": 409,
}
)
elif deleted_user:
user_id = users.get_deleted_by_uuid(deleted_user["data"]["userId"], tenant_id)
user = users.restore_scim_user(user_id=user_id["userId"], tenant_id=tenant_id, user_uuid=uuid.uuid4().hex, email=r.emails[0].value, admin=False,
display_name=r.displayName, full_name=r.name.model_dump(mode='json'), emails=r.emails[0].model_dump(mode='json'),
origin="okta", locale=r.locale, role_id=None, internal_id=r.externalId)
else:
try:
user = users.create_scim_user(tenant_id=tenant_id, user_uuid=uuid.uuid4().hex, email=r.emails[0].value, admin=False,
display_name=r.displayName, full_name=r.name.model_dump(mode='json'), emails=r.emails[0].model_dump(mode='json'),
origin="okta", locale=r.locale, role_id=None, internal_id=r.externalId)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
res = UserResponse(
schemas = ["urn:ietf:params:scim:schemas:core:2.0:User"],
id = user["data"]["userId"],
userName = r.userName,
name = r.name,
emails = r.emails,
displayName = r.displayName,
locale = r.locale,
externalId = r.externalId,
active = r.active, # ignore for now, since, can't insert actual timestamp
groups = [], # ignore
)
return JSONResponse(status_code=201, content=res.model_dump(mode='json'))
@public_app.put("/Users/{user_id}", dependencies=[Depends(auth_required)])
def update_user(user_id: str, r: UserRequest):
"""Update SCIM User"""
tenant_id = 1
user = users.get_by_uuid(user_id, tenant_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
changes = r.model_dump(mode='json', exclude={"schemas", "emails", "name", "locale", "groups", "password", "active"}) # some of these should be added later if necessary
nested_changes = r.model_dump(mode='json', include={"name", "emails"})
mapping = {"userName": "email", "displayName": "name", "externalId": "internal_id"} # mapping between scim schema field names and local database model, can be done as config?
for k, v in mapping.items():
if k in changes:
changes[v] = changes.pop(k)
changes["data"] = {}
for k, v in nested_changes.items():
value_to_insert = v[0] if k == "emails" else v
changes["data"][k] = value_to_insert
try:
users.update(tenant_id, user["userId"], changes)
res = UserResponse(
schemas = ["urn:ietf:params:scim:schemas:core:2.0:User"],
id = user["data"]["userId"],
userName = r.userName,
name = r.name,
emails = r.emails,
displayName = r.displayName,
locale = r.locale,
externalId = r.externalId,
active = r.active, # ignore for now, since, can't insert actual timestamp
groups = [], # ignore
)
return JSONResponse(status_code=201, content=res.model_dump(mode='json'))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@public_app.patch("/Users/{user_id}", dependencies=[Depends(auth_required)])
def deactivate_user(user_id: str, r: PatchUserRequest):
"""Deactivate user, soft-delete"""
tenant_id = 1
active = r.model_dump(mode='json')["Operations"][0]["value"]["active"]
if active:
raise HTTPException(status_code=404, detail="Activating user is not supported")
user = users.get_by_uuid(user_id, tenant_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
users.delete_member_as_admin(tenant_id, user["userId"])
return Response(status_code=204, content="")
@public_app.delete("/Users/{user_uuid}", dependencies=[Depends(auth_required)])
def delete_user(user_uuid: str):
"""Delete user from database, hard-delete"""
tenant_id = 1
user = users.get_by_uuid(user_uuid, tenant_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
users.__hard_delete_user_uuid(user_uuid)
return Response(status_code=204, content="")
"""
Group endpoints
"""
class Operation(BaseModel):
op: str
path: str = Field(default=None)
value: list[dict] | dict = Field(default=None)
class GroupGetResponse(BaseModel):
schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:ListResponse"])
totalResults: int
startIndex: int
itemsPerPage: int
resources: list = Field(alias="Resources")
class GroupRequest(BaseModel):
schemas: list[str] = Field(default=["urn:ietf:params:scim:schemas:core:2.0:Group"])
displayName: str = Field(default=None)
members: list = Field(default=None)
operations: list[Operation] = Field(default=None, alias="Operations")
class GroupPatchRequest(BaseModel):
schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:PatchOp"])
operations: list[Operation] = Field(alias="Operations")
class GroupResponse(BaseModel):
schemas: list[str] = Field(default=["urn:ietf:params:scim:schemas:core:2.0:Group"])
id: str
displayName: str
members: list
meta: dict = Field(default={"resourceType": "Group"})
@public_app.get("/Groups", dependencies=[Depends(auth_required)])
def get_groups(
start_index: int = Query(1, alias="startIndex"),
count: Optional[int] = Query(None, alias="count"),
group_name: Optional[str] = Query(None, alias="filter"),
):
"""Get groups"""
tenant_id = 1
res = []
if group_name:
group_name = group_name.split(" ")[2].strip('"')
groups = roles.get_roles_with_uuid_paginated(tenant_id, start_index, count, group_name)
res = [{
"id": group["data"]["groupId"],
"meta": {
"created": group["createdAt"],
"lastModified": "", # not currently a field
"version": "v1.0"
},
"displayName": group["name"]
} for group in groups
]
return JSONResponse(
status_code=200,
content=GroupGetResponse(
totalResults=len(groups),
startIndex=start_index,
itemsPerPage=len(groups),
Resources=res
).model_dump(mode='json'))
@public_app.get("/Groups/{group_id}", dependencies=[Depends(auth_required)])
def get_group(group_id: str):
"""Get a group by id"""
tenant_id = 1
group = roles.get_role_by_group_id(tenant_id, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
members = roles.get_users_by_group_uuid(tenant_id, group["data"]["groupId"])
members = [{"value": member["data"]["userId"], "display": member["name"]} for member in members]
return JSONResponse(
status_code=200,
content=GroupResponse(
id=group["data"]["groupId"],
displayName=group["name"],
members=members,
).model_dump(mode='json'))
@public_app.post("/Groups", dependencies=[Depends(auth_required)])
def create_group(r: GroupRequest):
"""Create a group"""
tenant_id = 1
member_role = roles.get_member_permissions(tenant_id)
try:
data = schemas.RolePayloadSchema(name=r.displayName, permissions=member_role["permissions"]) # permissions by default are same as for member role
group = roles.create_as_admin(tenant_id, uuid.uuid4().hex, data)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
added_members = []
for member in r.members:
user = users.get_by_uuid(member["value"], tenant_id)
if user:
users.update(tenant_id, user["userId"], {"role_id": group["roleId"]})
added_members.append({
"value": user["data"]["userId"],
"display": user["name"]
})
return JSONResponse(
status_code=200,
content=GroupResponse(
id=group["data"]["groupId"],
displayName=group["name"],
members=added_members,
).model_dump(mode='json'))
@public_app.put("/Groups/{group_id}", dependencies=[Depends(auth_required)])
def update_put_group(group_id: str, r: GroupRequest):
"""Update a group or members of the group (not used by anything yet)"""
tenant_id = 1
group = roles.get_role_by_group_id(tenant_id, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
if r.operations and r.operations[0].op == "replace" and r.operations[0].path is None:
roles.update_group_name(tenant_id, group["data"]["groupId"], r.operations[0].value["displayName"])
return Response(status_code=200, content="")
members = r.members
modified_members = []
for member in members:
user = users.get_by_uuid(member["value"], tenant_id)
if user:
users.update(tenant_id, user["userId"], {"role_id": group["roleId"]})
modified_members.append({
"value": user["data"]["userId"],
"display": user["name"]
})
return JSONResponse(
status_code=200,
content=GroupResponse(
id=group_id,
displayName=group["name"],
members=modified_members,
).model_dump(mode='json'))
@public_app.patch("/Groups/{group_id}", dependencies=[Depends(auth_required)])
def update_patch_group(group_id: str, r: GroupPatchRequest):
"""Update a group or members of the group, used by AIW"""
tenant_id = 1
group = roles.get_role_by_group_id(tenant_id, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
if r.operations[0].op == "replace" and r.operations[0].path is None:
roles.update_group_name(tenant_id, group["data"]["groupId"], r.operations[0].value["displayName"])
return Response(status_code=200, content="")
modified_members = []
for op in r.operations:
if op.op == "add" or op.op == "replace":
# Both methods work as "replace"
for u in op.value:
user = users.get_by_uuid(u["value"], tenant_id)
if user:
users.update(tenant_id, user["userId"], {"role_id": group["roleId"]})
modified_members.append({
"value": user["data"]["userId"],
"display": user["name"]
})
elif op.op == "remove":
user_id = re.search(r'\[value eq \"([a-f0-9]+)\"\]', op.path).group(1)
roles.remove_group_membership(tenant_id, group_id, user_id)
return JSONResponse(
status_code=200,
content=GroupResponse(
id=group_id,
displayName=group["name"],
members=modified_members,
).model_dump(mode='json'))
@public_app.delete("/Groups/{group_id}", dependencies=[Depends(auth_required)])
def delete_group(group_id: str):
"""Delete a group, hard-delete"""
# possibly need to set the user's roles to default member role, instead of null
tenant_id = 1
group = roles.get_role_by_group_id(tenant_id, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
roles.delete_scim_group(tenant_id, group["data"]["groupId"])
return Response(status_code=200, content="")