Thread-safely open & close channels

This commit is contained in:
Pierre HUBERT 2020-04-11 11:26:51 +02:00
parent 60605e9ada
commit 53904ed367

View File

@ -6,8 +6,8 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"log" "log"
"time"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
) )
@ -21,19 +21,28 @@ const (
) )
type receivedSignal struct { type receivedSignal struct {
peerID string
callHash string
sigType uint sigType uint
offer webrtc.SessionDescription offer webrtc.SessionDescription
candidate webrtc.ICECandidateInit candidate webrtc.ICECandidateInit
} }
/// We keep for each connection its channel /// We keep for each connection its channel
var closeChan = make(chan string)
var connections = make(map[string]chan receivedSignal) var connections = make(map[string]chan receivedSignal)
/// Process incoming messages /// Process incoming messages
func onSignal(callHash, peerID string, data map[string]interface{}) { func onSignal(callHash, peerID string, data map[string]interface{}) {
// Close all the channels that requested so
processCloseRequests()
// Decode received signal // Decode received signal
newSignal := receivedSignal{} newSignal := receivedSignal{
peerID: peerID,
callHash: callHash,
}
if data["type"] == "SDP" { if data["type"] == "SDP" {
newSignal.sigType = SDP newSignal.sigType = SDP
@ -61,6 +70,67 @@ func onSignal(callHash, peerID string, data map[string]interface{}) {
log.Fatalf("Invalid signal type: %s !", data["type"]) log.Fatalf("Invalid signal type: %s !", data["type"])
} }
println("Signal successfully processed!") // Check if we are attempting to connect as viewer to a non existing channel
fmt.Println("sig:", newSignal) if _, ok := connections[callHash]; !ok {
if peerID != "0" || newSignal.sigType != SDP {
println("Attempting to connect as viewer | send candidate to a non-ready broadcast!")
return
}
}
// Handle new offers
if newSignal.sigType == SDP && peerID == "0" {
// Check if we are overwriting another connection
if _, ok := connections[callHash]; ok {
closeConnection(callHash)
}
connections[callHash] = make(chan receivedSignal, 10)
go newCall(newSignal, connections[callHash])
} else {
// Forward the message to the channel
connections[callHash] <- newSignal
}
}
/// Close a connection
func closeConnection(callHash string) {
log.Printf("Closing call %s", callHash)
if val, ok := connections[callHash]; ok {
close(val)
delete(connections, callHash)
}
}
// Ask for a channel to be closed
func askForClose(callHash string) {
closeChan <- callHash
}
// Process channel close requests (in thread safe way)
func processCloseRequests() {
for {
select {
case id := <-closeChan:
closeConnection(id)
case <-time.After(time.Millisecond * 10):
return
}
}
}
/// Start new call
func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
// Since we are answering use PayloadTypes declared by offerer
mediaEngine := webrtc.MediaEngine{}
err := mediaEngine.PopulateFromSDP(mainOffer.offer)
if err != nil {
log.Println("Error: invalid data in offer!", err)
askForClose(mainOffer.callHash)
return
}
} }