1717
1818import { _FirebaseInstallationsInternal } from '@firebase/installations' ;
1919import { Logger } from '@firebase/logger' ;
20- import { ConfigUpdateObserver } from '../public_types' ;
20+ import {
21+ ConfigUpdate ,
22+ ConfigUpdateObserver ,
23+ FetchResponse ,
24+ FirebaseRemoteConfigObject
25+ } from '../public_types' ;
2126import { calculateBackoffMillis , FirebaseError } from '@firebase/util' ;
2227import { ERROR_FACTORY , ErrorCode } from '../errors' ;
2328import { Storage } from '../storage/storage' ;
2429import { VisibilityMonitor } from './visibility_monitor' ;
30+ import { StorageCache } from '../storage/storage_cache' ;
31+ import {
32+ FetchRequest ,
33+ RemoteConfigAbortSignal
34+ } from './remote_config_fetch_client' ;
35+ import { RestClient } from './rest_client' ;
2536
2637const API_KEY_HEADER = 'X-Goog-Api-Key' ;
2738const INSTALLATIONS_AUTH_TOKEN_HEADER = 'X-Goog-Firebase-Installations-Auth' ;
2839const ORIGINAL_RETRIES = 8 ;
40+ const MAXIMUM_FETCH_ATTEMPTS = 3 ;
2941const NO_BACKOFF_TIME_IN_MILLIS = - 1 ;
3042const NO_FAILED_REALTIME_STREAMS = 0 ;
43+ const REALTIME_DISABLED_KEY = 'featureDisabled' ;
44+ const REALTIME_RETRY_INTERVAL = 'retryIntervalSeconds' ;
45+ const TEMPLATE_VERSION_KEY = 'latestTemplateVersionNumber' ;
3146
3247export class RealtimeHandler {
3348 constructor (
@@ -38,7 +53,9 @@ export class RealtimeHandler {
3853 private readonly projectId : string ,
3954 private readonly apiKey : string ,
4055 private readonly appId : string ,
41- private readonly logger : Logger
56+ private readonly logger : Logger ,
57+ private readonly storageCache : StorageCache ,
58+ private readonly restClient : RestClient
4259 ) {
4360 void this . setRetriesRemaining ( ) ;
4461 void VisibilityMonitor . getInstance ( ) . on (
@@ -56,6 +73,7 @@ export class RealtimeHandler {
5673 private reader : ReadableStreamDefaultReader | undefined ;
5774 private httpRetriesRemaining : number = ORIGINAL_RETRIES ;
5875 private isInBackground : boolean = false ;
76+ private readonly decoder = new TextDecoder ( 'utf-8' ) ;
5977
6078 private async setRetriesRemaining ( ) : Promise < void > {
6179 // Retrieve number of remaining retries from last session. The minimum retry count being one.
@@ -90,6 +108,21 @@ export class RealtimeHandler {
90108 } ) ;
91109 }
92110
111+ private async updateBackoffMetadataWithRetryInterval (
112+ retryIntervalSeconds : number
113+ ) : Promise < void > {
114+ const currentTime = Date . now ( ) ;
115+ const backoffDurationInMillis = retryIntervalSeconds * 1000 ;
116+ const backoffEndTime = new Date ( currentTime + backoffDurationInMillis ) ;
117+ const numFailedStreams =
118+ ( await this . storage . getRealtimeBackoffMetadata ( ) ) ?. numFailedStreams || 0 ;
119+ await this . storage . setRealtimeBackoffMetadata ( {
120+ backoffEndTimeMillis : backoffEndTime ,
121+ numFailedStreams
122+ } ) ;
123+ this . retryHttpConnectionWhenBackoffEnds ( ) ;
124+ }
125+
93126 /**
94127 * HTTP status code that the Realtime client should retry on.
95128 */
@@ -229,6 +262,276 @@ export class RealtimeHandler {
229262 return canMakeConnection ;
230263 }
231264
265+ private fetchResponseIsUpToDate (
266+ fetchResponse : FetchResponse ,
267+ lastKnownVersion : number
268+ ) : boolean {
269+ if ( fetchResponse . config != null && fetchResponse . templateVersion ) {
270+ return fetchResponse . templateVersion >= lastKnownVersion ;
271+ }
272+ return false ;
273+ }
274+
275+ private parseAndValidateConfigUpdateMessage ( message : string ) : string {
276+ const left = message . indexOf ( '{' ) ;
277+ const right = message . indexOf ( '}' , left ) ;
278+
279+ if ( left < 0 || right < 0 ) {
280+ return '' ;
281+ }
282+ return left >= right ? '' : message . substring ( left , right + 1 ) ;
283+ }
284+
285+ private isEventListenersEmpty ( ) : boolean {
286+ return this . observers . size === 0 ;
287+ }
288+
289+ private getRandomInt ( max : number ) : number {
290+ return Math . floor ( Math . random ( ) * max ) ;
291+ }
292+
293+ private executeAllListenerCallbacks ( configUpdate : ConfigUpdate ) : void {
294+ this . observers . forEach ( observer => observer . next ( configUpdate ) ) ;
295+ }
296+
297+ private getChangedParams (
298+ newConfig : FirebaseRemoteConfigObject ,
299+ oldConfig : FirebaseRemoteConfigObject
300+ ) : Set < string > {
301+ const changed = new Set < string > ( ) ;
302+ const newKeys = new Set ( Object . keys ( newConfig || { } ) ) ;
303+ const oldKeys = new Set ( Object . keys ( oldConfig || { } ) ) ;
304+
305+ for ( const key of newKeys ) {
306+ if ( ! oldKeys . has ( key ) ) {
307+ changed . add ( key ) ;
308+ continue ;
309+ }
310+ if (
311+ JSON . stringify ( ( newConfig as any ) [ key ] ) !==
312+ JSON . stringify ( ( oldConfig as any ) [ key ] )
313+ ) {
314+ changed . add ( key ) ;
315+ continue ;
316+ }
317+ }
318+
319+ for ( const key of oldKeys ) {
320+ if ( ! newKeys . has ( key ) ) {
321+ changed . add ( key ) ;
322+ }
323+ }
324+ return changed ;
325+ }
326+
327+ private async fetchLatestConfig (
328+ remainingAttempts : number ,
329+ targetVersion : number
330+ ) : Promise < void > {
331+ const remainingAttemptsAfterFetch = remainingAttempts - 1 ;
332+ const currentAttempt = MAXIMUM_FETCH_ATTEMPTS - remainingAttemptsAfterFetch ;
333+ const customSignals = this . storageCache . getCustomSignals ( ) ;
334+ if ( customSignals ) {
335+ this . logger . debug (
336+ `Fetching config with custom signals: ${ JSON . stringify ( customSignals ) } `
337+ ) ;
338+ }
339+ try {
340+ const fetchRequest : FetchRequest = {
341+ cacheMaxAgeMillis : 0 ,
342+ signal : new RemoteConfigAbortSignal ( ) ,
343+ customSignals : customSignals ,
344+ fetchType : 'REALTIME' ,
345+ fetchAttempt : currentAttempt
346+ } ;
347+
348+ const fetchResponse : FetchResponse = await this . restClient . fetch (
349+ fetchRequest
350+ ) ;
351+ let activatedConfigs = await this . storage . getActiveConfig ( ) ;
352+
353+ if ( ! this . fetchResponseIsUpToDate ( fetchResponse , targetVersion ) ) {
354+ this . logger . debug (
355+ "Fetched template version is the same as SDK's current version." +
356+ ' Retrying fetch.'
357+ ) ;
358+ // Continue fetching until template version number is greater than current.
359+ await this . autoFetch ( remainingAttemptsAfterFetch , targetVersion ) ;
360+ return ;
361+ }
362+
363+ if ( fetchResponse . config == null ) {
364+ this . logger . debug (
365+ 'The fetch succeeded, but the backend had no updates.'
366+ ) ;
367+ return ;
368+ }
369+
370+ if ( activatedConfigs == null ) {
371+ activatedConfigs = { } ;
372+ }
373+
374+ const updatedKeys = this . getChangedParams (
375+ fetchResponse . config ,
376+ activatedConfigs
377+ ) ;
378+
379+ if ( updatedKeys . size === 0 ) {
380+ this . logger . debug ( 'Config was fetched, but no params changed.' ) ;
381+ return ;
382+ }
383+
384+ const configUpdate : ConfigUpdate = {
385+ getUpdatedKeys ( ) : Set < string > {
386+ return new Set ( updatedKeys ) ;
387+ }
388+ } ;
389+ this . executeAllListenerCallbacks ( configUpdate ) ;
390+ } catch ( e : unknown ) {
391+ const errorMessage = e instanceof Error ? e . message : String ( e ) ;
392+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
393+ originalErrorMessage : `Failed to auto-fetch config update: ${ errorMessage } `
394+ } ) ;
395+ this . propagateError ( error ) ;
396+ }
397+ }
398+
399+ private async autoFetch (
400+ remainingAttempts : number ,
401+ targetVersion : number
402+ ) : Promise < void > {
403+ if ( remainingAttempts === 0 ) {
404+ const error = ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_NOT_FETCHED , {
405+ originalErrorMessage :
406+ 'Unable to fetch the latest version of the template.'
407+ } ) ;
408+ this . propagateError ( error ) ;
409+ return ;
410+ }
411+
412+ const timeTillFetch = this . getRandomInt ( 4 ) ;
413+ setTimeout ( async ( ) => {
414+ await this . fetchLatestConfig ( remainingAttempts , targetVersion ) ;
415+ } , timeTillFetch ) ;
416+ }
417+
418+ private async handleNotifications (
419+ reader : ReadableStreamDefaultReader
420+ ) : Promise < void > {
421+ if ( reader == null ) {
422+ return ;
423+ }
424+
425+ let partialConfigUpdateMessage : string ;
426+ let currentConfigUpdateMessage = '' ;
427+
428+ while ( true ) {
429+ const { done, value } = await reader . read ( ) ;
430+ if ( done ) {
431+ break ;
432+ }
433+
434+ partialConfigUpdateMessage = this . decoder . decode ( value , { stream : true } ) ;
435+ currentConfigUpdateMessage += partialConfigUpdateMessage ;
436+
437+ if ( partialConfigUpdateMessage . includes ( '}' ) ) {
438+ currentConfigUpdateMessage = this . parseAndValidateConfigUpdateMessage (
439+ currentConfigUpdateMessage
440+ ) ;
441+
442+ if ( currentConfigUpdateMessage . length === 0 ) {
443+ continue ;
444+ }
445+
446+ try {
447+ const jsonObject = JSON . parse ( currentConfigUpdateMessage ) ;
448+
449+ if ( this . isEventListenersEmpty ( ) ) {
450+ break ;
451+ }
452+
453+ if (
454+ REALTIME_DISABLED_KEY in jsonObject &&
455+ jsonObject [ REALTIME_DISABLED_KEY ] === true
456+ ) {
457+ const error = ERROR_FACTORY . create (
458+ ErrorCode . CONFIG_UPDATE_UNAVAILABLE ,
459+ {
460+ originalErrorMessage :
461+ 'The server is temporarily unavailable. Try again in a few minutes.'
462+ }
463+ ) ;
464+ this . propagateError ( error ) ;
465+ break ;
466+ }
467+
468+ if ( TEMPLATE_VERSION_KEY in jsonObject ) {
469+ const oldTemplateVersion =
470+ await this . storage . getLastKnownTemplateVersion ( ) ;
471+ let targetTemplateVersion = Number (
472+ jsonObject [ TEMPLATE_VERSION_KEY ]
473+ ) ;
474+ if (
475+ oldTemplateVersion &&
476+ targetTemplateVersion > oldTemplateVersion
477+ ) {
478+ await this . autoFetch (
479+ MAXIMUM_FETCH_ATTEMPTS ,
480+ targetTemplateVersion
481+ ) ;
482+ }
483+ }
484+
485+ // This field in the response indicates that the realtime request should retry after the
486+ // specified interval to establish a long-lived connection. This interval extends the
487+ // backoff duration without affecting the number of retries, so it will not enter an
488+ // exponential backoff state.
489+ if ( REALTIME_RETRY_INTERVAL in jsonObject ) {
490+ const retryIntervalSeconds = Number (
491+ jsonObject [ REALTIME_RETRY_INTERVAL ]
492+ ) ;
493+ await this . updateBackoffMetadataWithRetryInterval (
494+ retryIntervalSeconds
495+ ) ;
496+ }
497+ } catch ( e : any ) {
498+ this . logger . error ( 'Unable to parse latest config update message.' , e ) ;
499+ this . propagateError (
500+ ERROR_FACTORY . create ( ErrorCode . CONFIG_UPDATE_MESSAGE_INVALID , {
501+ originalErrorMessage : e
502+ } )
503+ ) ;
504+ }
505+ currentConfigUpdateMessage = '' ;
506+ }
507+ }
508+ }
509+
510+ public async listenForNotifications (
511+ reader : ReadableStreamDefaultReader
512+ ) : Promise < void > {
513+ try {
514+ await this . handleNotifications ( reader ) ;
515+ } catch ( e ) {
516+ // If the real-time connection is at an unexpected lifecycle state when the app is
517+ // backgrounded, it's expected closing the connection and will throw an exception.
518+ if ( ! this . isInBackground ) {
519+ // Otherwise, the real-time server connection was closed due to a transient issue.
520+ this . logger . debug (
521+ 'Real-time connection was closed due to an exception.' ,
522+ e
523+ ) ;
524+ }
525+ } finally {
526+ // Only need to close the reader, beginRealtimeHttpStream will disconnect
527+ // the connection
528+ if ( this . reader ) {
529+ this . reader . cancel ( ) ;
530+ this . reader = undefined ;
531+ }
532+ }
533+ }
534+
232535 /**
233536 * Open the real-time connection, begin listening for updates, and auto-fetch when an update is
234537 * received.
@@ -263,8 +566,9 @@ export class RealtimeHandler {
263566 if ( response . ok && response . body ) {
264567 this . resetRetryCount ( ) ;
265568 await this . resetRealtimeBackoff ( ) ;
266- //const configAutoFetch = this.startAutoFetch(reader);
267- //await configAutoFetch.listenForNotifications();
569+ const reader = response . body . getReader ( ) ;
570+ // Start listening for realtime notifications.
571+ await this . listenForNotifications ( reader ) ;
268572 }
269573 } catch ( error ) {
270574 if ( this . isInBackground ) {
0 commit comments