Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .changeset/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"commit": false,
"linked": [],
"access": "restricted",
"baseBranch": "main",
"baseBranch": "main_video_v3",
"updateInternalDependencies": "patch",
"ignore": []
}
5 changes: 5 additions & 0 deletions .changeset/swift-moles-arrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/webrtc': patch
---

FIXED inbound calls negotiation
5 changes: 5 additions & 0 deletions .changeset/tender-falcons-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/webrtc': patch
---

refactored the early invites implementation to make it more stable.
190 changes: 52 additions & 138 deletions packages/webrtc/src/RTCPeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from './utils'
import { watchRTCPeerMediaPackets } from './utils/watchRTCPeerMediaPackets'
import { connectionPoolManager } from './connectionPoolManager'

const RESUME_TIMEOUT = 12_000

export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
Expand All @@ -28,18 +29,15 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
public instance: RTCPeerConnection

private _iceTimeout: any
private _iceGatheringTimeout: any
private _negotiating = false
private _processingRemoteSDP = false
private _restartingIce = false
private _watchMediaPacketsTimer: ReturnType<typeof setTimeout>
private _connectionStateTimer: ReturnType<typeof setTimeout>
private _resumeTimer?: ReturnType<typeof setTimeout>
private _mediaWatcher: ReturnType<typeof watchRTCPeerMediaPackets>
private _candidatesSnapshot: RTCIceCandidate[] = []
private _allCandidates: RTCIceCandidate[] = []
private _processingLocalSDP = false
private _waitNegotiation: Promise<void> = Promise.resolve()
private _waitNegotiationCompleter: () => void
/**
* Both of these properties are used to have granular
* control over when to `resolve` and when `reject` the
Expand Down Expand Up @@ -202,11 +200,9 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
private _negotiationCompleted(error?: unknown) {
if (!error) {
this._resolveStartMethod()
this._waitNegotiationCompleter?.()
this._pendingNegotiationPromise?.resolve()
} else {
this._rejectStartMethod(error)
this._waitNegotiationCompleter?.()
this._pendingNegotiationPromise?.reject(error)
}
}
Expand Down Expand Up @@ -455,6 +451,7 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
return this.logger.warn('Skip twice onnegotiationneeded!')
}
this._negotiating = true

try {
/**
* additionalDevice and screenShare are `sendonly`
Expand Down Expand Up @@ -507,11 +504,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

this.logger.info('iceGatheringState', this.instance.iceGatheringState)
if (this.instance.iceGatheringState === 'gathering') {
this._iceTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
}
} catch (error) {
this.logger.error(`Error creating ${this.type}:`, error)
this._negotiationCompleted(error)
Expand Down Expand Up @@ -921,20 +913,13 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

try {
const isAllowedToSendLocalSDP = await this._isAllowedToSendLocalSDP()
if (!isAllowedToSendLocalSDP) {
this.logger.info('Skipping onLocalSDPReady due to early invite')
this._processingLocalSDP = false
return
}

this._waitNegotiation = new Promise((resolve) => {
this._waitNegotiationCompleter = resolve
})

await this.call.onLocalSDPReady(this)
this._processingLocalSDP = false
if (this.isAnswer) {
this.logger.debug('Setting negotiating false for inbound calls')
this._negotiating = false
this._restartingIce = false
this.resetNeedResume()
this._negotiationCompleted()
}
} catch (error) {
Expand All @@ -943,25 +928,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}
}

/**
* Waits for the pending negotiation promise to resolve
* and checks if the current signaling state allows to send a local SDP.
* This is used to prevent sending an offer when the signaling state is not appropriate.
* or when still waiting for a previous negotiation to complete.
*/
private async _isAllowedToSendLocalSDP() {
await this._waitNegotiation

// Check if signalingState have the right state to sand an offer
return (
(this.type === 'offer' &&
['have-local-offer', 'have-local-pranswer'].includes(
this.instance.signalingState
)) ||
(this.type === 'answer' && this.instance.signalingState === 'stable')
)
}

private _sdpIsValid() {
if (this.localSdp && this.hasIceServers) {
return sdpHasValidCandidates(this.localSdp)
Expand All @@ -977,6 +943,8 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

private _onIceTimeout() {
this.instance.removeEventListener('icecandidate', this._onIce)

if (this._sdpIsValid()) {
this._sdpReady()
return
Expand All @@ -1002,94 +970,29 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

private _onIce(event: RTCPeerConnectionIceEvent) {
/**
* Clear _iceTimeout on each single candidate
*/
// Clear _iceTimeout on each single candidate
if (this._iceTimeout) {
clearTimeout(this._iceTimeout)
}

// Add new _newTimeout for next candidate
this._iceTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.iceGatheringTimeout)

/**
* Following spec: no candidate means the gathering is completed.
*/
if (!event.candidate) {
this.instance.removeEventListener('icecandidate', this._onIce)
// not call _sdpReady if an early invite has been sent
if (this._candidatesSnapshot.length > 0) {
this.logger.debug('No more candidates, calling _sdpReady')
this._sdpReady()
}
clearTimeout(this._iceTimeout)
this.logger.debug('No more candidates, calling _sdpReady')
this._sdpReady()

return
}

// Store all candidates
this._allCandidates.push(event.candidate)

this.logger.debug('RTCPeer Candidate:', event.candidate)
if (event.candidate.type === 'host') {
/**
* With `host` candidate set timeout to
* maxIceGatheringTimeout and then invoke
* _onIceTimeout to check if the SDP is valid
*/
this._iceTimeout = setTimeout(() => {
this.instance.removeEventListener('icecandidate', this._onIce)
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
} else {
/**
* With non-HOST candidate (srflx, prflx or relay), check if we have
* candidates for all media sections to support early invite
*/
if (this.instance.localDescription?.sdp) {
if (sdpHasValidCandidates(this.instance.localDescription.sdp)) {
// Take a snapshot of candidates at this point
if (this._candidatesSnapshot.length === 0 && this.type === 'offer') {
this._candidatesSnapshot = [...this._allCandidates]
this.logger.info(
'SDP has candidates for all media sections, calling _sdpReady for early invite'
)
setTimeout(() => this._sdpReady(), 0) // Defer to allow any pending operations to complete
}
} else {
this.logger.info(
'SDP does not have candidates for all media sections, waiting for more candidates'
)
this.logger.debug(this.instance.localDescription?.sdp)
}
}
}
}

private _retryWithMoreCandidates() {
// Check if we have better candidates now than when we first sent SDP
const hasMoreCandidates = this._hasMoreCandidates()

if (hasMoreCandidates && this.instance.connectionState !== 'connected') {
this.logger.info(
'More candidates found after ICE gathering complete, triggering renegotiation'
)
// Reset negotiation state to allow new negotiation
this._negotiating = false
this._candidatesSnapshot = []
this._allCandidates = []

// set the SDP type to 'offer' since the client is initiating a new negotiation
this.type = 'offer'
// Start negotiation with force=true
if (this.instance.signalingState === 'stable') {
this.startNegotiation(true)
} else {
this.logger.warn(
'Signaling state is not stable, cannot start negotiation immediately'
)
this.restartIce()
}
}
}

private _hasMoreCandidates(): boolean {
return this._allCandidates.length > this._candidatesSnapshot.length
}

private _setLocalDescription(localDescription: RTCSessionDescriptionInit) {
Expand All @@ -1115,12 +1018,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
googleStartBitrate
)
}
// this.logger.debug(
// 'LOCAL SDP \n',
// `Type: ${localDescription.type}`,
// '\n\n',
// localDescription.sdp
// )
return this.instance.setLocalDescription(localDescription)
}

Expand Down Expand Up @@ -1161,9 +1058,14 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
case 'stable':
// Workaround to skip nested negotiations
// Chrome bug: https://bugs.chromium.org/p/chromium/issues/detail?id=740501
this._negotiating = false
this._restartingIce = false
this.resetNeedResume()

if (this.isOffer) {
// only when it's an offer that means the negotiation is done
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on this comment (or better, on the reasons for this comment)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it's an answer, the signalingState is stable earlier on the client side in comparison with the server side, which only has the one SDP. In some cases, this leaves room for the client side to fire onnegotiationeed events since the media is not connecting. That is why for incoming calls they need to keep the negotiating flag true until we have a successful response from the verto.answer.

this.logger.debug('Setting negotiating false for outbound calls')
this._negotiating = false
this._restartingIce = false
this.resetNeedResume()
}

if (this.instance.connectionState === 'connected') {
// An ice restart won't change the connectionState so we emit the same event in here
Expand Down Expand Up @@ -1194,14 +1096,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
// case 'new':
// break
case 'connecting':
this._connectionStateTimer = setTimeout(() => {
this.logger.warn('connectionState timed out')
if (this._hasMoreCandidates()) {
this._retryWithMoreCandidates()
} else {
this.restartIceWithRelayOnly()
}
}, this.options.maxConnectionStateTimeout)
break
case 'connected':
this.clearConnectionStateTimer()
Expand Down Expand Up @@ -1230,18 +1124,34 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {

this.instance.addEventListener('icegatheringstatechange', () => {
this.logger.debug('iceGatheringState:', this.instance.iceGatheringState)
if (this.instance.iceGatheringState === 'complete') {
this.logger.debug('ICE gathering complete')
void this._sdpReady()
switch (this.instance.iceGatheringState) {
case 'gathering':
this._iceGatheringTimeout = setTimeout(() => {
this._onIceTimeout()
}, this.options.maxIceGatheringTimeout)
break
case 'complete':
this.clearIceGatheringTimer()

// start connectionState timer after the gathering is complete
this._connectionStateTimer = setTimeout(() => {
this.logger.warn('connectionState timed out')
this.restartIceWithRelayOnly()
}, this.options.maxConnectionStateTimeout)

this.logger.debug('ICE gathering complete')
void this._sdpReady()
break
}
})

// this.instance.addEventListener('icecandidateerror', (event) => {
// this.logger.warn('IceCandidate Error:', event)
// this.clearTimers()
// this._forceNegotiation()
// })

this.instance.addEventListener('track', (event: RTCTrackEvent) => {
this.logger.debug('Track event:', event, event.track.kind)
// @ts-expect-error
this.call.emit('track', event)

Expand All @@ -1266,9 +1176,14 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
private clearTimers() {
this.clearResumeTimer()
this.clearWatchMediaPacketsTimer()
this.clearIceGatheringTimer()
this.clearConnectionStateTimer()
}

clearIceGatheringTimer() {
clearTimeout(this._iceGatheringTimeout)
}

private clearConnectionStateTimer() {
clearTimeout(this._connectionStateTimer)
}
Expand All @@ -1283,7 +1198,6 @@ export default class RTCPeer<EventTypes extends EventEmitter.ValidEventTypes> {
}

private emitMediaConnected() {
this.logger.debug('Emitting media.connected event')
this.call.emit('media.connected')
}

Expand Down
5 changes: 0 additions & 5 deletions packages/webrtc/src/RTCPeerConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ export class RTCPeerConnectionManager {
return null
}


/**
* Clean up the manager and all connections
*/
Expand Down Expand Up @@ -185,10 +184,6 @@ export class RTCPeerConnectionManager {
}

this.logger.debug(`Pooled connection ${id} created successfully`)
this.logger.debug(
`ICE candidates gathered for connection ${id}:`,
pc.localDescription?.sdp
)
return pooledConnection
} catch (error) {
this.logger.error('Failed to create pooled connection:', error)
Expand Down
Loading