Improve the way relays are closed

This commit is contained in:
Pierre HUBERT 2020-04-11 13:14:27 +02:00
parent 0484e47890
commit 262476e60b

View File

@ -31,9 +31,17 @@ type receivedSignal struct {
candidate webrtc.ICECandidateInit candidate webrtc.ICECandidateInit
} }
type activeRelay struct {
channel chan receivedSignal
callHash string
id uint
}
/// We keep for each connection its channel /// We keep for each connection its channel
var closeChan = make(chan string) var closeChan = make(chan activeRelay)
var connections = make(map[string]chan receivedSignal) var connections = make(map[string]activeRelay)
var currID uint = 0
/// Process incoming messages /// Process incoming messages
func onSignal(callHash, peerID string, data map[string]interface{}) { func onSignal(callHash, peerID string, data map[string]interface{}) {
@ -85,39 +93,44 @@ func onSignal(callHash, peerID string, data map[string]interface{}) {
if newSignal.sigType == SDP && peerID == "0" { if newSignal.sigType == SDP && peerID == "0" {
// Check if we are overwriting another connection // Check if we are overwriting another connection
if _, ok := connections[callHash]; ok { if val, ok := connections[callHash]; ok {
closeConnection(callHash) closeConnection(val)
} }
connections[callHash] = make(chan receivedSignal, 10) connections[callHash] = activeRelay{
id: currID,
callHash: callHash,
channel: make(chan receivedSignal, 10),
}
currID++
go newCall(newSignal, connections[callHash]) go newCall(newSignal, connections[callHash])
} else { } else {
// Forward the message to the channel // Forward the message to the channel
connections[callHash] <- newSignal connections[callHash].channel <- newSignal
} }
} }
/// Close a connection /// Close a connection
func closeConnection(callHash string) { func closeConnection(r activeRelay) {
log.Printf("Closing call %s", callHash) log.Printf("Closing call %s / id: %d", r.callHash, r.id)
if val, ok := connections[callHash]; ok { if val, ok := connections[r.callHash]; ok && val.id == r.id {
close(val) close(val.channel)
delete(connections, callHash) delete(connections, r.callHash)
} }
} }
// Ask for a channel to be closed // Ask for a channel to be closed
func askForClose(callHash string) { func askForClose(r activeRelay) {
closeChan <- callHash closeChan <- r
} }
// Process channel close requests (in thread safe way) // Process channel close requests (in thread safe way)
func processCloseRequests() { func processCloseRequests() {
for { for {
select { select {
case id := <-closeChan: case r := <-closeChan:
closeConnection(id) closeConnection(r)
case <-time.After(time.Millisecond * 10): case <-time.After(time.Millisecond * 10):
return return
} }
@ -125,7 +138,7 @@ func processCloseRequests() {
} }
/// Start new call /// Start new call
func newCall(mainOffer receivedSignal, ch chan receivedSignal) { func newCall(mainOffer receivedSignal, r activeRelay) {
log.Printf("Starting new call: %s", mainOffer.callHash) log.Printf("Starting new call: %s", mainOffer.callHash)
@ -134,7 +147,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
err := mediaEngine.PopulateFromSDP(mainOffer.offer) err := mediaEngine.PopulateFromSDP(mainOffer.offer)
if err != nil { if err != nil {
log.Println("Error: invalid data in offer!", err) log.Println("Error: invalid data in offer!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
@ -150,20 +163,20 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
peerConnection, err := api.NewPeerConnection(peerConnectionConfig) peerConnection, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil { if err != nil {
log.Println("Error: could not create peer connection!", err) log.Println("Error: could not create peer connection!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
// Allow us to receive 1 audio & 1 video track // Allow us to receive 1 audio & 1 video track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
log.Println("Error: could not prepare to receive video track!", err) log.Println("Error: could not prepare to receive video track!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
log.Println("Error: could not prepare to receive audio track!", err) log.Println("Error: could not prepare to receive audio track!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
@ -189,7 +202,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label())
if newTrackErr != nil { if newTrackErr != nil {
log.Println("New track error!", err) log.Println("New track error!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
@ -203,7 +216,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
// Could not read from remote track // Could not read from remote track
log.Println("Read error!", err) log.Println("Read error!", err)
askForClose(mainOffer.callHash) askForClose(r)
break break
} }
@ -211,7 +224,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe { if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe {
log.Println("Write error!", err) log.Println("Write error!", err)
askForClose(mainOffer.callHash) askForClose(r)
break break
} }
} }
@ -221,7 +234,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
err = peerConnection.SetRemoteDescription(mainOffer.offer) err = peerConnection.SetRemoteDescription(mainOffer.offer)
if err != nil { if err != nil {
log.Println("Set remote description error!", err) log.Println("Set remote description error!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
@ -229,7 +242,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
answer, err := peerConnection.CreateAnswer(nil) answer, err := peerConnection.CreateAnswer(nil)
if err != nil { if err != nil {
log.Println("Create answer error!", err) log.Println("Create answer error!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }
@ -237,7 +250,7 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
err = peerConnection.SetLocalDescription(answer) err = peerConnection.SetLocalDescription(answer)
if err != nil { if err != nil {
log.Println("Set local description error!", err) log.Println("Set local description error!", err)
askForClose(mainOffer.callHash) askForClose(r)
return return
} }