Send signals to the client

This commit is contained in:
Pierre HUBERT 2020-04-11 12:14:55 +02:00
parent 53904ed367
commit 0484e47890
3 changed files with 140 additions and 0 deletions

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.14
require ( require (
github.com/gorilla/websocket v0.0.0-20200319175051-b65e62901fc1 github.com/gorilla/websocket v0.0.0-20200319175051-b65e62901fc1
github.com/pion/rtcp v1.2.1
github.com/pion/webrtc/v2 v2.2.4 github.com/pion/webrtc/v2 v2.2.4
gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v2 v2.2.8
) )

117
relay.go
View File

@ -6,9 +6,12 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io"
"log" "log"
"time" "time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
) )
@ -124,6 +127,8 @@ func processCloseRequests() {
/// Start new call /// Start new call
func newCall(mainOffer receivedSignal, ch chan receivedSignal) { func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
log.Printf("Starting new call: %s", mainOffer.callHash)
// Since we are answering use PayloadTypes declared by offerer // Since we are answering use PayloadTypes declared by offerer
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
err := mediaEngine.PopulateFromSDP(mainOffer.offer) err := mediaEngine.PopulateFromSDP(mainOffer.offer)
@ -133,4 +138,116 @@ func newCall(mainOffer receivedSignal, ch chan receivedSignal) {
return return
} }
// Create the API object with the MediaEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
// Setup config
peerConnectionConfig := webrtc.Configuration{
ICEServers: callConf.iceServers,
}
// Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil {
log.Println("Error: could not create peer connection!", err)
askForClose(mainOffer.callHash)
return
}
// Allow us to receive 1 audio & 1 video track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
log.Println("Error: could not prepare to receive video track!", err)
askForClose(mainOffer.callHash)
return
}
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
log.Println("Error: could not prepare to receive audio track!", err)
askForClose(mainOffer.callHash)
return
}
localTrackChan := make(chan *webrtc.Track)
// Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers
peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it
go func() {
rtcpPLIInterval := time.Second * 3
ticker := time.NewTicker(rtcpPLIInterval)
for range ticker.C {
if rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
fmt.Println(rtcpSendErr)
}
}
}()
// Create a local track, all our SFU clients will be fed via this track
localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label())
if newTrackErr != nil {
log.Println("New track error!", err)
askForClose(mainOffer.callHash)
return
}
// Send the track to the main goroutine (in order to respond correctly to SDP requests & responses)
localTrackChan <- localTrack
rtpBuf := make([]byte, 1400)
for {
i, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil {
// Could not read from remote track
log.Println("Read error!", err)
askForClose(mainOffer.callHash)
break
}
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe {
log.Println("Write error!", err)
askForClose(mainOffer.callHash)
break
}
}
})
// Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(mainOffer.offer)
if err != nil {
log.Println("Set remote description error!", err)
askForClose(mainOffer.callHash)
return
}
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Println("Create answer error!", err)
askForClose(mainOffer.callHash)
return
}
// Sets the LocalDescription, and starts our UDP listeners
err = peerConnection.SetLocalDescription(answer)
if err != nil {
log.Println("Set local description error!", err)
askForClose(mainOffer.callHash)
return
}
// Forward ice candidates
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c != nil {
sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON())
}
})
// Send anwser
sendSignal(mainOffer.callHash, mainOffer.peerID, answer)
} }

22
ws.go
View File

@ -21,6 +21,8 @@ type WsMessage struct {
Data interface{} Data interface{}
} }
var outMsgChan = make(chan interface{})
// Open websocket connection // Open websocket connection
func openWs(conf *Config) { func openWs(conf *Config) {
@ -44,6 +46,13 @@ func openWs(conf *Config) {
c.Close() c.Close()
}() }()
// Send outgoing messages
go func() {
for {
c.WriteJSON(<-outMsgChan)
}
}()
// Read remote messages // Read remote messages
for { for {
@ -81,3 +90,16 @@ func openWs(conf *Config) {
} }
} }
/// Send a signal to the API
func sendSignal(callHash, peerID string, data interface{}) {
msg := WsMessage{
Title: "signal",
PeerID: peerID,
CallHash: callHash,
Data: data,
}
outMsgChan <- msg
}