Upgrade system

This commit is contained in:
Pierre HUBERT 2020-04-12 17:09:45 +02:00
parent 89f6e67a53
commit 98875aa47d

View File

@ -6,7 +6,6 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"log" "log"
"sync" "sync"
@ -46,8 +45,8 @@ type activeRelay struct {
} }
/// We keep for each connection its channel /// We keep for each connection its channel
var closeChan = make(chan activeRelay) var closeChan = make(chan *activeRelay)
var connections = make(map[string]activeRelay) var connections = make(map[string]*activeRelay)
var currID uint = 0 var currID uint = 0
@ -114,7 +113,7 @@ func onSignal(callHash, peerID string, data map[string]interface{}) {
// Check if we are attempting to connect as viewer to a non existing channel // Check if we are attempting to connect as viewer to a non existing channel
if _, ok := connections[callHash]; !ok { if _, ok := connections[callHash]; !ok {
if peerID != "0" || newSignal.sigType != SDP { if peerID != "0" || newSignal.sigType != SDP {
println("Attempting to connect as viewer | send candidate to a non-ready broadcast!") log.Printf("Attempting to connect as viewer | send candidate to a non-ready broadcast! (callhash %s / peerid %s)", callHash, peerID)
return return
} }
} }
@ -127,13 +126,14 @@ func onSignal(callHash, peerID string, data map[string]interface{}) {
closeConnection(val) closeConnection(val)
} }
connections[callHash] = activeRelay{ newRelay := activeRelay{
id: currID, id: currID,
callHash: callHash, callHash: callHash,
channel: make(chan receivedSignal, 10), channel: make(chan receivedSignal, 10),
} }
currID++ currID++
go newCall(newSignal, connections[callHash]) connections[callHash] = &newRelay
go newCall(newSignal, &newRelay)
} else { } else {
// Forward the message to the channel // Forward the message to the channel
@ -156,17 +156,23 @@ func onCloseConnection(callHash, peerID string) {
} }
/// Close a connection /// Close a connection
func closeConnection(r activeRelay) { func closeConnection(r *activeRelay) {
log.Printf("Closing call %s / id: %d", r.callHash, r.id) log.Printf("Closing call %s / id: %d", r.callHash, r.id)
if val, ok := connections[r.callHash]; ok && val.id == r.id {
close(val.channel) // Close the channel
delete(connections, r.callHash) if !r.closed {
close(r.channel)
r.closed = true r.closed = true
} }
// Remove the channel from the list
if val, ok := connections[r.callHash]; ok && val.id == r.id {
delete(connections, r.callHash)
}
} }
// Ask for a channel to be closed // Ask for a channel to be closed
func askForClose(r activeRelay) { func askForClose(r *activeRelay) {
closeChan <- r closeChan <- r
} }
@ -183,7 +189,7 @@ func processCloseRequests() {
} }
/// Start new call /// Start new call
func newCall(mainOffer receivedSignal, r activeRelay) { func newCall(mainOffer receivedSignal, r *activeRelay) {
log.Printf("Starting new call: %s", mainOffer.callHash) log.Printf("Starting new call: %s", mainOffer.callHash)
@ -221,6 +227,14 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
return return
} }
// Check if the connection is closed
mainPeerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
if s == webrtc.PeerConnectionStateClosed {
askForClose(r)
return
}
})
// Close peer connection // Close peer connection
defer mainPeerConnection.Close() defer mainPeerConnection.Close()
@ -279,7 +293,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
} }
if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
fmt.Println("Write RTCP error:", rtcpSendErr) log.Println("Write RTCP error:", rtcpSendErr)
askForClose(r) askForClose(r)
return return
} }
@ -308,7 +322,6 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
for { for {
i, readErr := remoteTrack.Read(rtpBuf) i, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil { if readErr != nil {
// Could not read from remote track // Could not read from remote track
log.Println("Read error!", readErr) log.Println("Read error!", readErr)
askForClose(r) askForClose(r)