ComunicRTCProxy/relay.go

417 lines
10 KiB
Go
Raw Normal View History

2020-04-11 08:30:28 +00:00
/// RTC Relay
///
/// @author Pierre Hubert
package main
import (
"encoding/json"
2020-04-11 10:14:55 +00:00
"fmt"
"io"
2020-04-11 08:30:28 +00:00
"log"
2020-04-11 09:26:51 +00:00
"time"
2020-04-11 08:30:28 +00:00
2020-04-11 10:14:55 +00:00
"github.com/pion/rtcp"
2020-04-11 08:30:28 +00:00
"github.com/pion/webrtc/v2"
)
const (
// SDP This is a SDP signal
SDP = iota
// CANDIDATE This is a candidate
CANDIDATE = iota
2020-04-11 15:57:32 +00:00
// CloseConnection Requests the connection to be closed
CloseConnection = iota
2020-04-11 08:30:28 +00:00
)
type receivedSignal struct {
2020-04-11 09:26:51 +00:00
peerID string
callHash string
2020-04-11 08:30:28 +00:00
sigType uint
offer webrtc.SessionDescription
candidate webrtc.ICECandidateInit
}
2020-04-11 11:14:27 +00:00
type activeRelay struct {
channel chan receivedSignal
callHash string
id uint
2020-04-11 14:32:34 +00:00
closed bool
2020-04-11 11:14:27 +00:00
}
2020-04-11 08:30:28 +00:00
/// We keep for each connection its channel
2020-04-11 11:14:27 +00:00
var closeChan = make(chan activeRelay)
var connections = make(map[string]activeRelay)
var currID uint = 0
2020-04-11 08:30:28 +00:00
/// Process incoming messages
func onSignal(callHash, peerID string, data map[string]interface{}) {
2020-04-11 09:26:51 +00:00
// Close all the channels that requested so
processCloseRequests()
2020-04-11 08:30:28 +00:00
// Decode received signal
2020-04-11 09:26:51 +00:00
newSignal := receivedSignal{
peerID: peerID,
callHash: callHash,
}
2020-04-11 08:30:28 +00:00
if data["type"] == "SDP" {
newSignal.sigType = SDP
2020-04-11 15:57:32 +00:00
// I have to re-encode data to initialize SDP
var enc []byte
enc, err := json.Marshal(data["data"])
if err != nil {
log.Printf("Could not re-encode candidate ! %s", err)
return
}
err = json.Unmarshal(enc, &newSignal.offer)
if err != nil {
log.Printf("Discarding invalid candidate: %s", err)
return
}
2020-04-11 08:30:28 +00:00
} else if data["type"] == "CANDIDATE" {
newSignal.sigType = CANDIDATE
// I have to re-encode data to initialize ICECandidate
var enc []byte
enc, err := json.Marshal(data["data"])
if err != nil {
log.Printf("Could not re-encode candidate ! %s", err)
return
}
err = json.Unmarshal(enc, &newSignal.candidate)
if err != nil {
log.Printf("Discarding invalid candidate: %s", err)
return
}
2020-04-11 15:57:32 +00:00
} else if data["type"] == "CLOSE_CONN" {
// Close connection
newSignal.sigType = CloseConnection
2020-04-11 08:30:28 +00:00
} else {
log.Fatalf("Invalid signal type: %s !", data["type"])
}
2020-04-11 09:26:51 +00:00
// Check if we are attempting to connect as viewer to a non existing channel
if _, ok := connections[callHash]; !ok {
if peerID != "0" || newSignal.sigType != SDP {
println("Attempting to connect as viewer | send candidate to a non-ready broadcast!")
return
}
}
// Handle new offers
if newSignal.sigType == SDP && peerID == "0" {
// Check if we are overwriting another connection
2020-04-11 11:14:27 +00:00
if val, ok := connections[callHash]; ok {
closeConnection(val)
2020-04-11 09:26:51 +00:00
}
2020-04-11 11:14:27 +00:00
connections[callHash] = activeRelay{
id: currID,
callHash: callHash,
channel: make(chan receivedSignal, 10),
}
currID++
2020-04-11 09:26:51 +00:00
go newCall(newSignal, connections[callHash])
} else {
// Forward the message to the channel
2020-04-11 11:14:27 +00:00
connections[callHash].channel <- newSignal
2020-04-11 09:26:51 +00:00
}
}
2020-04-11 15:57:32 +00:00
/// Request connections to be closed
func onCloseConnection(callHash, peerID string) {
onSignal(callHash, peerID, map[string]interface{}{
"type": "CLOSE_CONN",
})
}
2020-04-11 09:26:51 +00:00
/// Close a connection
2020-04-11 11:14:27 +00:00
func closeConnection(r activeRelay) {
log.Printf("Closing call %s / id: %d", r.callHash, r.id)
if val, ok := connections[r.callHash]; ok && val.id == r.id {
close(val.channel)
delete(connections, r.callHash)
2020-04-11 14:32:34 +00:00
r.closed = true
2020-04-11 09:26:51 +00:00
}
}
// Ask for a channel to be closed
2020-04-11 11:14:27 +00:00
func askForClose(r activeRelay) {
closeChan <- r
2020-04-11 09:26:51 +00:00
}
// Process channel close requests (in thread safe way)
func processCloseRequests() {
for {
select {
2020-04-11 11:14:27 +00:00
case r := <-closeChan:
closeConnection(r)
2020-04-11 09:26:51 +00:00
case <-time.After(time.Millisecond * 10):
return
}
}
}
/// Start new call
2020-04-11 11:14:27 +00:00
func newCall(mainOffer receivedSignal, r activeRelay) {
2020-04-11 09:26:51 +00:00
2020-04-11 10:14:55 +00:00
log.Printf("Starting new call: %s", mainOffer.callHash)
2020-04-11 09:26:51 +00:00
// Since we are answering use PayloadTypes declared by offerer
mediaEngine := webrtc.MediaEngine{}
err := mediaEngine.PopulateFromSDP(mainOffer.offer)
if err != nil {
log.Println("Error: invalid data in offer!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 09:26:51 +00:00
return
}
2020-04-11 10:14:55 +00:00
// Create the API object with the MediaEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
// Setup config
peerConnectionConfig := webrtc.Configuration{
ICEServers: callConf.iceServers,
}
// Create a new RTCPeerConnection
2020-04-11 14:32:34 +00:00
mainPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
2020-04-11 10:14:55 +00:00
if err != nil {
log.Println("Error: could not create peer connection!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
2020-04-11 14:32:34 +00:00
// Close peer connection
defer mainPeerConnection.Close()
2020-04-11 10:14:55 +00:00
// Allow us to receive 1 audio & 1 video track
2020-04-11 14:32:34 +00:00
if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
2020-04-11 12:02:46 +00:00
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
2020-04-11 10:14:55 +00:00
log.Println("Error: could not prepare to receive video track!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
2020-04-11 14:32:34 +00:00
if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
2020-04-11 12:02:46 +00:00
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
2020-04-11 10:14:55 +00:00
log.Println("Error: could not prepare to receive audio track!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
localTrackChan := make(chan *webrtc.Track)
// Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers
2020-04-11 14:32:34 +00:00
mainPeerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
2020-04-11 10:14:55 +00:00
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it
go func() {
rtcpPLIInterval := time.Second * 3
ticker := time.NewTicker(rtcpPLIInterval)
for range ticker.C {
2020-04-11 14:32:34 +00:00
if r.closed {
return
}
if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
fmt.Println("Write RTCP error:", rtcpSendErr)
2020-04-11 14:41:20 +00:00
askForClose(r)
return
2020-04-11 10:14:55 +00:00
}
}
}()
// Create a local track, all our SFU clients will be fed via this track
2020-04-11 14:32:34 +00:00
localTrack, newTrackErr := mainPeerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label())
2020-04-11 10:14:55 +00:00
if newTrackErr != nil {
log.Println("New track error!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
// Send the track to the main goroutine (in order to respond correctly to SDP requests & responses)
localTrackChan <- localTrack
rtpBuf := make([]byte, 1400)
for {
i, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil {
// Could not read from remote track
2020-04-11 14:32:34 +00:00
log.Println("Read error!", readErr)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
break
}
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe {
log.Println("Write error!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
break
}
}
})
// Set the remote SessionDescription
2020-04-11 14:32:34 +00:00
err = mainPeerConnection.SetRemoteDescription(mainOffer.offer)
2020-04-11 10:14:55 +00:00
if err != nil {
log.Println("Set remote description error!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
// Create answer
2020-04-11 14:32:34 +00:00
answer, err := mainPeerConnection.CreateAnswer(nil)
2020-04-11 10:14:55 +00:00
if err != nil {
log.Println("Create answer error!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
// Sets the LocalDescription, and starts our UDP listeners
2020-04-11 14:32:34 +00:00
err = mainPeerConnection.SetLocalDescription(answer)
2020-04-11 10:14:55 +00:00
if err != nil {
log.Println("Set local description error!", err)
2020-04-11 11:14:27 +00:00
askForClose(r)
2020-04-11 10:14:55 +00:00
return
}
// Forward ice candidates
2020-04-11 14:32:34 +00:00
mainPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
2020-04-11 10:14:55 +00:00
if c != nil {
sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON())
}
})
// Send anwser
2020-04-11 14:32:34 +00:00
if !r.closed {
sendSignal(mainOffer.callHash, mainOffer.peerID, answer)
}
// Keep a list of active tracks
localTracks := make([]*webrtc.Track, 0, 2)
clients := make(map[string]*webrtc.PeerConnection, 1)
// Close all clients connections at the end
defer func() {
for _, v := range clients {
v.Close()
}
}()
for {
// Receive new channels
stopCheck := len(localTracks) >= 2 // Stop check if we got all the channel
for !stopCheck {
select {
case t := <-localTrackChan:
localTracks = append(localTracks, t)
case <-time.After(time.Millisecond * 100):
stopCheck = true
}
}
newMessage, ok := <-r.channel
if !ok {
log.Printf("Channel closed: call hash: %s / call id: %d", r.callHash, r.id)
return
}
// Check if we are creating a new connection
if newMessage.sigType == SDP {
// Close any previous connection of this client
if val, ok := clients[newMessage.peerID]; ok {
val.Close()
}
// Create new peer connection
newPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil {
log.Printf("Error creating new remote peer connection: %s", err)
continue
}
clients[newMessage.peerID] = newPeerConnection
// Add tracks
for _, value := range localTracks {
_, err = newPeerConnection.AddTrack(value)
if err != nil {
log.Printf("Error adding a track: %s", err)
continue
}
}
// Set remote description
err = newPeerConnection.SetRemoteDescription(newMessage.offer)
if err != nil {
log.Printf("Could not set remote description (remote peer): %s", err)
continue
}
// Create the answer
answer, err := newPeerConnection.CreateAnswer(nil)
if err != nil {
log.Printf("Could not create answer: %s!", err)
continue
}
err = newPeerConnection.SetLocalDescription(answer)
if err != nil {
log.Printf("Could not set local description: %s!", err)
continue
}
// Send ice candidates
newPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c != nil {
sendSignal(r.callHash, newMessage.peerID, c.ToJSON())
}
})
// Send answer
sendSignal(r.callHash, newMessage.peerID, answer)
} else if newMessage.sigType == CANDIDATE {
if newMessage.peerID == "0" {
// Ice candidate for main peer
if err := mainPeerConnection.AddICECandidate(newMessage.candidate); err != nil {
log.Printf("Err Adding ICECandidate to main peer: %s", err)
}
} else if val, ok := clients[newMessage.peerID]; ok {
// Ice candidate for remote peer
if err := val.AddICECandidate(newMessage.candidate); err != nil {
log.Printf("Err adding ice candidate for remote peer: %s", err)
}
} else {
log.Printf("Err tried to add ICE Candidate for non ready peer connection!")
}
}
}
2020-04-11 08:30:28 +00:00
}