From 98875aa47d5499abdc70a254ac32c91bbbbcadc8 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sun, 12 Apr 2020 17:09:45 +0200 Subject: [PATCH] Upgrade system --- relay.go | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/relay.go b/relay.go index de0b95e..a29d44f 100644 --- a/relay.go +++ b/relay.go @@ -6,7 +6,6 @@ package main import ( "encoding/json" - "fmt" "io" "log" "sync" @@ -46,8 +45,8 @@ type activeRelay struct { } /// We keep for each connection its channel -var closeChan = make(chan activeRelay) -var connections = make(map[string]activeRelay) +var closeChan = make(chan *activeRelay) +var connections = make(map[string]*activeRelay) var currID uint = 0 @@ -114,7 +113,7 @@ func onSignal(callHash, peerID string, data map[string]interface{}) { // Check if we are attempting to connect as viewer to a non existing channel if _, ok := connections[callHash]; !ok { if peerID != "0" || newSignal.sigType != SDP { - println("Attempting to connect as viewer | send candidate to a non-ready broadcast!") + log.Printf("Attempting to connect as viewer | send candidate to a non-ready broadcast! (callhash %s / peerid %s)", callHash, peerID) return } } @@ -127,13 +126,14 @@ func onSignal(callHash, peerID string, data map[string]interface{}) { closeConnection(val) } - connections[callHash] = activeRelay{ + newRelay := activeRelay{ id: currID, callHash: callHash, channel: make(chan receivedSignal, 10), } currID++ - go newCall(newSignal, connections[callHash]) + connections[callHash] = &newRelay + go newCall(newSignal, &newRelay) } else { // Forward the message to the channel @@ -156,17 +156,23 @@ func onCloseConnection(callHash, peerID string) { } /// Close a connection -func closeConnection(r activeRelay) { +func closeConnection(r *activeRelay) { log.Printf("Closing call %s / id: %d", r.callHash, r.id) - if val, ok := connections[r.callHash]; ok && val.id == r.id { - close(val.channel) - delete(connections, r.callHash) + + // Close the channel + if !r.closed { + close(r.channel) r.closed = true } + + // Remove the channel from the list + if val, ok := connections[r.callHash]; ok && val.id == r.id { + delete(connections, r.callHash) + } } // Ask for a channel to be closed -func askForClose(r activeRelay) { +func askForClose(r *activeRelay) { closeChan <- r } @@ -183,7 +189,7 @@ func processCloseRequests() { } /// Start new call -func newCall(mainOffer receivedSignal, r activeRelay) { +func newCall(mainOffer receivedSignal, r *activeRelay) { log.Printf("Starting new call: %s", mainOffer.callHash) @@ -221,6 +227,14 @@ func newCall(mainOffer receivedSignal, r activeRelay) { return } + // Check if the connection is closed + mainPeerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { + if s == webrtc.PeerConnectionStateClosed { + askForClose(r) + return + } + }) + // Close peer connection defer mainPeerConnection.Close() @@ -279,7 +293,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) { } if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { - fmt.Println("Write RTCP error:", rtcpSendErr) + log.Println("Write RTCP error:", rtcpSendErr) askForClose(r) return } @@ -308,7 +322,6 @@ func newCall(mainOffer receivedSignal, r activeRelay) { for { i, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { - // Could not read from remote track log.Println("Read error!", readErr) askForClose(r)