First working retransmission

This commit is contained in:
Pierre HUBERT 2020-04-11 16:32:34 +02:00
parent 0d09b436d2
commit 584504b40c
2 changed files with 134 additions and 15 deletions

145
relay.go
View File

@ -35,6 +35,7 @@ type activeRelay struct {
channel chan receivedSignal channel chan receivedSignal
callHash string callHash string
id uint id uint
closed bool
} }
/// We keep for each connection its channel /// We keep for each connection its channel
@ -117,6 +118,7 @@ func closeConnection(r activeRelay) {
if val, ok := connections[r.callHash]; ok && val.id == r.id { if val, ok := connections[r.callHash]; ok && val.id == r.id {
close(val.channel) close(val.channel)
delete(connections, r.callHash) delete(connections, r.callHash)
r.closed = true
} }
} }
@ -160,22 +162,25 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
} }
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(peerConnectionConfig) mainPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil { if err != nil {
log.Println("Error: could not create peer connection!", err) log.Println("Error: could not create peer connection!", err)
askForClose(r) askForClose(r)
return return
} }
// Close peer connection
defer mainPeerConnection.Close()
// Allow us to receive 1 audio & 1 video track // Allow us to receive 1 audio & 1 video track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
log.Println("Error: could not prepare to receive video track!", err) log.Println("Error: could not prepare to receive video track!", err)
askForClose(r) askForClose(r)
return return
} }
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, if _, err = mainPeerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
log.Println("Error: could not prepare to receive audio track!", err) log.Println("Error: could not prepare to receive audio track!", err)
askForClose(r) askForClose(r)
@ -186,7 +191,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
// Set a handler for when a new remote track starts, this just distributes all our packets // Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers // to connected peers
peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { mainPeerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it
@ -194,14 +199,19 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
rtcpPLIInterval := time.Second * 3 rtcpPLIInterval := time.Second * 3
ticker := time.NewTicker(rtcpPLIInterval) ticker := time.NewTicker(rtcpPLIInterval)
for range ticker.C { for range ticker.C {
if rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
fmt.Println(rtcpSendErr) if r.closed {
return
}
if rtcpSendErr := mainPeerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}}); rtcpSendErr != nil {
fmt.Println("Write RTCP error:", rtcpSendErr)
} }
} }
}() }()
// Create a local track, all our SFU clients will be fed via this track // Create a local track, all our SFU clients will be fed via this track
localTrack, newTrackErr := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label()) localTrack, newTrackErr := mainPeerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), remoteTrack.ID(), remoteTrack.Label())
if newTrackErr != nil { if newTrackErr != nil {
log.Println("New track error!", err) log.Println("New track error!", err)
askForClose(r) askForClose(r)
@ -217,10 +227,9 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
if readErr != nil { if readErr != nil {
// Could not read from remote track // Could not read from remote track
log.Println("Read error!", err) log.Println("Read error!", readErr)
askForClose(r) askForClose(r)
break break
} }
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
@ -233,7 +242,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
}) })
// Set the remote SessionDescription // Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(mainOffer.offer) err = mainPeerConnection.SetRemoteDescription(mainOffer.offer)
if err != nil { if err != nil {
log.Println("Set remote description error!", err) log.Println("Set remote description error!", err)
askForClose(r) askForClose(r)
@ -241,7 +250,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
} }
// Create answer // Create answer
answer, err := peerConnection.CreateAnswer(nil) answer, err := mainPeerConnection.CreateAnswer(nil)
if err != nil { if err != nil {
log.Println("Create answer error!", err) log.Println("Create answer error!", err)
askForClose(r) askForClose(r)
@ -249,7 +258,7 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
} }
// Sets the LocalDescription, and starts our UDP listeners // Sets the LocalDescription, and starts our UDP listeners
err = peerConnection.SetLocalDescription(answer) err = mainPeerConnection.SetLocalDescription(answer)
if err != nil { if err != nil {
log.Println("Set local description error!", err) log.Println("Set local description error!", err)
askForClose(r) askForClose(r)
@ -257,12 +266,122 @@ func newCall(mainOffer receivedSignal, r activeRelay) {
} }
// Forward ice candidates // Forward ice candidates
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { mainPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c != nil { if c != nil {
sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON()) sendSignal(mainOffer.callHash, mainOffer.peerID, c.ToJSON())
} }
}) })
// Send anwser // Send anwser
if !r.closed {
sendSignal(mainOffer.callHash, mainOffer.peerID, answer) sendSignal(mainOffer.callHash, mainOffer.peerID, answer)
} }
// Keep a list of active tracks
localTracks := make([]*webrtc.Track, 0, 2)
clients := make(map[string]*webrtc.PeerConnection, 1)
// Close all clients connections at the end
defer func() {
for _, v := range clients {
v.Close()
}
}()
for {
// Receive new channels
stopCheck := len(localTracks) >= 2 // Stop check if we got all the channel
for !stopCheck {
select {
case t := <-localTrackChan:
localTracks = append(localTracks, t)
case <-time.After(time.Millisecond * 100):
stopCheck = true
}
}
newMessage, ok := <-r.channel
if !ok {
log.Printf("Channel closed: call hash: %s / call id: %d", r.callHash, r.id)
return
}
// Check if we are creating a new connection
if newMessage.sigType == SDP {
// Close any previous connection of this client
if val, ok := clients[newMessage.peerID]; ok {
val.Close()
}
// Create new peer connection
newPeerConnection, err := api.NewPeerConnection(peerConnectionConfig)
if err != nil {
log.Printf("Error creating new remote peer connection: %s", err)
continue
}
clients[newMessage.peerID] = newPeerConnection
// Add tracks
for _, value := range localTracks {
_, err = newPeerConnection.AddTrack(value)
if err != nil {
log.Printf("Error adding a track: %s", err)
continue
}
}
// Set remote description
err = newPeerConnection.SetRemoteDescription(newMessage.offer)
if err != nil {
log.Printf("Could not set remote description (remote peer): %s", err)
continue
}
// Create the answer
answer, err := newPeerConnection.CreateAnswer(nil)
if err != nil {
log.Printf("Could not create answer: %s!", err)
continue
}
err = newPeerConnection.SetLocalDescription(answer)
if err != nil {
log.Printf("Could not set local description: %s!", err)
continue
}
// Send ice candidates
newPeerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c != nil {
sendSignal(r.callHash, newMessage.peerID, c.ToJSON())
}
})
// Send answer
sendSignal(r.callHash, newMessage.peerID, answer)
} else if newMessage.sigType == CANDIDATE {
if newMessage.peerID == "0" {
// Ice candidate for main peer
if err := mainPeerConnection.AddICECandidate(newMessage.candidate); err != nil {
log.Printf("Err Adding ICECandidate to main peer: %s", err)
}
} else if val, ok := clients[newMessage.peerID]; ok {
// Ice candidate for remote peer
if err := val.AddICECandidate(newMessage.candidate); err != nil {
log.Printf("Err adding ice candidate for remote peer: %s", err)
}
} else {
log.Printf("Err tried to add ICE Candidate for non ready peer connection!")
}
}
}
}

2
ws.go
View File

@ -64,7 +64,6 @@ func openWs(conf *Config) {
} }
// Process incoming messages // Process incoming messages
log.Printf("recv: %s", message)
// Decode JSON // Decode JSON
var msg WsMessage var msg WsMessage
@ -85,6 +84,7 @@ func openWs(conf *Config) {
default: default:
println("Received unkown message type!") println("Received unkown message type!")
log.Printf("recv: %s", message)
break break
} }
} }