diff --git a/relay.go b/relay.go index 4057382..cc8868d 100644 --- a/relay.go +++ b/relay.go @@ -35,6 +35,7 @@ type activeRelay struct { channel chan receivedSignal callHash string id uint + closed bool } /// We keep for each connection its channel @@ -117,6 +118,7 @@ func closeConnection(r activeRelay) { if val, ok := connections[r.callHash]; ok && val.id == r.id { close(val.channel) delete(connections, r.callHash) + r.closed = true } } @@ -160,22 +162,25 @@ func newCall(mainOffer receivedSignal, r activeRelay) { } // Create a new RTCPeerConnection - peerConnection, err := api.NewPeerConnection(peerConnectionConfig) + mainPeerConnection, err := api.NewPeerConnection(peerConnectionConfig) if err != nil { log.Println("Error: could not create peer connection!", err) askForClose(r) return } + // Close peer connection + defer mainPeerConnection.Close() + // Allow us to receive 1 audio & 1 video track - if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive video track!", err) askForClose(r) return } - if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { log.Println("Error: could not prepare to receive audio track!", err) askForClose(r) @@ -186,7 +191,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) { // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers - peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { + mainPeerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it @@ -194,14 +199,19 @@ func newCall(mainOffer receivedSignal, r activeRelay) { rtcpPLIInterval := time.Second * 3 ticker := time.NewTicker(rtcpPLIInterval) for range ticker.C { - if rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { - fmt.Println(rtcpSendErr) + + if r.closed { + return + } + + if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil { + fmt.Println("Write RTCP error:", rtcpSendErr) } } }() // Create a local track, all our SFU clients will be fed via this track - localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) + localTrack, newTrackErr := mainPeerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) if newTrackErr != nil { log.Println("New track error!", err) askForClose(r) @@ -217,10 +227,9 @@ func newCall(mainOffer receivedSignal, r activeRelay) { if readErr != nil { // Could not read from remote track - log.Println("Read error!", err) + log.Println("Read error!", readErr) askForClose(r) break - } // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet @@ -233,7 +242,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) { }) // Set the remote SessionDescription - err = peerConnection.SetRemoteDescription(mainOffer.offer) + err = mainPeerConnection.SetRemoteDescription(mainOffer.offer) if err != nil { log.Println("Set remote description error!", err) askForClose(r) @@ -241,7 +250,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) { } // Create answer - answer, err := peerConnection.CreateAnswer(nil) + answer, err := mainPeerConnection.CreateAnswer(nil) if err != nil { log.Println("Create answer error!", err) askForClose(r) @@ -249,7 +258,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) { } // Sets the LocalDescription, and starts our UDP listeners - err = peerConnection.SetLocalDescription(answer) + err = mainPeerConnection.SetLocalDescription(answer) if err != nil { log.Println("Set local description error!", err) askForClose(r) @@ -257,12 +266,122 @@ func newCall(mainOffer receivedSignal, r activeRelay) { } // Forward ice candidates - peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { + mainPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c != nil { sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON()) } }) // Send anwser - sendSignal(mainOffer.callHash, mainOffer.peerID, answer) + if !r.closed { + sendSignal(mainOffer.callHash, mainOffer.peerID, answer) + } + + // Keep a list of active tracks + localTracks := make([]*webrtc.Track, 0, 2) + + clients := make(map[string]*webrtc.PeerConnection, 1) + + // Close all clients connections at the end + defer func() { + for _, v := range clients { + v.Close() + } + }() + + for { + + // Receive new channels + stopCheck := len(localTracks) >= 2 // Stop check if we got all the channel + for !stopCheck { + select { + case t := <-localTrackChan: + localTracks = append(localTracks, t) + case <-time.After(time.Millisecond * 100): + stopCheck = true + } + } + + newMessage, ok := <-r.channel + + if !ok { + log.Printf("Channel closed: call hash: %s / call id: %d", r.callHash, r.id) + return + } + + // Check if we are creating a new connection + if newMessage.sigType == SDP { + + // Close any previous connection of this client + if val, ok := clients[newMessage.peerID]; ok { + val.Close() + } + + // Create new peer connection + newPeerConnection, err := api.NewPeerConnection(peerConnectionConfig) + if err != nil { + log.Printf("Error creating new remote peer connection: %s", err) + continue + } + clients[newMessage.peerID] = newPeerConnection + + // Add tracks + for _, value := range localTracks { + _, err = newPeerConnection.AddTrack(value) + if err != nil { + log.Printf("Error adding a track: %s", err) + continue + } + } + + // Set remote description + err = newPeerConnection.SetRemoteDescription(newMessage.offer) + if err != nil { + log.Printf("Could not set remote description (remote peer): %s", err) + continue + } + + // Create the answer + answer, err := newPeerConnection.CreateAnswer(nil) + if err != nil { + log.Printf("Could not create answer: %s!", err) + continue + } + + err = newPeerConnection.SetLocalDescription(answer) + if err != nil { + log.Printf("Could not set local description: %s!", err) + continue + } + + // Send ice candidates + newPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { + if c != nil { + sendSignal(r.callHash, newMessage.peerID, c.ToJSON()) + } + }) + + // Send answer + sendSignal(r.callHash, newMessage.peerID, answer) + + } else if newMessage.sigType == CANDIDATE { + + if newMessage.peerID == "0" { + // Ice candidate for main peer + if err := mainPeerConnection.AddICECandidate(newMessage.candidate); err != nil { + log.Printf("Err Adding ICECandidate to main peer: %s", err) + } + + } else if val, ok := clients[newMessage.peerID]; ok { + // Ice candidate for remote peer + if err := val.AddICECandidate(newMessage.candidate); err != nil { + log.Printf("Err adding ice candidate for remote peer: %s", err) + } + + } else { + log.Printf("Err tried to add ICE Candidate for non ready peer connection!") + } + + } + } } diff --git a/ws.go b/ws.go index 8707e04..4a86177 100644 --- a/ws.go +++ b/ws.go @@ -64,7 +64,6 @@ func openWs(conf *Config) { } // Process incoming messages - log.Printf("recv: %s", message) // Decode JSON var msg WsMessage @@ -85,6 +84,7 @@ func openWs(conf *Config) { default: println("Received unkown message type!") + log.Printf("recv: %s", message) break } }