/// RTC Relay /// /// @author Pierre Hubert package main import ( "encoding/json" "fmt" "io" "log" "time" "github.com/pion/rtcp" "github.com/pion/webrtc/v2" ) const ( // SDP This is a SDP signal SDP = iota // CANDIDATE This is a candidate CANDIDATE = iota ) type receivedSignal struct { peerID string callHash string sigType uint offer webrtc.SessionDescription candidate webrtc.ICECandidateInit } type activeRelay struct { channel chan receivedSignal callHash string id uint } /// We keep for each connection its channel var closeChan = make(chan activeRelay) var connections = make(map[string]activeRelay) var currID uint = 0 /// Process incoming messages func onSignal(callHash, peerID string, data map[string]interface{}) { // Close all the channels that requested so processCloseRequests() // Decode received signal newSignal := receivedSignal{ peerID: peerID, callHash: callHash, } if data["type"] == "SDP" { newSignal.sigType = SDP newSignal.offer.Type = webrtc.SDPTypeOffer newSignal.offer.SDP = data["data"].(map[string]interface{})["sdp"].(string) } else if data["type"] == "CANDIDATE" { newSignal.sigType = CANDIDATE // I have to re-encode data to initialize ICECandidate var enc []byte enc, err := json.Marshal(data["data"]) if err != nil { log.Printf("Could not re-encode candidate ! %s", err) return } err = json.Unmarshal(enc, &newSignal.candidate) if err != nil { log.Printf("Discarding invalid candidate: %s", err) return } } else { log.Fatalf("Invalid signal type: %s !", data["type"]) } // Check if we are attempting to connect as viewer to a non existing channel if _, ok := connections[callHash]; !ok { if peerID != "0" || newSignal.sigType != SDP { println("Attempting to connect as viewer | send candidate to a non-ready broadcast!") return } } // Handle new offers if newSignal.sigType == SDP && peerID == "0" { // Check if we are overwriting another connection if val, ok := connections[callHash]; ok { closeConnection(val) } connections[callHash] = activeRelay{ id: currID, callHash: callHash, channel: make(chan receivedSignal, 10), } currID++ go newCall(newSignal, connections[callHash]) } else { // Forward the message to the channel connections[callHash].channel <- newSignal } } /// Close a connection func closeConnection(r activeRelay) { log.Printf("Closing call %s / id: %d", r.callHash, r.id) if val, ok := connections[r.callHash]; ok && val.id == r.id { close(val.channel) delete(connections, r.callHash) } } // Ask for a channel to be closed func askForClose(r activeRelay) { closeChan <- r } // Process channel close requests (in thread safe way) func processCloseRequests() { for { select { case r := <-closeChan: closeConnection(r) case <-time.After(time.Millisecond * 10): return } } } /// Start new call func newCall(mainOffer receivedSignal, r activeRelay) { log.Printf("Starting new call: %s", mainOffer.callHash) // Since we are answering use PayloadTypes declared by offerer mediaEngine := webrtc.MediaEngine{} err := mediaEngine.PopulateFromSDP(mainOffer.offer) if err != nil { log.Println("Error: invalid data in offer!", err) askForClose(r) return } // Create the API object with the MediaEngine api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) // Setup config peerConnectionConfig := webrtc.Configuration{ ICEServers: callConf.iceServers, } // Create a new RTCPeerConnection peerConnection, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { log.Println("Error: could not create peer connection!", err) askForClose(r) return } // Allow us to receive 1 audio & 1 video track if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive video track!", err) askForClose(r) return } if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive audio track!", err) askForClose(r) return } localTrackChan := make(chan *webrtc.Track) // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it go func() { rtcpPLIInterval := time.Second * 3 ticker := time.NewTicker(rtcpPLIInterval) for range ticker.C { if rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { fmt.Println(rtcpSendErr) } } }() // Create a local track, all our SFU clients will be fed via this track localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) if newTrackErr != nil { log.Println("New track error!", err) askForClose(r) return } // Send the track to the main goroutine (in order to respond correctly to SDP requests & responses) localTrackChan <- localTrack rtpBuf := make([]byte, 1400) for { i, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { // Could not read from remote track log.Println("Read error!", err) askForClose(r) break } // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe { log.Println("Write error!", err) askForClose(r) break } } }) // Set the remote SessionDescription err = peerConnection.SetRemoteDescription(mainOffer.offer) if err != nil { log.Println("Set remote description error!", err) askForClose(r) return } // Create answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Println("Create answer error!", err) askForClose(r) return } // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { log.Println("Set local description error!", err) askForClose(r) return } // Forward ice candidates peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c != nil { sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON()) } }) // Send anwser sendSignal(mainOffer.callHash, mainOffer.peerID, answer) }