diff --git a/packages/@webex/plugin-meetings/src/multistream/mediaRequestManager.ts b/packages/@webex/plugin-meetings/src/multistream/mediaRequestManager.ts index 788892cf008..d8a2c8ae857 100644 --- a/packages/@webex/plugin-meetings/src/multistream/mediaRequestManager.ts +++ b/packages/@webex/plugin-meetings/src/multistream/mediaRequestManager.ts @@ -10,7 +10,7 @@ import { RecommendedOpusBitrates, NamedMediaGroup, } from '@webex/internal-media-core'; -import {cloneDeepWith, debounce, isEmpty} from 'lodash'; +import {cloneDeepWith, debounce, isEmpty, max, throttle} from 'lodash'; import LoggerProxy from '../common/logs/logger-proxy'; @@ -63,6 +63,7 @@ const CODEC_DEFAULTS = { }; const DEBOUNCED_SOURCE_UPDATE_TIME = 1000; +const THROTTLED_SEND_REQUESTS_TIME = 2000; // used for sendRequests() calls triggered by Homer updates type DegradationPreferences = { maxMacroblocksLimit: number; @@ -93,6 +94,7 @@ export class MediaRequestManager { private sourceUpdateListener: () => void; private debouncedSourceUpdateListener: () => void; + private throttledSendRequests: () => void; private previousStreamRequests: Array = []; @@ -114,6 +116,10 @@ export class MediaRequestManager { this.sourceUpdateListener, DEBOUNCED_SOURCE_UPDATE_TIME ); + this.throttledSendRequests = throttle( + this.sendRequests.bind(this), + THROTTLED_SEND_REQUESTS_TIME + ); } public setDegradationPreferences(degradationPreferences: DegradationPreferences) { @@ -398,7 +404,7 @@ export class MediaRequestManager { mediaRequest.handleMaxFs = eventHandler; mediaRequest.receiveSlots.forEach((rs) => { - rs.on(ReceiveSlotEvents.SourceUpdate, this.sourceUpdateListener); + rs.on(ReceiveSlotEvents.SourceUpdate, this.throttledSendRequests); rs.on(ReceiveSlotEvents.MaxFsUpdate, mediaRequest.handleMaxFs); }); @@ -413,7 +419,7 @@ export class MediaRequestManager { const mediaRequest = this.clientRequests[requestId]; mediaRequest?.receiveSlots.forEach((rs) => { - rs.off(ReceiveSlotEvents.SourceUpdate, this.sourceUpdateListener); + rs.off(ReceiveSlotEvents.SourceUpdate, this.throttledSendRequests); rs.off(ReceiveSlotEvents.MaxFsUpdate, mediaRequest.handleMaxFs); }); @@ -438,6 +444,6 @@ export class MediaRequestManager { this.numTotalSources = numTotalSources; this.numLiveSources = numLiveSources; - this.sendRequests(); + this.throttledSendRequests(); } } diff --git a/packages/@webex/plugin-meetings/test/unit/spec/multistream/mediaRequestManager.ts b/packages/@webex/plugin-meetings/test/unit/spec/multistream/mediaRequestManager.ts index ede46c14585..27a5a895c0f 100644 --- a/packages/@webex/plugin-meetings/test/unit/spec/multistream/mediaRequestManager.ts +++ b/packages/@webex/plugin-meetings/test/unit/spec/multistream/mediaRequestManager.ts @@ -1118,8 +1118,148 @@ describe('MediaRequestManager', () => { }); }); + describe('throttling of calls to sendMediaRequests() caused by notifications from Homer', () => { + let clock; + const sourceUpdateHandlers = []; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + mediaRequestManager = new MediaRequestManager(sendMediaRequestsCallback, { + degradationPreferences, + kind: 'video', + trimRequestsToNumOfSources: true, + }); + + sourceUpdateHandlers.length = 0; + + fakeReceiveSlots.forEach((slot) => { + slot.on.callsFake((eventName, handler) => { + if (eventName === 'sourceUpdate') { + sourceUpdateHandlers.push({handler, slot}); + } + }); + }); + + // add some requests and commit them + addActiveSpeakerRequest( + 255, + [ + fakeReceiveSlots[0], + fakeReceiveSlots[1], + fakeReceiveSlots[2], + fakeReceiveSlots[3], + fakeReceiveSlots[4], + fakeReceiveSlots[4], + ], + MAX_FS_1080p, + false + ); + + mediaRequestManager.setNumCurrentSources(5, 5); + mediaRequestManager.setDegradationPreferences({maxMacroblocksLimit: 8192}); + mediaRequestManager.commit(); + + // advance time to reach a stable state after any initial throttling + clock.tick(9999); + sendMediaRequestsCallback.resetHistory(); + }); + + afterEach(() => { + clock.restore(); + }); + + it('throttles calls to sendMediaRequests() when multiple source updates happen', () => { + // simulate multiple source update changes + sourceUpdateHandlers.forEach(({handler, slot}) => { + slot.sourceState = `avatar`; + handler(); + }); + + // The throttled sendMediaRequests should execute immediately on first call, then throttle subsequent calls + clock.tick(1); + assert.calledOnce(sendMediaRequestsCallback); + + // after 1s simulate more updates -> they should not trigger any more calls to sendRequests() + clock.tick(1000); + sourceUpdateHandlers.forEach(({handler, slot}) => { + slot.sourceState = `live`; + handler(); + }); + clock.tick(1); + // still only 1 call due to throttling + assert.calledOnce(sendMediaRequestsCallback); + + // now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests() + clock.tick(1000); + assert.calledTwice(sendMediaRequestsCallback); + + // and no more calls after that + clock.tick(9999); + assert.calledTwice(sendMediaRequestsCallback); + }); + + it('throttles calls to sendMediaRequests() when setNumCurrentSources() is called', () => { + // change number of available streams + mediaRequestManager.setNumCurrentSources(4, 4); + + // The throttled function should execute immediately on first call, then throttle subsequent calls + clock.tick(1); + assert.calledOnce(sendMediaRequestsCallback); + + // after 1s simulate more updates -> they should not trigger any more calls to sendRequests() + clock.tick(1000); + mediaRequestManager.setNumCurrentSources(3, 3); + clock.tick(1); + // still only 1 call due to throttling + assert.calledOnce(sendMediaRequestsCallback); + + // now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests() + clock.tick(1000); + assert.calledTwice(sendMediaRequestsCallback); + + // and no more calls after that + clock.tick(9999); + assert.calledTwice(sendMediaRequestsCallback); + }); + + it('throttles calls to sendMediaRequests() when setNumCurrentSources() is called AND source updates happen', () => { + // change number of available streams and simulate source updates + mediaRequestManager.setNumCurrentSources(4, 4); + sourceUpdateHandlers.forEach(({handler, slot}) => { + slot.sourceState = `avatar`; + handler(); + }); + + // The throttled function should execute immediately on first call, then throttle subsequent calls + clock.tick(1); + assert.calledOnce(sendMediaRequestsCallback); + + // after 1s simulate more updates -> they should not trigger any more calls to sendRequests() + clock.tick(1000); + mediaRequestManager.setNumCurrentSources(3, 3); + sourceUpdateHandlers.forEach(({handler, slot}) => { + slot.sourceState = `live`; + handler(); + }); + clock.tick(1); + // still only 1 call due to throttling + assert.calledOnce(sendMediaRequestsCallback); + + // now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests() + clock.tick(1000); + assert.calledTwice(sendMediaRequestsCallback); + + // and no more calls after that + clock.tick(9999); + assert.calledTwice(sendMediaRequestsCallback); + }); + }); + describe('trimming of requested receive slots', () => { + let clock; + beforeEach(() => { + clock = sinon.useFakeTimers(); mediaRequestManager = new MediaRequestManager(sendMediaRequestsCallback, { degradationPreferences, kind: 'video', @@ -1127,12 +1267,18 @@ describe('MediaRequestManager', () => { }); }); + afterEach(() => { + clock.restore(); + }); + const limitNumAvailableStreams = (preferLiveVideo, limit) => { if (preferLiveVideo) { mediaRequestManager.setNumCurrentSources(100, limit); } else { mediaRequestManager.setNumCurrentSources(limit, 1); } + // Advance time to trigger the throttled sendRequests + clock.tick(2000); }; [true, false].forEach((preferLiveVideo) =>