diff --git a/lib/shared/config-manager/__mocks__/fetch-retry.ts b/lib/shared/config-manager/__mocks__/fetch-retry.ts deleted file mode 100644 index 6fb1926a0..000000000 --- a/lib/shared/config-manager/__mocks__/fetch-retry.ts +++ /dev/null @@ -1,2 +0,0 @@ -export const fetchWithRetry = (_fetch: unknown): unknown => _fetch -export default fetchWithRetry diff --git a/lib/shared/config-manager/__tests__/environmentConfigManager.spec.ts b/lib/shared/config-manager/__tests__/environmentConfigManager.spec.ts index 380437b9a..47af70c54 100644 --- a/lib/shared/config-manager/__tests__/environmentConfigManager.spec.ts +++ b/lib/shared/config-manager/__tests__/environmentConfigManager.spec.ts @@ -1,16 +1,30 @@ -jest.mock('../src/request') jest.useFakeTimers() jest.spyOn(global, 'setInterval') +const getEnvironmentConfig_mock = jest.fn() +jest.doMock('../src/request', () => ({ + ...jest.requireActual('../src/request'), + getEnvironmentConfig: getEnvironmentConfig_mock, +})) + +const mockEventSourceMethods = { + onmessage: jest.fn(), + onerror: jest.fn(), + onopen: jest.fn(), + close: jest.fn(), +} +const MockEventSource = jest + .fn() + .mockImplementation(() => mockEventSourceMethods) +jest.mock('eventsource', () => MockEventSource) + import { EnvironmentConfigManager } from '../src' import { mocked } from 'jest-mock' import { Response } from 'cross-fetch' import { DVCLogger, DevCycleServerSDKOptions } from '@devcycle/types' -import { getEnvironmentConfig } from '../src/request' import { ResponseError } from '@devcycle/server-request' const setInterval_mock = mocked(setInterval) -const getEnvironmentConfig_mock = mocked(getEnvironmentConfig) const trackSDKConfigEvent_mock = jest.fn() const logger = { error: jest.fn(), @@ -39,20 +53,26 @@ function getConfigManager( describe('EnvironmentConfigManager Unit Tests', () => { beforeEach(() => { - getEnvironmentConfig_mock.mockReset() - setInterval_mock.mockReset() + jest.clearAllMocks() + jest.clearAllTimers() }) - function mockFetchResponse(obj: any): Response { + const lastModifiedDate = new Date() + + function mockFetchResponse( + obj: any, + bodyStr?: string, + headers?: HeadersInit, + ): Response { if (obj.status >= 400) { const error = new ResponseError('') error.status = obj.status throw error } - return new Response('{}', { + return new Response(bodyStr ?? '{}', { status: 200, statusText: '', - headers: {}, + headers: headers ?? {}, config: {}, ...obj, }) @@ -77,37 +97,46 @@ describe('EnvironmentConfigManager Unit Tests', () => { expect.objectContaining({ sdkKey: 'sdkKey', fetchConfigPromise: expect.any(Promise), - pollingIntervalMS: 1000, + currentPollingInterval: 1000, + configPollingIntervalMS: 1000, + sseConfigPollingIntervalMS: 10 * 60 * 1000, requestTimeoutMS: 1000, }), ) envConfig.cleanup() }) - it('should override the configPollingIntervalMS and configPollingTimeoutMS settings', async () => { - getEnvironmentConfig_mock.mockImplementation(async () => - mockFetchResponse({ status: 200 }), - ) - - const envConfig = getConfigManager(logger, 'sdkKey', { - configPollingIntervalMS: 10, - configPollingTimeoutMS: 10000, - }) - await envConfig.fetchConfigPromise - expect(setInterval_mock).toHaveBeenCalledTimes(1) - - await envConfig._fetchConfig() - - expect(envConfig).toEqual( - expect.objectContaining({ - sdkKey: 'sdkKey', - fetchConfigPromise: expect.any(Promise), - pollingIntervalMS: 1000, - requestTimeoutMS: 1000, - }), - ) - envConfig.cleanup() - }) + it( + 'should override the configPollingIntervalMS, configPollingTimeoutMS, ' + + 'and sseConfigPollingIntervalMS settings', + async () => { + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse({ status: 200 }), + ) + + const envConfig = getConfigManager(logger, 'sdkKey', { + configPollingIntervalMS: 10, + configPollingTimeoutMS: 10000, + sseConfigPollingIntervalMS: 2 * 60 * 1000, + }) + await envConfig.fetchConfigPromise + expect(setInterval_mock).toHaveBeenCalledTimes(1) + + await envConfig._fetchConfig() + + expect(envConfig).toEqual( + expect.objectContaining({ + sdkKey: 'sdkKey', + fetchConfigPromise: expect.any(Promise), + currentPollingInterval: 1000, + configPollingIntervalMS: 1000, + sseConfigPollingIntervalMS: 2 * 60 * 1000, + requestTimeoutMS: 1000, + }), + ) + envConfig.cleanup() + }, + ) it('should call fetch config on the interval period time', async () => { getEnvironmentConfig_mock.mockImplementation(async () => @@ -134,7 +163,9 @@ describe('EnvironmentConfigManager Unit Tests', () => { expect.objectContaining({ sdkKey: 'sdkKey', fetchConfigPromise: expect.any(Promise), - pollingIntervalMS: 1000, + currentPollingInterval: 1000, + configPollingIntervalMS: 1000, + sseConfigPollingIntervalMS: 10 * 60 * 1000, requestTimeoutMS: 1000, }), ) @@ -145,6 +176,7 @@ describe('EnvironmentConfigManager Unit Tests', () => { undefined, undefined, undefined, + false, ) expect(getEnvironmentConfig_mock).toBeCalledTimes(3) }) @@ -167,6 +199,7 @@ describe('EnvironmentConfigManager Unit Tests', () => { }), undefined, undefined, + false, ) }) @@ -194,7 +227,14 @@ describe('EnvironmentConfigManager Unit Tests', () => { it('should use cached config if fetching config fails', async () => { const config = { config: {} } getEnvironmentConfig_mock.mockImplementation(async () => - mockFetchResponse({ status: 200, data: config }), + mockFetchResponse({ + status: 200, + data: config, + headers: { + etag: 'etag-1', + 'last-modified': lastModifiedDate.toISOString(), + }, + }), ) const envConfig = getConfigManager(logger, 'sdkKey', { @@ -203,6 +243,7 @@ describe('EnvironmentConfigManager Unit Tests', () => { }) await envConfig.fetchConfigPromise expect(setInterval_mock).toHaveBeenCalledTimes(1) + expect(envConfig.configEtag).toEqual('etag-1') await envConfig._fetchConfig() getEnvironmentConfig_mock.mockImplementation(async () => @@ -210,8 +251,10 @@ describe('EnvironmentConfigManager Unit Tests', () => { ) await envConfig._fetchConfig() - envConfig.cleanup() + expect(envConfig.configEtag).toEqual('etag-1') expect(getEnvironmentConfig_mock).toBeCalledTimes(3) + + envConfig.cleanup() }) it('should start interval if initial config fails', async () => { @@ -223,4 +266,289 @@ describe('EnvironmentConfigManager Unit Tests', () => { await expect(envConfig.fetchConfigPromise).rejects.toThrow() expect(setInterval_mock).toHaveBeenCalledTimes(1) }) + + it('should skip fetching config if older last modified date is received', async () => { + const config = { config: {} } + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse({ + status: 200, + data: config, + headers: { + etag: 'etag-1', + 'last-modified': lastModifiedDate.toISOString(), + }, + }), + ) + + const envConfig = getConfigManager(logger, 'sdkKey', { + configPollingIntervalMS: 1000, + configPollingTimeoutMS: 1000, + }) + await envConfig.fetchConfigPromise + await envConfig._fetchConfig() + expect(envConfig.configEtag).toEqual('etag-1') + + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse({ + status: 200, + data: config, + headers: { + etag: 'etag-2', + 'last-modified': new Date( + lastModifiedDate.getTime() - 10000, + ).toISOString(), + }, + }), + ) + await envConfig._fetchConfig() + expect(envConfig.configEtag).toEqual('etag-1') + expect(getEnvironmentConfig_mock).toBeCalledTimes(3) + + envConfig.cleanup() + }) + + it('should keep updated config if etag and last modified date are newer', async () => { + const config = { config: {} } + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse({ + status: 200, + data: config, + headers: { + etag: 'etag-1', + 'last-modified': lastModifiedDate.toISOString(), + }, + }), + ) + + const envConfig = getConfigManager(logger, 'sdkKey', { + configPollingIntervalMS: 1000, + configPollingTimeoutMS: 1000, + }) + await envConfig.fetchConfigPromise + await envConfig._fetchConfig() + expect(envConfig.configEtag).toEqual('etag-1') + + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse({ + status: 200, + data: config, + headers: { + etag: 'etag-2', + 'last-modified': new Date( + lastModifiedDate.getTime() + 10000, + ).toISOString(), + }, + }), + ) + await envConfig._fetchConfig() + expect(envConfig.configEtag).toEqual('etag-2') + expect(getEnvironmentConfig_mock).toBeCalledTimes(3) + + envConfig.cleanup() + }) + + describe('SSE Connection', () => { + const lastModifiedDate = new Date() + const connectToSSE = async (config?: string) => { + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse( + { + status: 200, + headers: { + etag: 'etag-1', + 'last-modified': lastModifiedDate.toISOString(), + }, + }, + config ?? + JSON.stringify({ + sse: { + path: 'sse-path', + hostname: 'https://sse.devcycle.com', + }, + }), + ), + ) + + const envConfig = getConfigManager(logger, 'sdkKey', { + configPollingIntervalMS: 1000, + configPollingTimeoutMS: 1000, + enableBetaRealTimeUpdates: true, + }) + await envConfig.fetchConfigPromise + return envConfig + } + + it('should call fetch config if SSE connection is established after 10 min', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onopen() + + expect(setInterval_mock).toHaveBeenCalledTimes(2) + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + expect(MockEventSource).toBeCalledTimes(1) + + jest.advanceTimersByTime(10 * 60 * 1000) + expect(getEnvironmentConfig_mock).toBeCalledTimes(2) + + envConfig.cleanup() + }) + + it('should not connect to SSE if no config.see', async () => { + const envConfig = await connectToSSE('{}') + + expect(setInterval_mock).toHaveBeenCalledTimes(1) + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + expect(MockEventSource).not.toHaveBeenCalled() + + envConfig.cleanup() + }) + + it('should continue polling and stop SSE if connection fails', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onerror({ status: 401 }) + + expect(setInterval_mock).toHaveBeenCalledTimes(1) + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + expect(mockEventSourceMethods.close).toBeCalledTimes(1) + + jest.advanceTimersByTime(1000) + expect(setInterval_mock).toHaveBeenCalledTimes(1) + expect(getEnvironmentConfig_mock).toBeCalledTimes(2) + + envConfig.cleanup() + }) + + it('should process SSE messages and fetch new config', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onopen() + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + + mockEventSourceMethods.onmessage({ + data: JSON.stringify({ + data: JSON.stringify({ + type: 'refetchConfig', + etag: 'etag-2', + lastModified: new Date( + lastModifiedDate.getTime() + 1000, + ).toISOString(), + }), + }), + }) + jest.advanceTimersByTime(1005) + expect(getEnvironmentConfig_mock).toBeCalledTimes(2) + + envConfig.cleanup() + }) + + it('should skip SSE message if older last modified date is received', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onopen() + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + + const oldLastModifiedDate = new Date( + lastModifiedDate.getTime() - 100000, + ) + mockEventSourceMethods.onmessage({ + data: JSON.stringify({ + data: JSON.stringify({ + type: 'refetchConfig', + etag: 'etag-2', + lastModified: oldLastModifiedDate.toISOString(), + }), + }), + }) + jest.advanceTimersByTime(1000) + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + + envConfig.cleanup() + }) + + it('should handle SSE connection failures after initial connection and reconnect', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onopen() + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + expect(envConfig.configEtag).toEqual('etag-1') + + jest.advanceTimersByTime(10000) + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse( + { + status: 200, + data: { config: {} }, + headers: { + etag: 'etag-2', + 'last-modified': new Date( + lastModifiedDate.getTime() + 10000, + ).toISOString(), + }, + }, + JSON.stringify({ + sse: { + path: 'sse-path', + hostname: 'https://sse.devcycle.com', + }, + }), + ), + ) + mockEventSourceMethods.onerror({ status: 500 }) + expect(mockEventSourceMethods.close).toBeCalledTimes(1) + + jest.advanceTimersByTime(1000) + // Have to use real timers here to allow the event loop to + // process the fetch call that is happening from the setTimeout + jest.useRealTimers() + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(getEnvironmentConfig_mock).toBeCalledTimes(2) + expect(MockEventSource).toBeCalledTimes(2) + expect(envConfig.configEtag).toEqual('etag-2') + + jest.useFakeTimers() + }) + + it('should re-connect SSE if path changes', async () => { + const envConfig = await connectToSSE() + mockEventSourceMethods.onopen() + expect(getEnvironmentConfig_mock).toBeCalledTimes(1) + expect(envConfig.configEtag).toEqual('etag-1') + + const newLastModifiedDate = new Date( + lastModifiedDate.getTime() + 1000, + ) + getEnvironmentConfig_mock.mockImplementation(async () => + mockFetchResponse( + { + status: 200, + headers: { + etag: 'etag-2', + 'last-modified': newLastModifiedDate.toISOString(), + }, + }, + JSON.stringify({ + sse: { + path: 'sse-path-2', + hostname: 'https://sse.devcycle.com', + }, + }), + ), + ) + mockEventSourceMethods.onmessage({ + data: JSON.stringify({ + data: JSON.stringify({ + type: 'refetchConfig', + etag: 'etag-2', + lastModified: newLastModifiedDate.toISOString(), + }), + }), + }) + jest.advanceTimersByTime(1000) + jest.useRealTimers() + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(mockEventSourceMethods.close).toBeCalledTimes(1) + expect(MockEventSource).toBeCalledTimes(2) + expect(envConfig.configEtag).toEqual('etag-2') + + jest.useFakeTimers() + }) + }) }) diff --git a/lib/shared/config-manager/__tests__/request.spec.ts b/lib/shared/config-manager/__tests__/request.spec.ts index f46eb5e41..5becca9fe 100644 --- a/lib/shared/config-manager/__tests__/request.spec.ts +++ b/lib/shared/config-manager/__tests__/request.spec.ts @@ -6,6 +6,13 @@ global.fetch = fetch import { getEnvironmentConfig } from '../src/request' const fetchRequestMock = fetch as jest.MockedFn +const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), +} + describe('request.ts Unit Tests', () => { beforeEach(() => { fetchRequestMock.mockReset() @@ -20,12 +27,13 @@ describe('request.ts Unit Tests', () => { new Response('', { status: 200 }) as any, ) - const res = await getEnvironmentConfig( + const res = await getEnvironmentConfig({ + logger, url, - 60000, - etag, - lastModified, - ) + requestTimeout: 60000, + currentEtag: etag, + currentLastModified: lastModified, + }) expect(res.status).toEqual(200) expect(fetchRequestMock).toBeCalledWith(url, { headers: { @@ -40,5 +48,33 @@ describe('request.ts Unit Tests', () => { signal: expect.any(AbortSignal), }) }) + + it('should retry requests where last-modified date of CDN is less than SSE date', async () => { + const url = 'https://test.devcycle.com/config' + const etag = 'etag_value' + const lastModifiedDate = new Date() + const sseLastModifiedDate = new Date( + lastModifiedDate.getTime() + 1000, + ) + fetchRequestMock.mockResolvedValue( + new Response('{}', { + status: 200, + headers: { + 'Last-Modified': lastModifiedDate.toISOString(), + }, + }) as any, + ) + + const res = await getEnvironmentConfig({ + logger, + url, + requestTimeout: 60000, + currentEtag: etag, + currentLastModified: lastModifiedDate.toISOString(), + sseLastModified: sseLastModifiedDate.toISOString(), + }) + expect(res.status).toEqual(200) + expect(fetchRequestMock).toHaveBeenCalledTimes(4) + }) }) }) diff --git a/lib/shared/config-manager/src/index.ts b/lib/shared/config-manager/src/index.ts index 5a2befd8d..235bea9c3 100644 --- a/lib/shared/config-manager/src/index.ts +++ b/lib/shared/config-manager/src/index.ts @@ -1,13 +1,16 @@ -import { DVCLogger } from '@devcycle/types' -import { getEnvironmentConfig } from './request' +import { ConfigBody, DVCLogger } from '@devcycle/types' +import { getEnvironmentConfig, isValidDate } from './request' import { ResponseError, UserError } from '@devcycle/server-request' +import { SSEConnection } from '@devcycle/sse-connection' type ConfigPollingOptions = { configPollingIntervalMS?: number + sseConfigPollingIntervalMS?: number configPollingTimeoutMS?: number configCDNURI?: string cdnURI?: string clientMode?: boolean + enableBetaRealTimeUpdates?: boolean } type SetIntervalInterface = (handler: () => void, timeout?: number) => any @@ -20,19 +23,26 @@ type TrackSDKConfigEventInterface = ( err?: ResponseError, reqEtag?: string, reqLastModified?: string, + sseConnected?: boolean, ) => void export class EnvironmentConfigManager { private _hasConfig = false configEtag?: string configLastModified?: string - private readonly pollingIntervalMS: number + configSSE?: ConfigBody['sse'] + + private currentPollingInterval: number + private readonly configPollingIntervalMS: number + private readonly sseConfigPollingIntervalMS: number private readonly requestTimeoutMS: number private readonly cdnURI: string + private readonly enableRealtimeUpdates: boolean + fetchConfigPromise: Promise private intervalTimeout?: any - private disablePolling = false private clientMode: boolean + private sseConnection?: SSEConnection constructor( private readonly logger: DVCLogger, @@ -43,18 +53,26 @@ export class EnvironmentConfigManager { private readonly trackSDKConfigEvent: TrackSDKConfigEventInterface, { configPollingIntervalMS = 10000, + sseConfigPollingIntervalMS = 10 * 60 * 1000, // 10 minutes configPollingTimeoutMS = 5000, configCDNURI, cdnURI = 'https://config-cdn.devcycle.com', clientMode = false, + enableBetaRealTimeUpdates = false, }: ConfigPollingOptions, ) { this.clientMode = clientMode - this.pollingIntervalMS = + this.enableRealtimeUpdates = enableBetaRealTimeUpdates + + this.configPollingIntervalMS = configPollingIntervalMS >= 1000 ? configPollingIntervalMS : 1000 + this.sseConfigPollingIntervalMS = + sseConfigPollingIntervalMS <= 60 * 1000 + ? 10 * 60 * 1000 + : sseConfigPollingIntervalMS this.requestTimeoutMS = - configPollingTimeoutMS >= this.pollingIntervalMS - ? this.pollingIntervalMS + configPollingTimeoutMS >= this.configPollingIntervalMS + ? this.configPollingIntervalMS : configPollingTimeoutMS this.cdnURI = configCDNURI || cdnURI @@ -63,28 +81,120 @@ export class EnvironmentConfigManager { this.logger.debug('DevCycle initial config loaded') }) .finally(() => { - if (this.disablePolling) { - return - } - this.intervalTimeout = this.setInterval(async () => { - try { - await this._fetchConfig() - } catch (ex) { - this.logger.error((ex as Error).message) - } - }, this.pollingIntervalMS) + this.startPolling(this.configPollingIntervalMS) + this.startSSE() }) } + + private startSSE(): void { + if (!this.enableRealtimeUpdates) return + + if (!this.configSSE) { + this.logger.warn('No SSE configuration found') + return + } + if (this.sseConnection) { + return + } + + const url = new URL( + this.configSSE.path, + this.configSSE.hostname, + ).toString() + this.logger.debug(`Starting SSE connection to ${url}`) + + this.sseConnection = new SSEConnection(url, this.logger, { + onMessage: this.onSSEMessage.bind(this), + onOpen: () => { + this.logger.debug('SSE connection opened') + // Set config polling interval to 10 minutes + this.startPolling(this.sseConfigPollingIntervalMS) + }, + onConnectionError: () => { + this.logger.debug('SSE connection error, switching to polling') + // reset polling interval to default + this.startPolling(this.configPollingIntervalMS) + this.stopSSE() + }, + }) + } + + private onSSEMessage(message: string): void { + this.logger.debug(`SSE message: ${message}`) + try { + const parsedMessage = JSON.parse(message) + const messageData = JSON.parse(parsedMessage.data) + if (!messageData) return + const { type, etag, lastModified } = messageData + + if (!(!type || type === 'refetchConfig')) { + return + } + if (this.configEtag && etag === this.configEtag) { + return + } + + if (this.isLastModifiedHeaderOld(lastModified)) { + this.logger.debug( + 'Skipping SSE message, config last modified is newer. ', + ) + return + } + + this._fetchConfig(lastModified) + .then(() => { + this.logger.debug('Config re-fetched from SSE message') + }) + .catch((e) => { + this.logger.warn( + `Failed to re-fetch config from SSE Message: ${e}`, + ) + }) + } catch (e) { + this.logger.debug( + `SSE Message Error: Unparseable message. Error: ${e}, message: ${message}`, + ) + } + } + + private stopSSE(): void { + if (this.sseConnection) { + this.sseConnection.close() + this.sseConnection = undefined + } + } + + private startPolling(pollingInterval: number): void { + if (this.intervalTimeout) { + if (pollingInterval === this.currentPollingInterval) { + return + } + // clear existing polling interval + this.stopPolling() + } + + this.intervalTimeout = this.setInterval(async () => { + try { + await this._fetchConfig() + } catch (ex) { + this.logger.error((ex as Error).message) + } + }, pollingInterval) + this.currentPollingInterval = pollingInterval + } + get hasConfig(): boolean { return this._hasConfig } - stopPolling(): void { - this.disablePolling = true + + private stopPolling(): void { this.clearInterval(this.intervalTimeout) + this.intervalTimeout = null } cleanup(): void { this.stopPolling() + this.stopSSE() } getConfigURL(): string { @@ -94,15 +204,15 @@ export class EnvironmentConfigManager { return `${this.cdnURI}/config/v1/server/${this.sdkKey}.json` } - async _fetchConfig(): Promise { + async _fetchConfig(sseLastModified?: string): Promise { const url = this.getConfigURL() let res: Response | null let projectConfig: string | null = null let responseError: ResponseError | null = null const startTime = Date.now() let responseTimeMS = 0 - const reqEtag = this.configEtag - const reqLastModified = this.configLastModified + const currentEtag = this.configEtag + const currentLastModified = this.configLastModified const logError = (error: any) => { const errMsg = @@ -122,8 +232,9 @@ export class EnvironmentConfigManager { responseTimeMS, res || undefined, err, - reqEtag, - reqLastModified, + currentEtag, + currentLastModified, + this.sseConnection?.isConnected() ?? false, ) } } @@ -133,12 +244,14 @@ export class EnvironmentConfigManager { `Requesting new config for ${url}, etag: ${this.configEtag}` + `, last-modified: ${this.configLastModified}`, ) - res = await getEnvironmentConfig( + res = await getEnvironmentConfig({ + logger: this.logger, url, - this.requestTimeoutMS, - reqEtag, - reqLastModified, - ) + requestTimeout: this.requestTimeoutMS, + currentEtag, + currentLastModified, + sseLastModified, + }) responseTimeMS = Date.now() - startTime projectConfig = await res.text() this.logger.debug( @@ -162,16 +275,24 @@ export class EnvironmentConfigManager { ) return } else if (res?.status === 200 && projectConfig) { + const lastModifiedHeader = res?.headers.get('last-modified') + if (this.isLastModifiedHeaderOld(lastModifiedHeader)) { + this.logger.debug( + 'Skipping saving config, existing last modified date is newer.', + ) + return + } + try { + this.handleSSEConfig(projectConfig) + this.setConfigBuffer( `${this.sdkKey}${this.clientMode ? '_client' : ''}`, projectConfig, ) this._hasConfig = true this.configEtag = res?.headers.get('etag') || '' - this.configLastModified = - res?.headers.get('last-modified') || '' - + this.configLastModified = lastModifiedHeader || '' return } catch (e) { logError(new Error('Invalid config JSON.')) @@ -186,10 +307,48 @@ export class EnvironmentConfigManager { `Failed to download config, using cached version. url: ${url}.`, ) } else if (responseError?.status === 403) { - this.stopPolling() + this.cleanup() throw new UserError(`Invalid SDK key provided: ${this.sdkKey}`) } else { throw new Error('Failed to download DevCycle config.') } } + + private isLastModifiedHeaderOld(lastModifiedHeader: string | null) { + const lastModifiedHeaderDate = lastModifiedHeader + ? new Date(lastModifiedHeader) + : null + const configLastModifiedDate = this.configLastModified + ? new Date(this.configLastModified) + : null + + return ( + isValidDate(configLastModifiedDate) && + isValidDate(lastModifiedHeaderDate) && + lastModifiedHeaderDate <= configLastModifiedDate + ) + } + + private handleSSEConfig(projectConfig: string) { + if (this.enableRealtimeUpdates) { + const configBody = JSON.parse(projectConfig) as ConfigBody + const originalConfigSSE = this.configSSE + this.configSSE = configBody.sse + + // Reconnect SSE if not first config fetch, and the SSE config has changed + if ( + this.hasConfig && + (!originalConfigSSE || + !this.sseConnection || + originalConfigSSE.hostname !== this.configSSE?.hostname || + originalConfigSSE.path !== this.configSSE?.path) + ) { + this.stopSSE() + this.startSSE() + } + } else { + this.configSSE = undefined + this.stopSSE() + } + } } diff --git a/lib/shared/config-manager/src/request.ts b/lib/shared/config-manager/src/request.ts index 78c99b510..88e6ec90e 100644 --- a/lib/shared/config-manager/src/request.ts +++ b/lib/shared/config-manager/src/request.ts @@ -1,24 +1,77 @@ import { getWithTimeout } from '@devcycle/server-request' +import { RequestInitWithRetry } from 'fetch-retry' +import { DVCLogger } from '@devcycle/types' -export async function getEnvironmentConfig( - url: string, - requestTimeout: number, - etag?: string, - lastModified?: string, -): Promise { +export const isValidDate = (date: Date | null): date is Date => + date instanceof Date && !isNaN(date.getTime()) + +export async function getEnvironmentConfig({ + logger, + url, + requestTimeout, + currentEtag, + currentLastModified, + sseLastModified, +}: { + logger: DVCLogger + url: string + requestTimeout: number + currentEtag?: string + currentLastModified?: string + sseLastModified?: string +}): Promise { const headers: Record = {} - if (etag) { - headers['If-None-Match'] = etag + let retries = 1 + + let retryOn: RequestInitWithRetry['retryOn'] | undefined + const sseLastModifiedDate = sseLastModified + ? new Date(sseLastModified) + : null + + // Retry fetching config if the Last-Modified header is older than + // the requiredLastModified from the SSE message + if (sseLastModified && isValidDate(sseLastModifiedDate)) { + retries = 3 + retryOn = (attempt, error, response) => { + if (attempt >= retries) { + return false + } else if (response && response?.status === 200) { + const lastModifiedHeader = response.headers.get('Last-Modified') + const lastModifiedHeaderDate = lastModifiedHeader + ? new Date(lastModifiedHeader) + : null + + if ( + isValidDate(lastModifiedHeaderDate) && + lastModifiedHeaderDate < sseLastModifiedDate + ) { + logger.debug( + `Retry fetching config, last modified is old: ${lastModifiedHeader}` + + `, sse last modified: ${sseLastModified}`, + ) + return true + } + return false + } else if (response && response?.status < 500) { + return false + } + + return true + } + } + if (currentEtag) { + headers['If-None-Match'] = currentEtag } - if (lastModified) { - headers['If-Modified-Since'] = lastModified + if (currentLastModified) { + headers['If-Modified-Since'] = currentLastModified } return await getWithTimeout( url, { headers: headers, - retries: 1, + retries, + retryOn, }, requestTimeout, ) diff --git a/lib/shared/server-request/src/request.ts b/lib/shared/server-request/src/request.ts index c6caf92d7..f1abe56a5 100644 --- a/lib/shared/server-request/src/request.ts +++ b/lib/shared/server-request/src/request.ts @@ -20,6 +20,7 @@ export const exponentialBackoff: RequestInitWithRetry['retryDelay'] = ( return delay + randomSum } +export type RequestInitConfig = RequestInit | RequestInitWithRetry type retryOnRequestErrorFunc = ( retries: number, ) => RequestInitWithRetry['retryOn'] @@ -151,7 +152,8 @@ async function getFetchAndConfig( const useRetries = 'retries' in requestConfig if (useRetries && requestConfig.retries) { const newConfig: RequestInitWithRetry = { ...requestConfig } - newConfig.retryOn = retryOnRequestError(requestConfig.retries) + newConfig.retryOn = + newConfig.retryOn || retryOnRequestError(requestConfig.retries) newConfig.retryDelay = exponentialBackoff return [await getFetchWithRetry(), newConfig] } diff --git a/lib/shared/sse-connection/pacakge.json b/lib/shared/sse-connection/pacakge.json new file mode 100644 index 000000000..cedcb95d3 --- /dev/null +++ b/lib/shared/sse-connection/pacakge.json @@ -0,0 +1,10 @@ +{ + "name": "@devcycle/sse-connection", + "version": "1.0.0", + "license": "MIT", + "dependencies": { + "eventsource": "^2.0.2" + }, + "types": "src/index.d.ts", + "main": "src/index.js" +} diff --git a/lib/shared/sse-connection/src/lib/SSEConnection.spec.ts b/lib/shared/sse-connection/src/lib/SSEConnection.spec.ts index 752cc8d38..36db7f51c 100644 --- a/lib/shared/sse-connection/src/lib/SSEConnection.spec.ts +++ b/lib/shared/sse-connection/src/lib/SSEConnection.spec.ts @@ -1,6 +1,58 @@ -// Add empty test as a placeholder, passWithNoTests option wasn't working +jest.mock('eventsource') + +import { SSEConnection } from './SSEConnection' +import EventSource from 'eventsource' + +const mockLogger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), +} +const mockSSEConnectionFunctions = { + onMessage: jest.fn(), + onOpen: jest.fn(), + onConnectionError: jest.fn(), +} + describe('SSEConnection', () => { - it('empty test', () => { - expect(null).toBeNull() + beforeAll(() => { + Object.defineProperty(EventSource.prototype, 'readyState', { + get: jest.fn(() => EventSource.OPEN), + }) + // jest mock EventSource.close function + EventSource.prototype.close = jest.fn() + }) + + it('should create a SSE connection and open an EventSource connection', () => { + const url = 'http://localhost:8080' + const connection = new SSEConnection( + url, + mockLogger, + mockSSEConnectionFunctions, + ) + expect(EventSource).toHaveBeenCalledWith(url, { withCredentials: true }) + expect(connection.isConnected()).toBe(true) + }) + + it('should close the EventSource connection', () => { + const connection = new SSEConnection( + 'http://localhost:8080', + mockLogger, + mockSSEConnectionFunctions, + ) + connection.close() + expect(EventSource.prototype.close).toHaveBeenCalled() + }) + + it('should reopen the EventSource connection', () => { + const connection = new SSEConnection( + 'http://localhost:8080', + mockLogger, + mockSSEConnectionFunctions, + ) + connection.reopen() + expect(EventSource.prototype.close).toHaveBeenCalled() + expect(EventSource).toHaveBeenCalledTimes(3) }) }) diff --git a/lib/shared/sse-connection/src/lib/SSEConnection.ts b/lib/shared/sse-connection/src/lib/SSEConnection.ts index d868960f2..fb0cef9b8 100644 --- a/lib/shared/sse-connection/src/lib/SSEConnection.ts +++ b/lib/shared/sse-connection/src/lib/SSEConnection.ts @@ -1,12 +1,19 @@ import type { DVCLogger } from '@devcycle/types' +import EventSource from 'eventsource' + +type SSEConnectionFunctions = { + onMessage: (message: unknown) => void + onOpen: () => void + onConnectionError: () => void +} export class SSEConnection { private connection?: EventSource constructor( private url: string, - private onMessage: (message: unknown) => void, private logger: DVCLogger, + private readonly callbacks: SSEConnectionFunctions, ) { this.openConnection() } @@ -14,21 +21,25 @@ export class SSEConnection { private openConnection() { if (typeof EventSource === 'undefined') { this.logger.warn( - 'StreamingConnection not opened. EventSource is not available.', + 'SSEConnection not opened. EventSource is not available.', ) return } this.connection = new EventSource(this.url, { withCredentials: true }) this.connection.onmessage = (event) => { - this.onMessage(event.data) + this.callbacks.onMessage(event.data) } - this.connection.onerror = () => { + this.connection.onerror = (err) => { this.logger.warn( - 'StreamingConnection warning. Connection failed to establish.', + `SSEConnection warning. Connection failed. Error status: ${ + err.status + }, message: ${(err as any).message}`, ) + this.callbacks.onConnectionError() } this.connection.onopen = () => { - this.logger.debug('StreamingConnection opened') + this.logger.debug('SSEConnection opened') + this.callbacks.onOpen() } } diff --git a/lib/shared/sse-connection/tsconfig.lib.json b/lib/shared/sse-connection/tsconfig.lib.json index fcf1bcdb3..163d90724 100644 --- a/lib/shared/sse-connection/tsconfig.lib.json +++ b/lib/shared/sse-connection/tsconfig.lib.json @@ -3,7 +3,7 @@ "compilerOptions": { "outDir": "../../../dist/out-tsc", "declaration": true, - "types": [] + "types": ["node"] }, "include": ["src/**/*.ts"], "exclude": ["jest.config.ts", "src/**/*.spec.ts", "src/**/*.test.ts"] diff --git a/lib/shared/types/src/types/apis/sdk/serverSDKTypes.ts b/lib/shared/types/src/types/apis/sdk/serverSDKTypes.ts index 409994fe6..9696ba5d0 100644 --- a/lib/shared/types/src/types/apis/sdk/serverSDKTypes.ts +++ b/lib/shared/types/src/types/apis/sdk/serverSDKTypes.ts @@ -90,4 +90,17 @@ export interface DevCycleServerSDKOptions { * and provided to the client */ enableClientBootstrapping?: boolean + + /** + * BETA: Enable Real Time Updates and their associated SSE connection + */ + enableBetaRealTimeUpdates?: boolean + + /** + * Controls the polling interval in milliseconds to fetch new environment config changes + * when SSE connections are enabled, defaults to 10 minutes. + * This is only used when enableBetaRealTimeUpdates is true. + * @min 60000 + */ + sseConfigPollingIntervalMS?: number } diff --git a/package.json b/package.json index ebebcd8c9..556664561 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "core-js": "^3.6.5", "cross-fetch": "^4.0.0", "eslint-plugin-lodash": "^7.4.0", + "eventsource": "^2.0.2", "fetch-retry": "^5.0.6", "hoist-non-react-statics": "^3.3.2", "iso-639-1": "^2.1.13", @@ -115,6 +116,7 @@ "@testing-library/react": "14.0.0", "@testing-library/react-native": "12.3.2", "@types/async": "^3.2.8", + "@types/eventsource": "^1.1.15", "@types/hoist-non-react-statics": "^3.3.1", "@types/jest": "29.4.4", "@types/lodash": "^4.14.175", diff --git a/sdk/nodejs/.eslintrc.json b/sdk/nodejs/.eslintrc.json index 1d432a488..b1f5993d3 100644 --- a/sdk/nodejs/.eslintrc.json +++ b/sdk/nodejs/.eslintrc.json @@ -16,7 +16,8 @@ "@devcycle/js-client-sdk", "@openfeature/core", "fetch-retry", - "cross-fetch" + "cross-fetch", + "eventsource" ] } ] diff --git a/sdk/nodejs/package.json b/sdk/nodejs/package.json index dccede081..6776c4d72 100644 --- a/sdk/nodejs/package.json +++ b/sdk/nodejs/package.json @@ -24,6 +24,7 @@ "@devcycle/js-cloud-server-sdk": "^1.12.1", "@devcycle/types": "^1.13.1", "cross-fetch": "^4.0.0", + "eventsource": "^2.0.2", "fetch-retry": "^5.0.6" }, "peerDependencies": { diff --git a/sdk/nodejs/src/client.ts b/sdk/nodejs/src/client.ts index ee9a66f77..e49376fe8 100644 --- a/sdk/nodejs/src/client.ts +++ b/sdk/nodejs/src/client.ts @@ -314,6 +314,7 @@ export class DevCycleClient { err?: ResponseError, reqEtag?: string, reqLastModified?: string, + sseConnected?: boolean, ): void { const populatedUser = DVCPopulatedUserFromDevCycleUser({ user_id: `${this.clientUUID}@${this.hostname}`, @@ -332,6 +333,7 @@ export class DevCycleClient { resRayId: res?.headers.get('cf-ray') ?? undefined, resStatus: (err?.status || res?.status) ?? undefined, errMsg: err?.message ?? undefined, + sseConnected: sseConnected ?? undefined, }, }) } diff --git a/yarn.lock b/yarn.lock index 118c09e05..20f2315de 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4749,6 +4749,7 @@ __metadata: "@devcycle/js-cloud-server-sdk": ^1.12.1 "@devcycle/types": ^1.13.1 cross-fetch: ^4.0.0 + eventsource: ^2.0.2 fetch-retry: ^5.0.6 peerDependencies: "@openfeature/server-sdk": ^1.13.5 @@ -10033,6 +10034,13 @@ __metadata: languageName: node linkType: hard +"@types/eventsource@npm:^1.1.15": + version: 1.1.15 + resolution: "@types/eventsource@npm:1.1.15" + checksum: 52e024f5aebfd6bc166f2162d6e408cf788886007e571519c75f8c3623feaa3c5a74681fd3a128de6d21b28ef88dd683421264f10d5c98728959b99b1229b85e + languageName: node + linkType: hard + "@types/express-serve-static-core@npm:*, @types/express-serve-static-core@npm:^4.17.18": version: 4.17.28 resolution: "@types/express-serve-static-core@npm:4.17.28" @@ -15291,6 +15299,7 @@ __metadata: "@testing-library/react": 14.0.0 "@testing-library/react-native": 12.3.2 "@types/async": ^3.2.8 + "@types/eventsource": ^1.1.15 "@types/express": ^4.17.17 "@types/hoist-non-react-statics": ^3.3.1 "@types/jest": 29.4.4 @@ -15338,6 +15347,7 @@ __metadata: eslint-plugin-react: 7.32.2 eslint-plugin-react-hooks: ~4.6.0 eslint-plugin-standard: ^5.0.0 + eventsource: ^2.0.2 express: ^4.18.2 fetch-retry: ^5.0.6 fs-extra: ^11.2.0 @@ -17063,6 +17073,13 @@ __metadata: languageName: node linkType: hard +"eventsource@npm:^2.0.2": + version: 2.0.2 + resolution: "eventsource@npm:2.0.2" + checksum: c0072d972753e10c705d9b2285b559184bf29d011bc208973dde9c8b6b8b7b6fdad4ef0846cecb249f7b1585e860fdf324cbd2ac854a76bc53649e797496e99a + languageName: node + linkType: hard + "example-react-native-app@workspace:examples/react-native": version: 0.0.0-use.local resolution: "example-react-native-app@workspace:examples/react-native"