/**
 * User websocket controller
 *
 * Issues short-lived access tokens over the regular API, upgrades
 * token-bearing requests to authenticated WebSockets, dispatches incoming
 * socket messages to the registered routes and pushes server-side events
 * (notification / unread conversation counters) to connected users.
 *
 * @author Pierre Hubert
 */

import * as ws from 'ws';
import { Request } from 'express';
import { RequestHandler } from '../entities/RequestHandler';
import { time } from '../utils/DateUtils';
import { randomStr } from '../utils/CryptUtils';
import { EventsHelper } from '../helpers/EventsHelper';
import { NotificationsHelper } from '../helpers/NotificationsHelper';
import { ConversationsHelper } from '../helpers/ConversationsHelper';
import { UserWebSocketRoutes } from './UserWebSocketRoutes';
import { UserWebSocketRequestsHandler } from '../entities/WebSocketRequestHandler';
import { WsMessage } from '../entities/WsMessage';

/**
 * A token that has been issued but not yet consumed by a WebSocket
 * connection
 */
interface PendingRequests {
	time: number,
	clientID: number,
	userID: number,
	token: string
}

/**
 * An authenticated, currently registered WebSocket connection
 */
export interface ActiveClient {
	socketID: string,
	clientID: number,
	userID: number,
	ws: ws
}

// Tokens are valid only 10 seconds after they are generated
const TOKENS_DURATION = 10
const TOKEN_LENGTH = 20

export class UserWebSocketController {

	/**
	 * The list of pending connections
	 */
	static pending_list: PendingRequests[] = []

	/**
	 * The list of active clients
	 */
	static active_clients: ActiveClient[] = []

	/**
	 * Clean the list of tokens
	 *
	 * Drops every pending entry older than TOKENS_DURATION (plus one
	 * unit of grace, as returned by time()).
	 */
	private static CleanList() {
		const now = time();
		this.pending_list = this.pending_list
			.filter((l) => l.time + TOKENS_DURATION + 1 > now)
	}

	/**
	 * Get a websocket access token
	 *
	 * Generates a random token, records it together with the client and
	 * user of the request, and sends it back as `{ token }`.
	 *
	 * @param h Request handler
	 */
	public static async GetToken(h: RequestHandler) {
		this.CleanList();

		// Generate a temporary token
		const token = randomStr(TOKEN_LENGTH);

		// Add the token to the list
		this.pending_list.push({
			time: time(),
			clientID: h.getClientInfo().id,
			userID: h.getUserId(),
			token: token
		});

		h.send({
			token: token
		});
	}

	/**
	 * Handle a user websocket request
	 *
	 * The request must carry, in its query string, a token previously
	 * obtained through GetToken(). A token can be used only once: it is
	 * removed from the pending list as soon as it is matched.
	 *
	 * @param req Associated request
	 * @param ws The socket
	 */
	public static async UserWS(req: Request, ws: ws) {
		this.CleanList();

		// Normalise the token once: query values may be strings, arrays or
		// nested objects, only a plain string of the right length can match
		const token = String(req.query.token);

		// First, check for token
		if(!Object.prototype.hasOwnProperty.call(req.query, "token")
			|| token.length != TOKEN_LENGTH) {
			ws.send("Missing token!");
			ws.close();
			return;
		}

		// Search appropriate connection
		const entryIndex = this.pending_list.findIndex((el) => el.token === token);

		if(entryIndex === -1) {
			ws.send("Invalid token!");
			ws.close();
			return;
		}

		// Remove the entry from the array (tokens are single-use)
		const entry = this.pending_list[entryIndex];
		this.pending_list.splice(entryIndex, 1);

		// Add the client to the list of active clients
		const client: ActiveClient = {
			socketID: randomStr(30),
			clientID: entry.clientID,
			userID: entry.userID,
			ws: ws
		}
		this.active_clients.push(client);

		// Remove the client from the list as soon as the
		// socket is closed
		ws.addEventListener("close", () => {
			// Guard against a missing entry: splice(-1, 1) would otherwise
			// remove the last (unrelated) client of the list
			const clientIndex = this.active_clients.indexOf(client);
			if(clientIndex !== -1)
				this.active_clients.splice(clientIndex, 1);
		})

		// Handles error
		ws.addEventListener("error", (e) => {
			if(ws.readyState == ws.OPEN)
				ws.close();
			console.log("WebSocket error", e)
		})

		// Handles incoming messages
		ws.addEventListener("message", async (msg) => {

			// Only accept text messages
			if(msg.type != "message") {
				console.error("Received a non-text messsage through a WebSocket !")
				ws.close();
				return;
			}

			// Check if the data are valid
			let wsMsg : WsMessage;
			try {
				wsMsg = new WsMessage(JSON.parse(msg.data));

				if(!wsMsg.isValidRequest)
					throw new Error("Requested message is invalid!");

			} catch(e) {
				console.error(e);
				ws.close();
				return;
			}

			// Create request handler
			const handler = new UserWebSocketRequestsHandler(client, wsMsg);

			try {

				// Check if we support this kind of message
				const route = UserWebSocketRoutes.find((el) => el.title == wsMsg.title);

				if(route == undefined) {
					handler.error(404, "Method not found!");
					return;
				}

				await route.handler(handler);

			} catch(routeError) {

				// Keep a trace of the failure itself, it was previously discarded
				console.error(routeError);

				// Try again to send a response
				try {
					handler.sendResponse("error", {
						code: 500,
						message: "Server error"
					});
				} catch(responseError) {
					console.error(responseError);
				}
			}
		})
	}

	/**
	 * Close the websockets of a specific client of a user
	 *
	 * @param clientID Target client ID
	 * @param userID Target user ID
	 */
	public static async CloseClientSockets(clientID: number, userID: number) {
		const targets = this.active_clients
			.filter((f) => f.clientID == clientID && f.userID == userID);

		for(const entry of targets)
			entry.ws.close();
	}

	/**
	 * Send a message to a socket
	 *
	 * Sockets that are not in the OPEN state are silently skipped.
	 *
	 * @param userID Target user ID
	 * @param socketID Target socket ID (if empty the message is sent to
	 * all the active sockets of the user)
	 * @param message The message to send
	 */
	public static Send(userID: number, socketID: string, message: WsMessage) {
		// An empty (or, at runtime, missing) socket ID means "all the sockets
		// of the user"
		const broadcast = !socketID;

		const recipients = this.active_clients.filter(
			(e) => e.userID == userID && (broadcast || e.socketID == socketID));

		for(const entry of recipients) {
			if(entry.ws.readyState == ws.OPEN)
				entry.ws.send(JSON.stringify(message));
		}
	}

	/**
	 * Check out whether a user has an active websocket or not
	 *
	 * @param userID Target user ID
	 * @returns true if at least one socket is registered for the user
	 */
	public static IsConnected(userID: number) : boolean {
		return this.active_clients.some((e) => e.userID == userID);
	}

	/**
	 * Send updated notifications number to some users
	 *
	 * Users without any registered socket are skipped.
	 *
	 * @param usersID Target users ID
	 */
	public static async SendNewNotificationsNumber(usersID: number[]) {

		// Process each user
		for(const userID of usersID) {

			if(!this.IsConnected(userID))
				continue;

			// Notify user
			this.Send(userID, "", new WsMessage({
				title: "number_notifs",
				id: "",
				data: await NotificationsHelper.CountUnread(userID)
			}));
		}
	}

	/**
	 * Send updated number of unread conversations
	 *
	 * Users without any registered socket are skipped.
	 *
	 * @param usersID Target users ID
	 */
	public static async SendNewUnreadConversationsCount(usersID: number[]) {

		for(const userID of usersID) {

			if(!this.IsConnected(userID))
				continue;

			// Notify user
			this.Send(userID, "", new WsMessage({
				title: "number_unread_conversations",
				id: "",
				data: await ConversationsHelper.CountUnreadForUser(userID)
			}));
		}
	}
}

// When user sign out
EventsHelper.Listen("destroyed_login_tokens", (e) => UserWebSocketController.CloseClientSockets(e.client.id, e.userID));

// When we get a new number of notifications
EventsHelper.Listen("updated_number_notifications", async (e) => await UserWebSocketController.SendNewNotificationsNumber(e.usersID));

// When we get a new number of unread conversations
EventsHelper.Listen("updated_number_unread_conversations", async (e) => await UserWebSocketController.SendNewUnreadConversationsCount(e.usersID));