From 53904ed3672cea2a74e1bd9b7c665faccf5b4034 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 11 Apr 2020 11:26:51 +0200 Subject: [PATCH] Thread-safely open & close channels --- relay.go | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 4 deletions(-) diff --git a/relay.go b/relay.go index f4b02f8..6357421 100644 --- a/relay.go +++ b/relay.go @@ -6,8 +6,8 @@ package main import ( "encoding/json" - "fmt" "log" + "time" "github.com/pion/webrtc/v2" ) @@ -21,19 +21,28 @@ const ( ) type receivedSignal struct { + peerID string + callHash string sigType uint offer webrtc.SessionDescription candidate webrtc.ICECandidateInit } /// We keep for each connection its channel +var closeChan = make(chan string) var connections = make(map[string]chan receivedSignal) /// Process incoming messages func onSignal(callHash, peerID string, data map[string]interface{}) { + // Close all the channels that requested so + processCloseRequests() + // Decode received signal - newSignal := receivedSignal{} + newSignal := receivedSignal{ + peerID: peerID, + callHash: callHash, + } if data["type"] == "SDP" { newSignal.sigType = SDP @@ -61,6 +70,67 @@ func onSignal(callHash, peerID string, data map[string]interface{}) { log.Fatalf("Invalid signal type: %s !", data["type"]) } - println("Signal successfully processed!") - fmt.Println("sig:", newSignal) + // Check if we are attempting to connect as viewer to a non existing channel + if _, ok := connections[callHash]; !ok { + if peerID != "0" || newSignal.sigType != SDP { + println("Attempting to connect as viewer | send candidate to a non-ready broadcast!") + return + } + } + + // Handle new offers + if newSignal.sigType == SDP && peerID == "0" { + + // Check if we are overwriting another connection + if _, ok := connections[callHash]; ok { + closeConnection(callHash) + } + + connections[callHash] = make(chan receivedSignal, 10) + go newCall(newSignal, connections[callHash]) + + } else { + // Forward the message to the channel + connections[callHash] <- newSignal + } +} + +/// Close a connection +func closeConnection(callHash string) { + log.Printf("Closing call %s", callHash) + if val, ok := connections[callHash]; ok { + close(val) + delete(connections, callHash) + } +} + +// Ask for a channel to be closed +func askForClose(callHash string) { + closeChan <- callHash +} + +// Process channel close requests (in thread safe way) +func processCloseRequests() { + for { + select { + case id := <-closeChan: + closeConnection(id) + case <-time.After(time.Millisecond * 10): + return + } + } +} + +/// Start new call +func newCall(mainOffer receivedSignal, ch chan receivedSignal) { + + // Since we are answering use PayloadTypes declared by offerer + mediaEngine := webrtc.MediaEngine{} + err := mediaEngine.PopulateFromSDP(mainOffer.offer) + if err != nil { + log.Println("Error: invalid data in offer!", err) + askForClose(mainOffer.callHash) + return + } + }