/// RTC Relay /// /// @author Pierre Hubert package main import ( "encoding/json" "fmt" "io" "log" "time" "github.com/pion/rtcp" "github.com/pion/webrtc/v2" ) const ( // SDP This is a SDP signal SDP = iota // CANDIDATE This is a candidate CANDIDATE = iota // RequestOffer for a broadcast receiver RequestOffer = iota // CloseConnection Requests the connection to be closed CloseConnection = iota ) type receivedSignal struct { peerID string callHash string sigType uint sdp webrtc.SessionDescription candidate webrtc.ICECandidateInit } type activeRelay struct { channel chan receivedSignal callHash string id uint closed bool } /// We keep for each connection its channel var closeChan = make(chan activeRelay) var connections = make(map[string]activeRelay) var currID uint = 0 /// Process incoming messages func onSignal(callHash, peerID string, data map[string]interface{}) { // Close all the channels that requested so processCloseRequests() // Decode received signal newSignal := receivedSignal{ peerID: peerID, callHash: callHash, } if data["type"] == "SDP" { newSignal.sigType = SDP // I have to re-encode data to initialize SDP var enc []byte enc, err := json.Marshal(data["data"]) if err != nil { log.Printf("Could not re-encode candidate ! %s", err) return } err = json.Unmarshal(enc, &newSignal.sdp) if err != nil { log.Printf("Discarding invalid candidate: %s", err) return } } else if data["type"] == "CANDIDATE" { newSignal.sigType = CANDIDATE // I have to re-encode data to initialize ICECandidate var enc []byte enc, err := json.Marshal(data["data"]) if err != nil { log.Printf("Could not re-encode candidate ! %s", err) return } err = json.Unmarshal(enc, &newSignal.candidate) if err != nil { log.Printf("Discarding invalid candidate: %s", err) return } } else if data["type"] == "REQUEST_OFFER" { // Request an offer newSignal.sigType = RequestOffer } else if data["type"] == "CLOSE_CONN" { // Close connection newSignal.sigType = CloseConnection } else { log.Fatalf("Invalid signal type: %s !", data["type"]) } // Check if we are attempting to connect as viewer to a non existing channel if _, ok := connections[callHash]; !ok { if peerID != "0" || newSignal.sigType != SDP { println("Attempting to connect as viewer | send candidate to a non-ready broadcast!") return } } // Handle new offers if newSignal.sigType == SDP && peerID == "0" { // Check if we are overwriting another connection if val, ok := connections[callHash]; ok { closeConnection(val) } connections[callHash] = activeRelay{ id: currID, callHash: callHash, channel: make(chan receivedSignal, 10), } currID++ go newCall(newSignal, connections[callHash]) } else { // Forward the message to the channel connections[callHash].channel <- newSignal } } /// Request an offer for a client of a broadcast func onRequestOffer(callHash, peerID string) { onSignal(callHash, peerID, map[string]interface{}{ "type": "REQUEST_OFFER", }) } /// Request connections to be closed func onCloseConnection(callHash, peerID string) { onSignal(callHash, peerID, map[string]interface{}{ "type": "CLOSE_CONN", }) } /// Close a connection func closeConnection(r activeRelay) { log.Printf("Closing call %s / id: %d", r.callHash, r.id) if val, ok := connections[r.callHash]; ok && val.id == r.id { close(val.channel) delete(connections, r.callHash) r.closed = true } } // Ask for a channel to be closed func askForClose(r activeRelay) { closeChan <- r } // Process channel close requests (in thread safe way) func processCloseRequests() { for { select { case r := <-closeChan: closeConnection(r) case <-time.After(time.Millisecond * 10): return } } } /// Start new call func newCall(mainOffer receivedSignal, r activeRelay) { log.Printf("Starting new call: %s", mainOffer.callHash) // Since we are answering use PayloadTypes declared by offerer mediaEngine := webrtc.MediaEngine{} err := mediaEngine.PopulateFromSDP(mainOffer.sdp) if err != nil { log.Println("Error: invalid data in offer!", err) askForClose(r) return } // Create the API object with the MediaEngine api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) // Setup config peerConnectionConfig := webrtc.Configuration{ ICEServers: callConf.iceServers, } // Create a new RTCPeerConnection mainPeerConnection, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { log.Println("Error: could not create peer connection!", err) askForClose(r) return } // Close peer connection defer mainPeerConnection.Close() // Allow us to receive 1 audio & 1 video track if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive video track!", err) askForClose(r) return } if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive audio track!", err) askForClose(r) return } localTrackChan := make(chan *webrtc.Track) // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers mainPeerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it go func() { rtcpPLIInterval := time.Second * 3 ticker := time.NewTicker(rtcpPLIInterval) for range ticker.C { if r.closed { return } if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { fmt.Println("Write RTCP error:", rtcpSendErr) askForClose(r) return } } }() // Create a local track, all our SFU clients will be fed via this track trackID := "audio" trackLabel := "pion" // We need only one track label if remoteTrack.Kind() == webrtc.RTPCodecTypeVideo { trackID = "video" // We need two different track ids } localTrack, newTrackErr := mainPeerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), trackID, trackLabel) if newTrackErr != nil { log.Println("New track error!", err) askForClose(r) return } // Send the track to the main goroutine (in order to respond correctly to SDP requests & responses) localTrackChan <- localTrack rtpBuf := make([]byte, 1400) for { i, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { // Could not read from remote track log.Println("Read error!", readErr) askForClose(r) break } // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe { log.Println("Write error!", err) askForClose(r) break } } }) // Set the remote SessionDescription err = mainPeerConnection.SetRemoteDescription(mainOffer.sdp) if err != nil { log.Println("Set remote description error!", err) askForClose(r) return } // Create answer answer, err := mainPeerConnection.CreateAnswer(nil) if err != nil { log.Println("Create answer error!", err) askForClose(r) return } // Sets the LocalDescription, and starts our UDP listeners err = mainPeerConnection.SetLocalDescription(answer) if err != nil { log.Println("Set local description error!", err) askForClose(r) return } // Forward ice candidates mainPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c != nil { sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON()) } }) // Send anwser if !r.closed { sendSignal(mainOffer.callHash, mainOffer.peerID, answer) } // Keep a list of active tracks localTracks := make([]*webrtc.Track, 0, 2) clients := make(map[string]*webrtc.PeerConnection, 1) // Close all clients connections at the end defer func() { for _, v := range clients { v.Close() } }() for { // Receive new channels stopCheck := len(localTracks) >= 2 // Stop check if we got all the channel for !stopCheck { select { case t := <-localTrackChan: localTracks = append(localTracks, t) case <-time.After(time.Millisecond * 100): stopCheck = true } } newMessage, ok := <-r.channel if !ok { log.Printf("Channel closed: call hash: %s / call id: %d", r.callHash, r.id) return } // Check if we are creating a new connection if newMessage.sigType == RequestOffer { // Close any previous connection of this client if val, ok := clients[newMessage.peerID]; ok { val.Close() } // Create new peer connection newPeerConnection, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { log.Printf("Error creating new remote peer connection: %s", err) continue } clients[newMessage.peerID] = newPeerConnection // Add tracks for _, value := range localTracks { _, err = newPeerConnection.AddTrack(value) if err != nil { log.Printf("Error adding a track: %s", err) continue } } // Create the offer offer, err := newPeerConnection.CreateOffer(nil) if err != nil { log.Printf("Could not create answer: %s!", err) continue } err = newPeerConnection.SetLocalDescription(offer) if err != nil { log.Printf("Could not set local description: %s!", err) continue } // Send ice candidates newPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c != nil { sendSignal(r.callHash, newMessage.peerID, c.ToJSON()) } }) // Send offer sendSignal(r.callHash, newMessage.peerID, offer) } else if newMessage.sigType == SDP { // Got an answer from the client if val, ok := clients[newMessage.peerID]; ok { // Set remote description if err := val.SetRemoteDescription(newMessage.sdp); err != nil { log.Printf("Could not set remote description! %s", err) } } else { log.Printf("Dropped un-usable answer: callHash %s / peer ID %s", newMessage.callHash, newMessage.peerID) } } else if newMessage.sigType == CANDIDATE { if newMessage.peerID == "0" { // Ice candidate for main peer if err := mainPeerConnection.AddICECandidate(newMessage.candidate); err != nil { log.Printf("Err Adding ICECandidate to main peer: %s", err) } } else if val, ok := clients[newMessage.peerID]; ok { // Ice candidate for remote peer if err := val.AddICECandidate(newMessage.candidate); err != nil { log.Printf("Err adding ice candidate for remote peer: %s", err) } } else { log.Printf("Err tried to add ICE Candidate for non ready peer connection!") } } else if newMessage.sigType == CloseConnection { // Check if we have to close the whole connection or just a client if newMessage.peerID == "0" { askForClose(r) return } else if val, ok := clients[newMessage.peerID]; ok { // Close a single client val.Close() delete(clients, newMessage.peerID) } } } }