diff --git a/assist/servers/websocket.js b/assist/servers/websocket.js index 79956e646..698f8be4a 100644 --- a/assist/servers/websocket.js +++ b/assist/servers/websocket.js @@ -1,126 +1,47 @@ -const _io = require('socket.io'); const express = require('express'); const { - extractRoomId, extractPeerId, - extractProjectKeyFromRequest, - extractSessionIdFromRequest, hasFilters, isValidSession, extractPayloadFromRequest, sortPaginate, - getValidAttributes, - uniqueAutocomplete, getAvailableRooms, - getCompressionConfig } = require('../utils/helper'); const { IDENTITIES, - EVENTS_DEFINITION, - extractSessionInfo, socketConnexionTimeout, - errorHandler, authorizer } = require('../utils/assistHelper'); const { - startAssist, - endAssist, - handleEvent -} = require('../utils/stats'); + onConnect +} = require('../utils/socketHandlers'); +const { + createSocketIOServer +} = require('../utils/wsServer'); +const { + respond, + socketsList, + socketsListByProject, + socketsLiveByProject, + autocomplete +} = require('../utils/httpHandlers'); + +let io; const wsRouter = express.Router(); -let io; -const debug = process.env.debug === "1"; - -const createSocketIOServer = function (server, prefix) { - io = _io(server, { - maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: (prefix ? prefix : '') + '/socket', - ...getCompressionConfig() - }); -} - -const respond = function (res, data) { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify({"data": data})); -} - -const socketsList = async function (req, res) { - debug && console.log("[WS]looking for all available sessions"); - let filters = await extractPayloadFromRequest(req); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey !== undefined) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } else { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(res, liveSessions); -} - -const socketsListByProject = async function (req, res) { - debug && console.log("[WS]looking for available sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - if (withFilters) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions.add(sessionId); - } - } - } else { - liveSessions.add(sessionId); - } - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) - : sessions.length > 0 ? sessions[0] - : null); -} +const debug_log = process.env.debug === "1"; const socketsLive = async function (req, res) { - debug && console.log("[WS]looking for all available LIVE sessions"); - let filters = await extractPayloadFromRequest(req); + debug_log && console.log("[WS]looking for all available LIVE sessions"); + let filters = await extractPayloadFromRequest(req, res); let withFilters = hasFilters(filters); let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); if (projectKey !== undefined) { - let connected_sockets = await io.in(peerId).fetchSockets(); + let connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); @@ -142,114 +63,6 @@ const socketsLive = async function (req, res) { respond(res, sortPaginate(liveSessions, filters)); } -const socketsLiveByProject = async function (req, res) { - debug && console.log("[WS]looking for available LIVE sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); -} - -const autocomplete = async function (req, res) { - debug && console.log("[WS]autocomplete"); - let _projectKey = extractProjectKeyFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let results = []; - if (filters.query && Object.keys(filters.query).length > 0) { - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); - if (projectKey === _projectKey) { - let connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; - } - } - } - } - } - respond(res, uniqueAutocomplete(results)); -} - -const findSessionSocketId = async (io, roomId, tabId) => { - let pickFirstSession = tabId === undefined; - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (pickFirstSession) { - return item.id; - } else if (item.tabId === tabId) { - return item.id; - } - } - } - return null; -}; - -async function sessions_agents_count(io, socket) { - let c_sessions = 0, c_agents = 0; - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - c_sessions++; - } else { - c_agents++; - } - } - } else { - c_agents = -1; - c_sessions = -1; - } - return {c_sessions, c_agents}; -} - -async function get_all_agents_ids(io, socket) { - let agents = []; - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.agent) { - agents.push(item.id); - } - } - } - return agents; -} - wsRouter.get(`/sockets-list`, socketsList); wsRouter.post(`/sockets-list`, socketsList); wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); @@ -264,160 +77,10 @@ wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject); -async function onConnect(socket) { - socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); - debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); - socket._connectedAt = new Date(); - - let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); - socket.peerId = socket.handshake.query.peerId; - socket.roomId = extractRoomId(socket.peerId); - connTabId = connTabId ?? (Math.random() + 1).toString(36).substring(2); - socket.tabId = connTabId; - socket.sessId = connSessionId; - socket.projectId = socket.handshake.query.projectId; - socket.identity = socket.handshake.query.identity; - debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); - - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (socket.identity === IDENTITIES.session) { - if (c_sessions > 0) { - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey === connProjectKey) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.tabId === connTabId) { - debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); - return socket.disconnect(); - } - } - } - } - } - extractSessionInfo(socket); - if (c_agents > 0) { - debug && console.log(`notifying new session about agent-existence`); - let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); - } - - } else if (c_sessions <= 0) { - debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - await socket.join(socket.roomId); - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`); - } - if (socket.identity === IDENTITIES.agent) { - if (socket.handshake.query.agentInfo !== undefined) { - socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); - socket.agentID = socket.handshake.query.agentInfo.id; - // Stats - startAssist(socket, socket.agentID); - } - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); - } - - // Set disconnect handler - socket.on('disconnect', () => onDisconnect(socket)); - - // - socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, (...args) => onUpdateEvent(socket, ...args)); - - // - socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); - socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err)); - - // - socket.onAny((eventName, ...args) => onAny(socket, eventName, ...args)); -} - -async function onDisconnect(socket) { - debug && console.log(`${socket.id} disconnected from ${socket.roomId}`); - if (socket.identity === IDENTITIES.agent) { - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); - // Stats - endAssist(socket, socket.agentID); - } - debug && console.log("checking for number of connected agents and sessions"); - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (c_sessions === -1 && c_agents === -1) { - debug && console.log(`room not found: ${socket.roomId}`); - } - if (c_sessions === 0) { - debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - if (c_agents === 0) { - debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); - } -} - -async function onUpdateEvent(socket, ...args) { - debug && console.log(`${socket.id} sent update event.`); - if (socket.identity !== IDENTITIES.session) { - debug && console.log('Ignoring update event.'); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); - // Update sessionInfo for all sessions in room - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - if (roomId === socket.roomId) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); - } - } - } - } -} - -async function onAny(socket, eventName, ...args) { - if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { - debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); - // TODO: emit message to all agents in the room (except tabs) - socket.to(socket.roomId).emit(eventName, args[0]); - } else { - // Stats - handleEvent(eventName, socket, args[0]); - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); - let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId); - if (socketId === null) { - debug && console.log(`session not found for:${socket.roomId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } else { - debug && console.log("message sent"); - io.to(socketId).emit(eventName, socket.id, args[0]); - } - } -} - module.exports = { wsRouter, start: (server, prefix) => { - createSocketIOServer(server, prefix); + io = createSocketIOServer(server, prefix); io.use(async (socket, next) => await authorizer.check(socket, next)); io.on('connection', (socket) => onConnect(socket)); @@ -430,13 +93,13 @@ module.exports = { const arr = Array.from(rooms); const filtered = arr.filter(room => !room[1].has(room[0])); for (let i of filtered) { - let {projectKey, sessionId, tabId} = extractPeerId(i[0]); + let {projectKey, sessionId} = extractPeerId(i[0]); if (projectKey !== null && sessionId !== null) { count++; } } console.log(` ====== Valid Rooms: ${count} ====== `); - if (debug) { + if (debug_log) { for (let item of filtered) { console.log(`Room: ${item[0]} connected: ${item[1].size}`); } diff --git a/assist/utils/assistHelper.js b/assist/utils/assistHelper.js index dc5444940..6ce254bd4 100644 --- a/assist/utils/assistHelper.js +++ b/assist/utils/assistHelper.js @@ -48,11 +48,14 @@ const BASE_sessionInfo = { "projectId": 0 }; - +/** + * extracts and populate socket with information + * @Param {socket} used socket + * */ const extractSessionInfo = function (socket) { if (socket.handshake.query.sessionInfo !== undefined) { - debug && console.log("received headers"); - debug && console.log(socket.handshake.headers); + debug && console.log(`received headers: ${socket.handshake.headers}`); + socket.handshake.query.sessionInfo = JSON.parse(socket.handshake.query.sessionInfo); socket.handshake.query.sessionInfo = {...BASE_sessionInfo, ...socket.handshake.query.sessionInfo}; @@ -106,16 +109,13 @@ function socketConnexionTimeout(io) { } function errorHandler(listenerName, error) { - console.error(`Error detected from ${listenerName}`); - console.error(error); + console.error(`Error detected from ${listenerName}\n${error}`); } - function generateAccessToken(payload) { return jwt.sign(payload, process.env.ASSIST_JWT_SECRET, {expiresIn: process.env.ASSIST_JWT_EXPIRATION || '30m'}); } - const JWT_TOKEN_PREFIX = "Bearer "; function check(socket, next) { diff --git a/assist/utils/extractors.js b/assist/utils/extractors.js new file mode 100644 index 000000000..9908bc960 --- /dev/null +++ b/assist/utils/extractors.js @@ -0,0 +1,13 @@ +const { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} = require("./helper"); + +module.exports = { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} \ No newline at end of file diff --git a/assist/utils/helper.js b/assist/utils/helper.js index e842282a0..9f765786f 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -142,7 +142,7 @@ const transformFilters = function (filter) { } return filter; } -const extractPayloadFromRequest = async function (req) { +const extractPayloadFromRequest = async function (req, res) { let filters = { "query": {}, // for autocomplete "filter": {}, // for sessions search diff --git a/assist/utils/httpHandlers.js b/assist/utils/httpHandlers.js new file mode 100644 index 000000000..a11ff5e16 --- /dev/null +++ b/assist/utils/httpHandlers.js @@ -0,0 +1,164 @@ +const { + hasFilters, + extractPeerId, + isValidSession, + sortPaginate, + getValidAttributes, + uniqueAutocomplete +} = require("./helper"); +const { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} = require("./extractors"); +const { + IDENTITIES +} = require("./assistHelper"); +const { + getServer +} = require('../utils/wsServer'); + +const debug_log = process.env.debug === "1"; + +const respond = function (res, data) { + let result = {data} + if (process.env.uws !== "true") { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify(result)); + } else { + res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); + } +} + +const socketsList = async function (req, res) { + let io = getServer(); + debug_log && console.log("[WS]looking for all available sessions"); + let filters = await extractPayloadFromRequest(req, res); + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); + if (projectKey !== undefined) { + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { + liveSessionsPerProject[projectKey].add(sessionId); + } + } + } else { + liveSessionsPerProject[projectKey].add(sessionId); + } + } + } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); + respond(res, liveSessions); +} + +const socketsListByProject = async function (req, res) { + let io = getServer(); + debug_log && console.log("[WS]looking for available sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let _sessionId = extractSessionIdFromRequest(req); + let filters = await extractPayloadFromRequest(req, res); + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); + if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { + if (withFilters) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { + liveSessions.add(sessionId); + } + } + } else { + liveSessions.add(sessionId); + } + } + } + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) + : sessions.length > 0 ? sessions[0] + : null); +} + +const socketsLiveByProject = async function (req, res) { + let io = getServer(); + debug_log && console.log("[WS]looking for available LIVE sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let _sessionId = extractSessionIdFromRequest(req); + let filters = await extractPayloadFromRequest(req, res); + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + const sessIDs = new Set(); + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); + if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { + let connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session) { + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } else { + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } + } + } + } + } + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); +} + +const autocomplete = async function (req, res) { + let io = getServer(); + debug_log && console.log("[WS]autocomplete"); + let _projectKey = extractProjectKeyFromRequest(req); + let filters = await extractPayloadFromRequest(req); + let results = []; + if (filters.query && Object.keys(filters.query).length > 0) { + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === _projectKey) { + let connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; + } + } + } + } + } + respond(res, uniqueAutocomplete(results)); +} + +module.exports = { + respond, + socketsList, + socketsListByProject, + socketsLiveByProject, + autocomplete +} \ No newline at end of file diff --git a/assist/utils/socketHandlers.js b/assist/utils/socketHandlers.js new file mode 100644 index 000000000..1c8fa4dda --- /dev/null +++ b/assist/utils/socketHandlers.js @@ -0,0 +1,234 @@ +const { + extractPeerId, + extractRoomId, + getAvailableRooms +} = require("./helper"); +const { + IDENTITIES, + EVENTS_DEFINITION, + extractSessionInfo, + errorHandler +} = require("./assistHelper"); +const { + startAssist, + endAssist, + handleEvent +} = require("./stats"); +const { + getServer +} = require('../utils/wsServer'); + +const debug_log = process.env.debug === "1"; +const error_log = process.env.ERROR === "1"; + +const findSessionSocketId = async (io, roomId, tabId) => { + let pickFirstSession = tabId === undefined; + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session) { + if (pickFirstSession) { + return item.id; + } else if (item.tabId === tabId) { + return item.id; + } + } + } + return null; +}; + +async function sessions_agents_count(io, socket) { + let c_sessions = 0, c_agents = 0; + const rooms = await getAvailableRooms(io); + if (rooms.get(socket.roomId)) { + const connected_sockets = await io.in(socket.roomId).fetchSockets(); + + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session) { + c_sessions++; + } else { + c_agents++; + } + } + } else { + c_agents = -1; + c_sessions = -1; + } + return {c_sessions, c_agents}; +} + +async function get_all_agents_ids(io, socket) { + let agents = []; + const rooms = await getAvailableRooms(io); + if (rooms.get(socket.roomId)) { + const connected_sockets = await io.in(socket.roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.agent) { + agents.push(item.id); + } + } + } + return agents; +} + +function processNewSocket(socket) { + socket._connectedAt = new Date(); + socket.identity = socket.handshake.query.identity; + socket.peerId = socket.handshake.query.peerId; + let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.peerId); + socket.roomId = extractRoomId(socket.peerId); + socket.projectId = socket.handshake.query.projectId; + socket.projectKey = connProjectKey; + socket.sessId = connSessionId; + socket.tabId = connTabId ?? (Math.random() + 1).toString(36).substring(2); + debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); +} + +async function onConnect(socket) { + debug_log && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); + processNewSocket(socket); + + const io = getServer(); + let {c_sessions, c_agents} = await sessions_agents_count(io, socket); + if (socket.identity === IDENTITIES.session) { + // Check if session already connected, if so, refuse new connexion + if (c_sessions > 0) { + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === socket.projectKey) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.tabId === socket.tabId) { + error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.peerId}`); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); + return socket.disconnect(); + } + } + } + } + } + extractSessionInfo(socket); + // Inform all connected agents about reconnected session + if (c_agents > 0) { + debug_log && console.log(`notifying new session about agent-existence`); + let agents_ids = await get_all_agents_ids(io, socket); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + } + + } else if (c_sessions <= 0) { + debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + } + await socket.join(socket.roomId); + const rooms = await getAvailableRooms(io); + if (rooms.has(socket.roomId)) { + let connectedSockets = await io.in(socket.roomId).fetchSockets(); + debug_log && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`); + } + if (socket.identity === IDENTITIES.agent) { + if (socket.handshake.query.agentInfo !== undefined) { + socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); + socket.agentID = socket.handshake.query.agentInfo.id; + // Stats + startAssist(socket, socket.agentID); + } + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + } + + // Set disconnect handler + socket.on('disconnect', () => onDisconnect(socket)); + + // Handle update event + socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, (...args) => onUpdateEvent(socket, ...args)); + + // Handle errors + socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); + socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); + socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err)); + + // Handle all other events + socket.onAny((eventName, ...args) => onAny(socket, eventName, ...args)); +} + +async function onDisconnect(socket) { + const io = getServer(); + debug_log && console.log(`${socket.id} disconnected from ${socket.roomId}`); + if (socket.identity === IDENTITIES.agent) { + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + // Stats + endAssist(socket, socket.agentID); + } + debug_log && console.log("checking for number of connected agents and sessions"); + let {c_sessions, c_agents} = await sessions_agents_count(io, socket); + if (c_sessions === -1 && c_agents === -1) { + debug_log && console.log(`room not found: ${socket.roomId}`); + } + if (c_sessions === 0) { + debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + } + if (c_agents === 0) { + debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + } +} + +async function onUpdateEvent(socket, ...args) { + const io = getServer(); + debug_log && console.log(`${socket.id} sent update event.`); + if (socket.identity !== IDENTITIES.session) { + debug_log && console.log('Ignoring update event.'); + return + } + // Back compatibility (add top layer with meta information) + if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { + args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; + } + Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + // Update sessionInfo for all sessions in room + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + if (roomId === socket.roomId) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); + } + } + } + } +} + +async function onAny(socket, eventName, ...args) { + const io = getServer(); + if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { + debug_log && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); + return + } + // Back compatibility (add top layer with meta information) + if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { + args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; + } + if (socket.identity === IDENTITIES.session) { + debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); + socket.to(socket.roomId).emit(eventName, args[0]); + } else { + // Stats + handleEvent(eventName, socket, args[0]); + debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); + let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId); + if (socketId === null) { + debug_log && console.log(`session not found for:${socket.roomId}`); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + } else { + debug_log && console.log("message sent"); + io.to(socketId).emit(eventName, socket.id, args[0]); + } + } +} + +module.exports = { + onConnect, +} \ No newline at end of file diff --git a/assist/utils/stats.js b/assist/utils/stats.js index 23e0bf261..c24b38687 100644 --- a/assist/utils/stats.js +++ b/assist/utils/stats.js @@ -1,20 +1,6 @@ const StatsHost = process.env.STATS_HOST || 'http://assist-stats-openreplay.app.svc.cluster.local:8000/events'; -async function postData(payload) { - const options = { - method: 'POST', - body: JSON.stringify(payload), - headers: { 'Content-Type': 'application/json' } - } - - try { - const response = await fetch(StatsHost, options) - const jsonResponse = await response.json(); - console.log('JSON response', JSON.stringify(jsonResponse, null, 4)) - } catch(err) { - console.log('ERROR', err); - } -} +const debug = process.env.debug === "1"; class InMemoryCache { constructor() { @@ -40,7 +26,21 @@ class InMemoryCache { const cache = new InMemoryCache(); -const debug = process.env.debug === "1"; +async function postData(payload) { + const options = { + method: 'POST', + body: JSON.stringify(payload), + headers: { 'Content-Type': 'application/json' } + } + + try { + const response = await fetch(StatsHost, options) + const jsonResponse = await response.json(); + debug && console.log('JSON response', JSON.stringify(jsonResponse, null, 4)) + } catch(err) { + debug && console.log('ERROR', err); + } +} function startAssist(socket, agentID) { const tsNow = +new Date(); @@ -96,14 +96,14 @@ function startCall(socket, agentID) { // Save uniq eventID to cache cache.set(`${socket.sessId}_call`, eventID); // Debug logs - console.log(`s_call_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); + debug && console.log(`s_call_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); } function endCall(socket, agentID) { const tsNow = +new Date(); const eventID = cache.get(`${socket.sessId}_call`); if (eventID === undefined) { - console.log(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); + debug && console.log(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); return } void postData({ @@ -117,7 +117,7 @@ function endCall(socket, agentID) { }); cache.delete(`${socket.sessId}_call`) // Debug logs - console.log(`s_call_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); + debug && console.log(`s_call_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); } function startControl(socket, agentID) { @@ -134,14 +134,14 @@ function startControl(socket, agentID) { }); cache.set(`${socket.sessId}_control`, eventID) // Debug logs - console.log(`s_control_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); + debug && console.log(`s_control_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); } function endControl(socket, agentID) { const tsNow = +new Date(); const eventID = cache.get(`${socket.sessId}_control`); if (eventID === undefined) { - console.log(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); + debug && console.log(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`); return } void postData({ @@ -155,7 +155,7 @@ function endControl(socket, agentID) { }); cache.delete(`${socket.sessId}_control`) // Debug logs - console.log(`s_control_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); + debug && console.log(`s_control_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); } function startRecord(socket, agentID) { @@ -172,7 +172,7 @@ function startRecord(socket, agentID) { }); cache.set(`${socket.sessId}_record`, eventID) // Debug logs - console.log(`s_recording_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); + debug && console.log(`s_recording_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); } function endRecord(socket, agentID) { @@ -189,7 +189,7 @@ function endRecord(socket, agentID) { }); cache.delete(`${socket.sessId}_record`) // Debug logs - console.log(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); + debug && console.log(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`); } function handleEvent(eventName, socket, agentID) { diff --git a/assist/utils/wsServer.js b/assist/utils/wsServer.js new file mode 100644 index 000000000..00165eafd --- /dev/null +++ b/assist/utils/wsServer.js @@ -0,0 +1,29 @@ +const _io = require("socket.io"); +const {getCompressionConfig} = require("./helper"); + +let io; + +const getServer = function () { + return io; +} + +const createSocketIOServer = function (server, prefix) { + if (io) { + return io; + } + io = _io(server, { + maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: (prefix ? prefix : '') + '/socket', + ...getCompressionConfig() + }); + return io; +} + +module.exports = { + createSocketIOServer, + getServer, +} \ No newline at end of file diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index f116eeecb..c80549461 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -1,34 +1,32 @@ -const _io = require('socket.io'); const express = require('express'); const { - extractRoomId, extractPeerId, hasFilters, isValidSession, sortPaginate, - getValidAttributes, - uniqueAutocomplete } = require('../utils/helper'); const { IDENTITIES, - EVENTS_DEFINITION, - extractSessionInfo, socketConnexionTimeout, - errorHandler, authorizer } = require('../utils/assistHelper'); const { - extractProjectKeyFromRequest, - extractSessionIdFromRequest, extractPayloadFromRequest, - getCompressionConfig, getAvailableRooms } = require('../utils/helper-ee'); const { - startAssist, - endAssist, - handleEvent -} = require('../utils/stats'); + createSocketIOServer +} = require('../utils/wsServer'); +const { + onConnect +} = require('../utils/socketHandlers'); +const { + respond, + socketsList, + socketsListByProject, + socketsLiveByProject, + autocomplete +} = require('../utils/httpHandlers'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); @@ -38,129 +36,19 @@ const pubClient = createClient({url: REDIS_URL}); const subClient = pubClient.duplicate(); console.log(`Using Redis: ${REDIS_URL}`); let io; -const debug = process.env.debug === "1"; - -const createSocketIOServer = function (server, prefix) { - if (process.env.uws !== "true") { - io = _io(server, { - maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: (prefix ? prefix : '') + '/socket', - ...getCompressionConfig() - }); - } else { - io = new _io.Server({ - maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: (prefix ? prefix : '') + '/socket', - ...getCompressionConfig() - }); - io.attachApp(server); - } -} - -// TODO: Maybe we should use a Set instead of an array -const uniqueSessions = function (data) { - let resArr = []; - let resArrIDS = []; - for (let item of data) { - if (resArrIDS.indexOf(item.sessionID) < 0) { - resArr.push(item); - resArrIDS.push(item.sessionID); - } - } - return resArr; -} - - -const respond = function (res, data) { - let result = {data} - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } -} - -const socketsList = async function (req, res) { - debug && console.log("[WS]looking for all available sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey !== undefined) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - const connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } else { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(res, liveSessions); -} - -const socketsListByProject = async function (req, res) { - debug && console.log("[WS]looking for available sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - if (withFilters) { - const connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions.add(sessionId); - } - } - } else { - liveSessions.add(sessionId); - } - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) - : sessions.length > 0 ? sessions[0] - : null); -} +const debug_log = process.env.debug === "1"; const socketsLive = async function (req, res) { - debug && console.log("[WS]looking for all available LIVE sessions"); + debug_log && console.log("[WS]looking for all available LIVE sessions"); let filters = await extractPayloadFromRequest(req, res); let withFilters = hasFilters(filters); let liveSessionsPerProject = {}; const sessIDs = new Set(); let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); if (projectKey !== undefined) { - let connected_sockets = await io.in(peerId).fetchSockets(); + let connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); @@ -180,8 +68,6 @@ const socketsLive = async function (req, res) { } } } - // Should be already unique - // liveSessionsPerProject[projectKey] = uniqueSessions(liveSessionsPerProject[projectKey]); } } let liveSessions = {}; @@ -191,116 +77,6 @@ const socketsLive = async function (req, res) { respond(res, sortPaginate(liveSessions, filters)); } -const socketsLiveByProject = async function (req, res) { - debug && console.log("[WS]looking for available LIVE sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - // Should be unique already because of using sessIDs set - // liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []); - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); -} - -const autocomplete = async function (req, res) { - debug && console.log("[WS]autocomplete"); - let _projectKey = extractProjectKeyFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let results = []; - if (filters.query && Object.keys(filters.query).length > 0) { - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); - if (projectKey === _projectKey) { - let connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; - } - } - } - } - } - respond(res, uniqueAutocomplete(results)); -} - -const findSessionSocketId = async (io, roomId, tabId) => { - let pickFirstSession = tabId === undefined; - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (pickFirstSession) { - return item.id; - } else if (item.tabId === tabId) { - return item.id; - } - } - } - return null; -}; - -async function sessions_agents_count(io, socket) { - let c_sessions = 0, c_agents = 0; - const rooms = await getAvailableRooms(io); - if (rooms.has(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - c_sessions++; - } else { - c_agents++; - } - } - } else { - c_agents = -1; - c_sessions = -1; - } - return {c_sessions, c_agents}; -} - -async function get_all_agents_ids(io, socket) { - let agents = []; - const rooms = await getAvailableRooms(io); - if (rooms.has(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.agent) { - agents.push(item.id); - } - } - } - return agents; -} - wsRouter.get(`/sockets-list`, socketsList); wsRouter.post(`/sockets-list`, socketsList); wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); @@ -318,156 +94,16 @@ wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject); module.exports = { wsRouter, start: (server, prefix) => { - createSocketIOServer(server, prefix); + io = createSocketIOServer(server, prefix); io.use(async (socket, next) => await authorizer.check(socket, next)); - io.on('connection', async (socket) => { - socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); - debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); - socket._connectedAt = new Date(); + io.on('connection', (socket) => onConnect(socket)); - let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); - socket.peerId = socket.handshake.query.peerId; - socket.roomId = extractRoomId(socket.peerId); - // Set default tabId for back compatibility - connTabId = connTabId ?? (Math.random() + 1).toString(36).substring(2); - socket.tabId = connTabId; - socket.identity = socket.handshake.query.identity; - debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); - - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (socket.identity === IDENTITIES.session) { - if (c_sessions > 0) { - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey === connProjectKey) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.tabId === connTabId) { - debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); - return socket.disconnect(); - } - } - } - } - } - extractSessionInfo(socket); - if (c_agents > 0) { - debug && console.log(`notifying new session about agent-existence`); - let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); - } - - } else if (c_sessions <= 0) { - debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - await socket.join(socket.roomId); - const rooms = await getAvailableRooms(io); - if (rooms.has(socket.roomId)) { - let connectedSockets = await io.in(socket.roomId).fetchSockets(); - debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`); - } - if (socket.identity === IDENTITIES.agent) { - if (socket.handshake.query.agentInfo !== undefined) { - socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); - socket.agentID = socket.handshake.query.agentInfo.id; - // Stats - startAssist(socket, socket.agentID); - } - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); - } - - socket.on('disconnect', async () => { - debug && console.log(`${socket.id} disconnected from ${socket.roomId}`); - if (socket.identity === IDENTITIES.agent) { - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); - // Stats - endAssist(socket, socket.agentID); - } - debug && console.log("checking for number of connected agents and sessions"); - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (c_sessions === -1 && c_agents === -1) { - debug && console.log(`room not found: ${socket.roomId}`); - } - if (c_sessions === 0) { - debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - if (c_agents === 0) { - debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); - } - }); - - socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => { - debug && console.log(`${socket.id} sent update event.`); - if (socket.identity !== IDENTITIES.session) { - debug && console.log('Ignoring update event.'); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); - // Update sessionInfo for all sessions in room - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - if (roomId === socket.roomId) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); - } - } - } - } - }); - - socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); - socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err)); - - socket.onAny(async (eventName, ...args) => { - if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { - debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); - // TODO: send to all agents in the room - socket.to(socket.roomId).emit(eventName, args[0]); - } else { - // Stats - handleEvent(eventName, socket, args[0]); - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); - let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId); - if (socketId === null) { - debug && console.log(`session not found for:${socket.roomId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } else { - debug && console.log("message sent"); - io.to(socketId).emit(eventName, socket.id, args[0]); - } - } - }); - - }); console.log("WS server started"); setInterval(async (io) => { try { const rooms = await getAvailableRooms(io); let validRooms = []; console.log(` ====== Rooms: ${rooms.size} ====== `); - // const arr = Array.from(rooms) - // const filtered = arr.filter(room => !room[1].has(room[0])) for (let i of rooms) { let {projectKey, sessionId} = extractPeerId(i); if (projectKey !== undefined && sessionId !== undefined) { @@ -475,7 +111,7 @@ module.exports = { } } console.log(` ====== Valid Rooms: ${validRooms.length} ====== `); - if (debug) { + if (debug_log) { for (let item of validRooms) { let connectedSockets = await io.in(item).fetchSockets(); console.log(`Room: ${item} connected: ${connectedSockets.length}`); @@ -495,7 +131,7 @@ module.exports = { }) .catch((err) => { console.log("> redis connection error"); - debug && console.error(err); + debug_log && console.error(err); process.exit(2); }); }, diff --git a/ee/assist/servers/websocket.js b/ee/assist/servers/websocket.js index 9a158b3ce..cefc84c8d 100644 --- a/ee/assist/servers/websocket.js +++ b/ee/assist/servers/websocket.js @@ -1,136 +1,37 @@ -const _io = require('socket.io'); const express = require('express'); const { - extractRoomId, extractPeerId, hasFilters, isValidSession, sortPaginate, - getValidAttributes, - uniqueAutocomplete } = require('../utils/helper'); const { - IDENTITIES, - EVENTS_DEFINITION, - extractSessionInfo, - socketConnexionTimeout, - errorHandler, - authorizer -} = require('../utils/assistHelper'); -const { - extractProjectKeyFromRequest, - extractSessionIdFromRequest, extractPayloadFromRequest, - getCompressionConfig, getAvailableRooms } = require('../utils/helper-ee'); const { - startAssist, - endAssist, - handleEvent -} = require('../utils/stats'); + IDENTITIES, + socketConnexionTimeout, + authorizer +} = require('../utils/assistHelper'); +const { + createSocketIOServer +} = require('../utils/wsServer'); +const { + onConnect +} = require('../utils/socketHandlers'); +const { + respond, + socketsList, + socketsListByProject, + socketsLiveByProject, + autocomplete +} = require('../utils/httpHandlers'); const wsRouter = express.Router(); let io; const debug_log = process.env.debug === "1"; -const error_log = process.env.ERROR === "1"; - -const createSocketIOServer = function (server, prefix) { - if (process.env.uws !== "true") { - io = _io(server, { - maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: (prefix ? prefix : '') + '/socket', - ...getCompressionConfig() - }); - } else { - io = new _io.Server({ - maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: (prefix ? prefix : '') + '/socket', - ...getCompressionConfig() - }); - io.attachApp(server); - } -} - -const respond = function (res, data) { - let result = {data} - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } -} - -const socketsList = async function (req, res) { - debug_log && console.log("[WS]looking for all available sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey !== undefined) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - const connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } else { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(res, liveSessions); -} - -const socketsListByProject = async function (req, res) { - debug_log && console.log("[WS]looking for available sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - if (withFilters) { - const connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions.add(sessionId); - } - } - } else { - liveSessions.add(sessionId); - } - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) - : sessions.length > 0 ? sessions[0] - : null); -} const socketsLive = async function (req, res) { debug_log && console.log("[WS]looking for all available LIVE sessions"); @@ -138,14 +39,14 @@ const socketsLive = async function (req, res) { let withFilters = hasFilters(filters); let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); if (projectKey !== undefined) { - let connected_sockets = await io.in(peerId).fetchSockets(); + let connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (hasFilters(filters)) { + if (withFilters) { if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); } @@ -163,114 +64,6 @@ const socketsLive = async function (req, res) { respond(res, sortPaginate(liveSessions, filters)); } -const socketsLiveByProject = async function (req, res) { - debug_log && console.log("[WS]looking for available LIVE sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - } - } - let sessions = Array.from(liveSessions); - respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); -} - -const autocomplete = async function (req, res) { - debug_log && console.log("[WS]autocomplete"); - let _projectKey = extractProjectKeyFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let results = []; - if (filters.query && Object.keys(filters.query).length > 0) { - let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey} = extractPeerId(peerId); - if (projectKey === _projectKey) { - let connected_sockets = await io.in(peerId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; - } - } - } - } - } - respond(res, uniqueAutocomplete(results)); -} - -const findSessionSocketId = async (io, roomId, tabId) => { - let pickFirstSession = tabId === undefined; - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (pickFirstSession) { - return item.id; - } else if (item.tabId === tabId) { - return item.id; - } - } - } - return null; -}; - -async function sessions_agents_count(io, socket) { - let c_sessions = 0, c_agents = 0; - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - c_sessions++; - } else { - c_agents++; - } - } - } else { - c_agents = -1; - c_sessions = -1; - } - return {c_sessions, c_agents}; -} - -async function get_all_agents_ids(io, socket) { - let agents = []; - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - const connected_sockets = await io.in(socket.roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.agent) { - agents.push(item.id); - } - } - } - return agents; -} - wsRouter.get(`/sockets-list`, socketsList); wsRouter.post(`/sockets-list`, socketsList); wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); @@ -288,145 +81,10 @@ wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject); module.exports = { wsRouter, start: (server, prefix) => { - createSocketIOServer(server, prefix); + io = createSocketIOServer(server, prefix); io.use(async (socket, next) => await authorizer.check(socket, next)); - io.on('connection', async (socket) => { - socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); - debug_log && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); - socket._connectedAt = new Date(); + io.on('connection', (socket) => onConnect(socket)); - let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); - socket.peerId = socket.handshake.query.peerId; - socket.roomId = extractRoomId(socket.peerId); - // Set default tabId for back compatibility - connTabId = connTabId ?? (Math.random() + 1).toString(36).substring(2); - socket.tabId = connTabId; - socket.identity = socket.handshake.query.identity; - debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); - - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (socket.identity === IDENTITIES.session) { - if (c_sessions > 0) { - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey === connProjectKey) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.tabId === connTabId) { - error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); - return socket.disconnect(); - } - } - } - } - } - extractSessionInfo(socket); - if (c_agents > 0) { - debug_log && console.log(`notifying new session about agent-existence`); - let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); - } - - } else if (c_sessions <= 0) { - debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - await socket.join(socket.roomId); - const rooms = await getAvailableRooms(io); - if (rooms.get(socket.roomId)) { - debug_log && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`); - } - if (socket.identity === IDENTITIES.agent) { - if (socket.handshake.query.agentInfo !== undefined) { - socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); - socket.agentID = socket.handshake.query.agentInfo.id; - // Stats - startAssist(socket, socket.agentID); - } - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); - } - - socket.on('disconnect', async () => { - debug_log && console.log(`${socket.id} disconnected from ${socket.roomId}`); - if (socket.identity === IDENTITIES.agent) { - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); - // Stats - endAssist(socket, socket.agentID); - } - debug_log && console.log("checking for number of connected agents and sessions"); - let {c_sessions, c_agents} = await sessions_agents_count(io, socket); - if (c_sessions === -1 && c_agents === -1) { - debug_log && console.log(`room not found: ${socket.roomId}`); - } - if (c_sessions === 0) { - debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } - if (c_agents === 0) { - debug_log && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); - } - }); - - socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => { - debug_log && console.log(`${socket.id} sent update event.`); - if (socket.identity !== IDENTITIES.session) { - debug_log && console.log('Ignoring update event.'); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); - // Update sessionInfo for all sessions in room - const rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - if (roomId === socket.roomId) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); - } - } - } - } - }); - - socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); - socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err)); - - socket.onAny(async (eventName, ...args) => { - if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { - debug_log && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); - return - } - // Back compatibility (add top layer with meta information) - if (args[0]?.meta === undefined && socket.identity === IDENTITIES.session) { - args[0] = {meta: {tabId: socket.tabId, version: 1}, data: args[0]}; - } - if (socket.identity === IDENTITIES.session) { - debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); - socket.to(socket.roomId).emit(eventName, args[0]); - } else { - // Stats - handleEvent(eventName, socket, args[0]); - debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); - let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId); - if (socketId === null) { - debug_log && console.log(`session not found for:${socket.roomId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - } else { - debug_log && console.log("message sent"); - io.to(socketId).emit(eventName, socket.id, args[0]); - } - } - }); - }); console.log("WS server started"); setInterval(async (io) => { try { diff --git a/ee/assist/utils/extractors.js b/ee/assist/utils/extractors.js new file mode 100644 index 000000000..8a19cf2e6 --- /dev/null +++ b/ee/assist/utils/extractors.js @@ -0,0 +1,13 @@ +const { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} = require('../utils/helper-ee'); + +module.exports = { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} \ No newline at end of file diff --git a/ee/assist/utils/helper-ee.js b/ee/assist/utils/helper-ee.js index 2234226cc..7c735a7ce 100644 --- a/ee/assist/utils/helper-ee.js +++ b/ee/assist/utils/helper-ee.js @@ -17,12 +17,6 @@ const getBodyFromUWSResponse = async function (res) { json = JSON.parse(buffer); } catch (e) { console.error(e); - /* res.close calls onAborted */ - // try { - // res.close(); - // } catch (e2) { - // console.error(e2); - // } json = {}; } resolve(json); diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js new file mode 100644 index 000000000..e20f07a7f --- /dev/null +++ b/ee/assist/utils/wsServer.js @@ -0,0 +1,44 @@ +const _io = require("socket.io"); +const {getCompressionConfig} = require("./helper"); + +let io; + +const getServer = function () { + return io; +} + +const createSocketIOServer = function (server, prefix) { + if (io) { + return io; + } + if (process.env.uws !== "true") { + io = _io(server, { + maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"], + credentials: true + }, + path: (prefix ? prefix : '') + '/socket', + ...getCompressionConfig() + }); + } else { + io = new _io.Server({ + maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"], + credentials: true + }, + path: (prefix ? prefix : '') + '/socket', + ...getCompressionConfig() + }); + io.attachApp(server); + } + return io; +} + +module.exports = { + createSocketIOServer, + getServer, +} \ No newline at end of file