From 0484e478903cbeae8647a4347f9e0a51f83f01ed Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 11 Apr 2020 12:14:55 +0200 Subject: [PATCH] Send signals to the client --- go.mod | 1 + relay.go | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ ws.go | 22 +++++++++++ 3 files changed, 140 insertions(+) diff --git a/go.mod b/go.mod index 89d4bcb..9a36f53 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/gorilla/websocket v0.0.0-20200319175051-b65e62901fc1 + github.com/pion/rtcp v1.2.1 github.com/pion/webrtc/v2 v2.2.4 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/relay.go b/relay.go index 6357421..c1623e0 100644 --- a/relay.go +++ b/relay.go @@ -6,9 +6,12 @@ package main import ( "encoding/json" + "fmt" + "io" "log" "time" + "github.com/pion/rtcp" "github.com/pion/webrtc/v2" ) @@ -124,6 +127,8 @@ func processCloseRequests() { /// Start new call func newCall(mainOffer receivedSignal, ch chan receivedSignal) { + log.Printf("Starting new call: %s", mainOffer.callHash) + // Since we are answering use PayloadTypes declared by offerer mediaEngine := webrtc.MediaEngine{} err := mediaEngine.PopulateFromSDP(mainOffer.offer) @@ -133,4 +138,116 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) { return } + // Create the API object with the MediaEngine + api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) + + // Setup config + peerConnectionConfig := webrtc.Configuration{ + ICEServers: callConf.iceServers, + } + + // Create a new RTCPeerConnection + peerConnection, err := api.NewPeerConnection(peerConnectionConfig) + if err != nil { + log.Println("Error: could not create peer connection!", err) + askForClose(mainOffer.callHash) + return + } + + // Allow us to receive 1 audio & 1 video track + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { + log.Println("Error: could not prepare to receive video track!", err) + askForClose(mainOffer.callHash) + return + } + + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { + log.Println("Error: could not prepare to receive audio track!", err) + askForClose(mainOffer.callHash) + return + } + + localTrackChan := make(chan *webrtc.Track) + + // Set a handler for when a new remote track starts, this just distributes all our packets + // to connected peers + peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { + + // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval + // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it + go func() { + rtcpPLIInterval := time.Second * 3 + ticker := time.NewTicker(rtcpPLIInterval) + for range ticker.C { + if rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { + fmt.Println(rtcpSendErr) + } + } + }() + + // Create a local track, all our SFU clients will be fed via this track + localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) + if newTrackErr != nil { + log.Println("New track error!", err) + askForClose(mainOffer.callHash) + return + } + + // Send the track to the main goroutine (in order to respond correctly to SDP requests & responses) + localTrackChan <- localTrack + + rtpBuf := make([]byte, 1400) + for { + i, readErr := remoteTrack.Read(rtpBuf) + if readErr != nil { + + // Could not read from remote track + log.Println("Read error!", err) + askForClose(mainOffer.callHash) + break + + } + + // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet + if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe { + log.Println("Write error!", err) + askForClose(mainOffer.callHash) + break + } + } + }) + + // Set the remote SessionDescription + err = peerConnection.SetRemoteDescription(mainOffer.offer) + if err != nil { + log.Println("Set remote description error!", err) + askForClose(mainOffer.callHash) + return + } + + // Create answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + log.Println("Create answer error!", err) + askForClose(mainOffer.callHash) + return + } + + // Sets the LocalDescription, and starts our UDP listeners + err = peerConnection.SetLocalDescription(answer) + if err != nil { + log.Println("Set local description error!", err) + askForClose(mainOffer.callHash) + return + } + + // Forward ice candidates + peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { + if c != nil { + sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON()) + } + }) + + // Send anwser + sendSignal(mainOffer.callHash, mainOffer.peerID, answer) } diff --git a/ws.go b/ws.go index 8bfa278..2f1643b 100644 --- a/ws.go +++ b/ws.go @@ -21,6 +21,8 @@ type WsMessage struct { Data interface{} } +var outMsgChan = make(chan interface{}) + // Open websocket connection func openWs(conf *Config) { @@ -44,6 +46,13 @@ func openWs(conf *Config) { c.Close() }() + // Send outgoing messages + go func() { + for { + c.WriteJSON(<-outMsgChan) + } + }() + // Read remote messages for { @@ -81,3 +90,16 @@ func openWs(conf *Config) { } } + +/// Send a signal to the API +func sendSignal(callHash, peerID string, data interface{}) { + + msg := WsMessage{ + Title: "signal", + PeerID: peerID, + CallHash: callHash, + Data: data, + } + + outMsgChan <- msg +}