/// RTC Relay
///
/// @author Pierre Hubert
package main

import (
	"encoding/json"
	"io"
	"log"
	"sync"
	"time"

	"github.com/pion/rtcp"
	"github.com/pion/webrtc/v2"
)

const (
	// SDP This is an SDP signal
	SDP = iota

	// CANDIDATE This is an ICE candidate
	CANDIDATE

	// RequestOffer Requests an offer for a broadcast receiver
	RequestOffer

	// CloseConnection Requests the connection to be closed
	CloseConnection
)

type receivedSignal struct {
	peerID    string
	callHash  string
	sigType   uint
	sdp       webrtc.SessionDescription
	candidate webrtc.ICECandidateInit
}

type activeRelay struct {
	channel  chan receivedSignal
	callHash string
	id       uint
	closed   bool
}

/// We keep the channel of each active connection
var closeChan = make(chan *activeRelay)
var connections = make(map[string]*activeRelay)
var currID uint = 0
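// The exact shape of the signaling payloads is defined by the caller of
// onSignal() elsewhere in the package. As an illustration only (assumed, not
// confirmed by this file), an SDP signal would look like:
//
//	{
//		"type": "SDP",
//		"data": { "type": "offer", "sdp": "v=0\r\n..." }
//	}
//
// and an ICE candidate signal like:
//
//	{
//		"type": "CANDIDATE",
//		"data": { "candidate": "candidate:...", "sdpMid": "0", "sdpMLineIndex": 0 }
//	}
//
// since "data" is re-encoded below straight into a webrtc.SessionDescription
// or a webrtc.ICECandidateInit respectively.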
/// Process incoming messages
func onSignal(callHash, peerID string, data map[string]interface{}) {

	// Close all the channels that requested so
	processCloseRequests()

	// Decode received signal
	newSignal := receivedSignal{
		peerID:   peerID,
		callHash: callHash,
	}

	if data["type"] == "SDP" {
		newSignal.sigType = SDP

		// We have to re-encode the data to initialize the session description
		enc, err := json.Marshal(data["data"])
		if err != nil {
			log.Printf("Could not re-encode session description! %s", err)
			return
		}

		if err := json.Unmarshal(enc, &newSignal.sdp); err != nil {
			log.Printf("Discarding invalid session description: %s", err)
			return
		}

	} else if data["type"] == "CANDIDATE" {
		newSignal.sigType = CANDIDATE

		// We have to re-encode the data to initialize the ICE candidate
		enc, err := json.Marshal(data["data"])
		if err != nil {
			log.Printf("Could not re-encode candidate! %s", err)
			return
		}

		if err := json.Unmarshal(enc, &newSignal.candidate); err != nil {
			log.Printf("Discarding invalid candidate: %s", err)
			return
		}

	} else if data["type"] == "REQUEST_OFFER" {
		// Request an offer
		newSignal.sigType = RequestOffer

	} else if data["type"] == "CLOSE_CONN" {
		// Close connection
		newSignal.sigType = CloseConnection

	} else {
		// A single malformed signal must not take the whole relay down
		log.Printf("Invalid signal type: %v!", data["type"])
		return
	}

	// Check if we are attempting to connect as a viewer / send a candidate to a non-existing channel
	if _, ok := connections[callHash]; !ok {
		if peerID != "0" || newSignal.sigType != SDP {
			log.Printf("Attempting to connect as viewer or send a candidate to a non-ready broadcast! (callhash %s / peerid %s)", callHash, peerID)
			return
		}
	}

	// Handle new offers
	if newSignal.sigType == SDP && peerID == "0" {

		// Check if we are overwriting another connection
		if val, ok := connections[callHash]; ok {
			closeConnection(val)
		}

		newRelay := activeRelay{
			id:       currID,
			callHash: callHash,
			channel:  make(chan receivedSignal, 10),
		}
		currID++
		connections[callHash] = &newRelay

		go newCall(newSignal, &newRelay)

	} else {
		// Forward the message to the channel
		connections[callHash].channel <- newSignal
	}
}

/// Request an offer for a client of a broadcast
func onRequestOffer(callHash, peerID string) {
	onSignal(callHash, peerID, map[string]interface{}{
		"type": "REQUEST_OFFER",
	})
}

/// Request connections to be closed
func onCloseConnection(callHash, peerID string) {
	onSignal(callHash, peerID, map[string]interface{}{
		"type": "CLOSE_CONN",
	})
}

/// Close a connection
func closeConnection(r *activeRelay) {

	// Close the channel
	if !r.closed {
		close(r.channel)
		r.closed = true
	}

	// Remove the channel from the list
	if val, ok := connections[r.callHash]; ok && val.id == r.id {
		delete(connections, r.callHash)
	}

	log.Printf("Closing call %s / id: %d (%d remaining open calls)", r.callHash, r.id, len(connections))
}

// Ask for a channel to be closed
func askForClose(r *activeRelay) {
	closeChan <- r
}

// Process channel close requests (in a thread-safe way)
func processCloseRequests() {
	for {
		select {
		case r := <-closeChan:
			closeConnection(r)

		case <-time.After(time.Millisecond * 10):
			return
		}
	}
}
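// Note: connections and currID are not protected by a mutex. The code assumes
// that onSignal() (and the helpers above) is always driven from a single
// signaling goroutine; relay goroutines never touch these maps directly and
// instead request closing through askForClose() / closeChan, which keeps the
// map access single-threaded.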
/// Start new call
func newCall(mainOffer receivedSignal, r *activeRelay) {
	log.Printf("Starting new call: %s / id: %d", r.callHash, r.id)

	// ICE candidates mutex
	var candidatesMux sync.Mutex
	pendingCandidates := make([]*webrtc.ICECandidate, 0)

	// Since we are answering, use the payload types declared by the offerer
	mediaEngine := webrtc.MediaEngine{}
	err := mediaEngine.PopulateFromSDP(mainOffer.sdp)
	if err != nil {
		log.Println("Error: invalid data in offer!", err)
		askForClose(r)
		return
	}

	// Disable trickle ICE: all candidates are gathered before the answer is sent
	s := webrtc.SettingEngine{}
	s.SetTrickle(false) // I did not manage to make the connection trickling, sorry...

	// Create the API object with the MediaEngine
	api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine), webrtc.WithSettingEngine(s))

	// Setup config
	peerConnectionConfig := webrtc.Configuration{
		ICEServers: callConf.iceServers,
	}

	// Create a new RTCPeerConnection
	mainPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
	if err != nil {
		log.Println("Error: could not create peer connection!", err)
		askForClose(r)
		return
	}

	// Close peer connection at the end of the call
	defer mainPeerConnection.Close()

	// Check if the connection is closed
	mainPeerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
		if s == webrtc.PeerConnectionStateClosed {
			askForClose(r)
		}
	})

	// Forward ICE candidates
	mainPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
		if c == nil || r.closed {
			return
		}

		// Lock the list of candidates
		candidatesMux.Lock()
		defer candidatesMux.Unlock()

		desc := mainPeerConnection.RemoteDescription()
		if desc == nil {
			// Add the candidate to the pending list
			pendingCandidates = append(pendingCandidates, c)
		} else {
			// Send the candidate directly
			sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON())
		}
	})

	// Allow us to receive 1 audio & 1 video track
	if callConf.allowVideo {
		if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
			log.Println("Error: could not prepare to receive video track!", err)
			askForClose(r)
			return
		}
	}

	if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
		log.Println("Error: could not prepare to receive audio track!", err)
		askForClose(r)
		return
	}
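	// The relay behaves like a minimal SFU: each remote track published by
	// the broadcaster (peer "0") is copied into a local track, and that
	// single local track is later added to every viewer's peer connection.
	// The channel below hands the local tracks from the OnTrack callback
	// over to this goroutine.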
	localTrackChan := make(chan *webrtc.Track)

	// Set a handler for when a new remote track starts. This just distributes all our packets
	// to connected peers
	mainPeerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {

		// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval.
		// This could be less wasteful by processing incoming RTCP events: we would then emit a NACK/PLI only when a viewer requests it
		go func() {
			rtcpPLIInterval := time.Second * 3
			ticker := time.NewTicker(rtcpPLIInterval)
			for range ticker.C {
				if r.closed {
					return
				}

				if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
					log.Println("Write RTCP error:", rtcpSendErr)
					askForClose(r)
					return
				}
			}
		}()

		// Create a local track: all our SFU clients will be fed via this track
		trackID := "audio"
		trackLabel := "pion" // We need only one track label

		if remoteTrack.Kind() == webrtc.RTPCodecTypeVideo {

			// Check if video calls are allowed
			if !callConf.allowVideo {
				log.Printf("Blocked a video stream! Call hash: %s", r.callHash)
				return
			}

			trackID = "video" // We need two different track IDs
		}

		localTrack, newTrackErr := mainPeerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), trackID, trackLabel)
		if newTrackErr != nil {
			log.Println("New track error!", newTrackErr)
			askForClose(r)
			return
		}

		// Send the track to the main goroutine (in order to respond correctly to SDP requests & responses)
		localTrackChan <- localTrack

		rtpBuf := make([]byte, 1400)
		for {
			if r.closed {
				return
			}

			i, readErr := remoteTrack.Read(rtpBuf)
			if readErr != nil {
				// Could not read from remote track
				log.Println("Read error!", readErr)
				askForClose(r)
				break
			}

			// ErrClosedPipe means we don't have any subscribers; this is OK if no peers have connected yet
			if _, err = localTrack.Write(rtpBuf[:i]); err != nil && err != io.ErrClosedPipe {
				log.Println("Write error!", err)
				askForClose(r)
				break
			}
		}
	})

	// Set the remote SessionDescription
	err = mainPeerConnection.SetRemoteDescription(mainOffer.sdp)
	if err != nil {
		log.Println("Set remote description error!", err)
		askForClose(r)
		return
	}

	// Create answer
	answer, err := mainPeerConnection.CreateAnswer(nil)
	if err != nil {
		log.Println("Create answer error!", err)
		askForClose(r)
		return
	}

	// Set the LocalDescription, and start our UDP listeners
	err = mainPeerConnection.SetLocalDescription(answer)
	if err != nil {
		log.Println("Set local description error!", err)
		askForClose(r)
		return
	}

	// Send answer
	if !r.closed {
		sendSignal(mainOffer.callHash, mainOffer.peerID, answer)

		// Warning! Do not move this outside of this block
		candidatesMux.Lock()

		// Send pending ICE candidates
		for _, val := range pendingCandidates {
			sendSignal(mainOffer.callHash, mainOffer.peerID, val.ToJSON())
		}

		candidatesMux.Unlock()
	}
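	// From this point on, the goroutine serves the viewers of the broadcast:
	// RequestOffer creates a viewer peer connection fed with the local
	// tracks and sends an offer, SDP applies the viewer's answer, CANDIDATE
	// routes ICE candidates to the matching peer, and CloseConnection tears
	// down either a single viewer or the whole relay.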
	// Keep a list of active tracks
	localTracks := make([]*webrtc.Track, 0, 2)
	clients := make(map[string]*webrtc.PeerConnection, 1)

	// Close all client connections at the end
	defer func() {
		for _, v := range clients {
			v.Close()
		}
	}()

	// Enter messages loop
	for {

		// Receive new tracks
		stopCheck := len(localTracks) >= 2 // Stop checking once we have all the tracks
		for !stopCheck {
			select {
			case t := <-localTrackChan:
				localTracks = append(localTracks, t)

			case <-time.After(time.Millisecond * 100):
				stopCheck = true
			}
		}

		newMessage, ok := <-r.channel
		if !ok {
			log.Printf("Channel closed: call hash: %s / call id: %d", r.callHash, r.id)
			return
		}

		// Check if we are creating a new connection
		if newMessage.sigType == RequestOffer {

			// Close any previous connection of this client
			if val, ok := clients[newMessage.peerID]; ok {
				val.Close()
			}

			// ICE candidates mutex
			var newCandidatesMux sync.Mutex
			newPendingCandidates := make([]*webrtc.ICECandidate, 0)

			// Create new peer connection
			newPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
			if err != nil {
				log.Printf("Error creating new remote peer connection: %s", err)
				continue
			}

			// The deferred cleanup of the clients map above closes this
			// connection at the end of the call (a defer inside this loop
			// would pile up until the whole call ends)
			clients[newMessage.peerID] = newPeerConnection

			// Send ICE candidates
			newPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
				if c == nil || r.closed {
					return
				}

				newCandidatesMux.Lock()
				defer newCandidatesMux.Unlock()

				// Check if we have already determined the local description
				desc := newPeerConnection.LocalDescription()
				if desc == nil {
					// Append the candidate to the queue
					newPendingCandidates = append(newPendingCandidates, c)
				} else {
					// Send the candidate immediately
					sendSignal(r.callHash, newMessage.peerID, c.ToJSON())
				}
			})

			// Add tracks
			for _, value := range localTracks {
				if _, err = newPeerConnection.AddTrack(value); err != nil {
					log.Printf("Error adding a track: %s", err)
					continue
				}
			}

			// Create the offer
			offer, err := newPeerConnection.CreateOffer(nil)
			if err != nil {
				log.Printf("Could not create offer: %s!", err)
				continue
			}

			err = newPeerConnection.SetLocalDescription(offer)
			if err != nil {
				log.Printf("Could not set local description: %s!", err)
				continue
			}

			// Send offer
			sendSignal(r.callHash, newMessage.peerID, offer)

			// Send pending ICE candidates
			newCandidatesMux.Lock()
			for _, c := range newPendingCandidates {
				sendSignal(r.callHash, newMessage.peerID, c.ToJSON())
			}
			newCandidatesMux.Unlock()

		} else if newMessage.sigType == SDP {

			// Got an answer from the client
			if val, ok := clients[newMessage.peerID]; ok {
				// Set remote description
				if err := val.SetRemoteDescription(newMessage.sdp); err != nil {
					log.Printf("Could not set remote description! %s", err)
				}
			} else {
				log.Printf("Dropped unusable answer: callHash %s / peer ID %s", newMessage.callHash, newMessage.peerID)
			}

		} else if newMessage.sigType == CANDIDATE {

			if newMessage.peerID == "0" {
				// ICE candidate for the main peer
				if err := mainPeerConnection.AddICECandidate(newMessage.candidate); err != nil {
					log.Printf("Error adding ICE candidate to main peer: %s", err)
				}
			} else if val, ok := clients[newMessage.peerID]; ok {
				// ICE candidate for a remote peer
				if err := val.AddICECandidate(newMessage.candidate); err != nil {
					log.Printf("Error adding ICE candidate for remote peer: %s", err)
				}
			} else {
				log.Printf("Error: tried to add an ICE candidate for a non-ready peer connection!")
			}

		} else if newMessage.sigType == CloseConnection {

			// Check if we have to close the whole connection or just a single client
			if newMessage.peerID == "0" {
				askForClose(r)
				return
			} else if val, ok := clients[newMessage.peerID]; ok {
				// Close a single client
				val.Close()
				delete(clients, newMessage.peerID)
			}
		}
	}
}
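// Usage sketch (illustration only, not part of the relay itself): a signaling
// layer elsewhere in the package is expected to decode incoming messages and
// dispatch them to this file, roughly like:
//
//	var msg struct {
//		CallHash string                 `json:"callHash"`
//		PeerID   string                 `json:"peerID"`
//		Data     map[string]interface{} `json:"data"`
//	}
//	if err := json.Unmarshal(raw, &msg); err == nil {
//		onSignal(msg.CallHash, msg.PeerID, msg.Data)
//	}
//
// The field names above are assumptions; sendSignal() and callConf (including
// callConf.iceServers and callConf.allowVideo) are likewise assumed to be
// defined elsewhere in the package.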