diff --git a/relay.go b/relay.go index c1623e0..caed722 100644 --- a/relay.go +++ b/relay.go @@ -31,9 +31,17 @@ type receivedSignal struct { candidate webrtc.ICECandidateInit } +type activeRelay struct { + channel chan receivedSignal + callHash string + id uint +} + /// We keep for each connection its channel -var closeChan = make(chan string) -var connections = make(map[string]chan receivedSignal) +var closeChan = make(chan activeRelay) +var connections = make(map[string]activeRelay) + +var currID uint = 0 /// Process incoming messages func onSignal(callHash, peerID string, data map[string]interface{}) { @@ -85,39 +93,44 @@ func onSignal(callHash, peerID string, data map[string]interface{}) { if newSignal.sigType == SDP && peerID == "0" { // Check if we are overwriting another connection - if _, ok := connections[callHash]; ok { - closeConnection(callHash) + if val, ok := connections[callHash]; ok { + closeConnection(val) } - connections[callHash] = make(chan receivedSignal, 10) + connections[callHash] = activeRelay{ + id: currID, + callHash: callHash, + channel: make(chan receivedSignal, 10), + } + currID++ go newCall(newSignal, connections[callHash]) } else { // Forward the message to the channel - connections[callHash] <- newSignal + connections[callHash].channel <- newSignal } } /// Close a connection -func closeConnection(callHash string) { - log.Printf("Closing call %s", callHash) - if val, ok := connections[callHash]; ok { - close(val) - delete(connections, callHash) +func closeConnection(r activeRelay) { + log.Printf("Closing call %s / id: %d", r.callHash, r.id) + if val, ok := connections[r.callHash]; ok && val.id == r.id { + close(val.channel) + delete(connections, r.callHash) } } // Ask for a channel to be closed -func askForClose(callHash string) { - closeChan <- callHash +func askForClose(r activeRelay) { + closeChan <- r } // Process channel close requests (in thread safe way) func processCloseRequests() { for { select { - case id := <-closeChan: - closeConnection(id) + case r := <-closeChan: + closeConnection(r) case <-time.After(time.Millisecond * 10): return } @@ -125,7 +138,7 @@ func processCloseRequests() { } /// Start new call -func newCall(mainOffer receivedSignal, ch chan receivedSignal) { +func newCall(mainOffer receivedSignal, r activeRelay) { log.Printf("Starting new call: %s", mainOffer.callHash) @@ -134,7 +147,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { err := mediaEngine.PopulateFromSDP(mainOffer.offer) if err != nil { log.Println("Error: invalid data in offer!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } @@ -150,20 +163,20 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { peerConnection, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { log.Println("Error: could not create peer connection!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } // Allow us to receive 1 audio & 1 video track if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { log.Println("Error: could not prepare to receive video track!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { log.Println("Error: could not prepare to receive audio track!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } @@ -189,7 +202,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) if newTrackErr != nil { log.Println("New track error!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } @@ -203,7 +216,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { // Could not read from remote track log.Println("Read error!", err) - askForClose(mainOffer.callHash) + askForClose(r) break } @@ -211,7 +224,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe { log.Println("Write error!", err) - askForClose(mainOffer.callHash) + askForClose(r) break } } @@ -221,7 +234,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { err = peerConnection.SetRemoteDescription(mainOffer.offer) if err != nil { log.Println("Set remote description error!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } @@ -229,7 +242,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Println("Create answer error!", err) - askForClose(mainOffer.callHash) + askForClose(r) return } @@ -237,7 +250,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { err = peerConnection.SetLocalDescription(answer) if err != nil { log.Println("Set local description error!", err) - askForClose(mainOffer.callHash) + askForClose(r) return }