import 'dart:async';
import 'dart:convert';

import 'package:comunic/helpers/comments_helper.dart';
import 'package:comunic/helpers/conversations_helper.dart';
import 'package:comunic/helpers/events_helper.dart';
import 'package:comunic/models/api_request.dart';
import 'package:comunic/models/config.dart';
import 'package:comunic/models/ws_message.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

/// User web socket helper
///
/// Holds a single, statically shared [WebSocketChannel]. Outgoing requests
/// are tagged with a generated id and tracked in a map of [Completer]s so
/// that the matching response can complete the [Future] returned by
/// [sendMessage]. Incoming messages without an id are turned into events
/// emitted through [EventsHelper].
///
/// @author Pierre Hubert
class WebSocketHelper {
  /// Currently opened channel, or `null` when there is none.
  static WebSocketChannel? _ws;

  /// Counter used to generate unique request identifiers.
  static int _counter = 0;

  /// Requests waiting for a response, indexed by request id.
  static final _requests = <String, Completer>{};

  /// Check out whether we are currently connected to WebSocket or not
  ///
  /// Returns `true` when a channel exists and has no close code yet.
  static bool isConnected() {
    // Copy to a local so the nullable static field can be promoted.
    final channel = _ws;
    return channel != null && channel.closeCode == null;
  }

  /// Get WebSocket access token
  ///
  /// Executes the `ws/token` API request and returns the `token` entry of
  /// the response object.
  static Future _getWsToken() async =>
      (await APIRequest(uri: "ws/token", needLogin: true).exec())
          .assertOk()
          .getObject()["token"];

  /// Connect to WebSocket
  ///
  /// Does nothing if a connection is already opened. Otherwise requests an
  /// access token, opens the channel and starts listening for messages.
  static connect() async {
    if (isConnected()) return;

    // First, get an access token
    final token = await _getWsToken();

    // Another call to connect() may have opened the channel while we were
    // waiting for the token: do not open (and orphan) a second socket.
    if (isConnected()) return;

    // Determine WebSocket URI
    final wsURL =
        "${(config().apiServerSecure ? "wss" : "ws")}://${config().apiServerName}${config().apiServerUri}ws?token=$token";
    final wsURI = Uri.parse(wsURL);

    // Connect
    final channel = WebSocketChannel.connect(wsURI);
    _ws = channel;

    channel.stream.listen(
      // When we got data
      (data) {
        print("WS New data: $data");
        _processMessage(data.toString());
      },

      // Print errors on console
      onError: (e, stack) {
        print("WS error! $e");
        print(stack);
      },

      // Notify when the channel is closed
      onDone: () {
        print("WS Channel closed");

        // Fail and clear the pending requests: no response can arrive on a
        // closed channel, so leaving them untouched would hang the callers.
        _failPendingRequests();

        _ws = null;

        EventsHelper.emit(WSClosedEvent());
      },
    );
  }

  /// Complete every pending request with an error and empty the queue
  static void _failPendingRequests() {
    // Work on a copy so the map is not modified while being iterated.
    final pending = _requests.values.toList();
    _requests.clear();

    for (final completer in pending) {
      if (!completer.isCompleted)
        completer.completeError(Exception(
            "WS: connection closed before a response was received!"));
    }
  }

  /// Close current WebSocket (if any)
  static close() {
    if (isConnected()) _ws?.sink.close();
  }

  /// Send a new message
  ///
  /// The message is identified by a generated id; the returned [Future]
  /// completes with the `data` of the response carrying the same id, or
  /// with an error if the response title is not `success` or if the channel
  /// is closed before a response is received.
  ///
  /// This method might throw an [Exception] in case of failure
  static Future sendMessage(String title, dynamic data) {
    final channel = _ws;
    if (channel == null || channel.closeCode != null)
      throw Exception("WS: Trying to send message but websocket is closed!");

    final completer = Completer();
    final id = "freq-${_counter++}";

    final msg = WsMessage(id: id, title: title, data: data).toJSON();

    print("WS Send message: $msg");

    // Register the request only once the message has been handed to the
    // sink, so a failing add() does not leave a stale entry in the queue.
    channel.sink.add(msg);
    _requests[id] = completer;

    return completer.future;
  }

  /// Process incoming message
  ///
  /// Messages without an id are handled as server notifications, the other
  /// ones as responses to pending requests. Any failure is printed on the
  /// console and not propagated.
  static _processMessage(String msgStr) {
    try {
      final msg = WsMessage.fromJSON(jsonDecode(msgStr));

      if (!msg.hasId)
        _processUnattendedMessage(msg);
      else
        _respondRequests(msg);
    } catch (e, stack) {
      print("WS could not process message: $e");
      print(stack);
    }
  }

  /// Process an unattended message
  ///
  /// Emits through [EventsHelper] the event matching the title of [msg].
  /// Throws an [Exception] if the title is unknown.
  static _processUnattendedMessage(WsMessage msg) {
    switch (msg.title) {
      // New number of notifications
      case "number_notifs":
        EventsHelper.emit(NewNumberNotifsEvent(msg.data));
        break;

      // New number of unread conversations
      case "number_unread_conversations":
        EventsHelper.emit(NewNumberUnreadConversations(msg.data));
        break;

      // New comment
      case "new_comment":
        EventsHelper.emit(
            NewCommentEvent(CommentsHelper.apiToComment(msg.data)));
        break;

      // Updated comment
      case "comment_updated":
        EventsHelper.emit(
            UpdatedCommentEvent(CommentsHelper.apiToComment(msg.data)));
        break;

      // Deleted comment
      case "comment_deleted":
        EventsHelper.emit(DeletedCommentEvent(msg.data));
        break;

      // A user is writing a new message
      case "writing_message_in_conv":
        EventsHelper.emit(WritingMessageInConversationEvent(
            msg.data["conv_id"], msg.data["user_id"]));
        break;

      // Created new conversation message
      case "new_conv_message":
        EventsHelper.emit(NewConversationMessageEvent(
            ConversationsHelper.apiToConversationMessage(msg.data)));
        break;

      // Update conversation message content
      case "updated_conv_message":
        EventsHelper.emit(UpdatedConversationMessageEvent(
            ConversationsHelper.apiToConversationMessage(msg.data)));
        break;

      // Deleted a conversation message
      case "deleted_conv_message":
        EventsHelper.emit(DeletedConversationMessageEvent(
            ConversationsHelper.apiToConversationMessage(msg.data)));
        break;

      // Removed user from conversation
      case "removed_user_from_conv":
        EventsHelper.emit(RemovedUserFromConversationEvent(
            msg.data["conv_id"], msg.data["user_id"]));
        break;

      // Conversation deleted
      case "deleted_conversation":
        EventsHelper.emit(DeletedConversationEvent(msg.data));
        break;

      // A user joined a call
      case "user_joined_call":
        EventsHelper.emit(
            UserJoinedCallEvent(msg.data["callID"], msg.data["userID"]));
        break;

      // A user left a call
      case "user_left_call":
        EventsHelper.emit(
            UserLeftCallEvent(msg.data["callID"], msg.data["userID"]));
        break;

      // Got new call signal
      case "new_call_signal":
        Map signalData = msg.data["data"];
        EventsHelper.emit(NewCallSignalEvent(
            callID: msg.data["callID"],
            peerID: msg.data["peerID"],
            candidate: signalData.containsKey("candidate")
                ? RTCIceCandidate(
                    signalData["candidate"],
                    "${signalData["sdpMLineIndex"]}" /* fix plugin crash */,
                    signalData["sdpMLineIndex"])
                : null,
            sessionDescription: signalData.containsKey("type")
                ? RTCSessionDescription(signalData["sdp"], signalData["type"])
                : null));
        break;

      // Call peer ready event
      case "call_peer_ready":
        EventsHelper.emit(
            CallPeerReadyEvent(msg.data["callID"], msg.data["peerID"]));
        break;

      // Call peer interrupted streaming event
      case "call_peer_interrupted_streaming":
        EventsHelper.emit(CallPeerInterruptedStreamingEvent(
            msg.data["callID"], msg.data["peerID"]));
        break;

      // The call has been closed
      case "call_closed":
        EventsHelper.emit(CallClosedEvent(msg.data));
        break;

      default:
        throw Exception("Unknown message type: ${msg.title}");
    }
  }

  /// Process responses to requests
  ///
  /// Removes from the queue the request whose id matches [msg] and completes
  /// it: with the message data when the title is `success`, with an error
  /// otherwise. Throws an [Exception] if no pending request has this id.
  static _respondRequests(WsMessage msg) {
    // remove() returns null when the id is unknown, which makes a separate
    // containsKey() lookup (and a later null assertion) unnecessary.
    final completer = _requests.remove(msg.id);

    if (completer == null)
      throw Exception(
          "Could not find request ${msg.id} in the requests queue!");

    // Handles errors
    if (msg.title != "success") {
      completer.completeError(Exception("Could not process request!"));
      return;
    }

    completer.complete(msg.data);
  }
}

/// Helper function
///
/// Shorthand for [WebSocketHelper.sendMessage].
Future ws(String title, dynamic data) =>
    WebSocketHelper.sendMessage(title, data);