diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 08f0bb5d38..813ff3c3f1 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -15,8 +15,8 @@ class TestDynamicDsService extends DynamicDsService { let datasourceParams: DatasourceParams[] = initData; @@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => { describe('DynamicDsService', () => { let service: TestDynamicDsService; const project = { - templates: [{name: 'Test'}], + templates: [{name: 'Test'}, {name: 'Other'}], } as any as ISubqueryProject; beforeEach(() => { @@ -70,6 +71,69 @@ describe('DynamicDsService', () => { ]); }); + it('can destroy a dynamic datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy specific datasource by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBe(50); + }); + + it('throws error when destroying non-existent datasource', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('NonExistent', 50, 0)).rejects.toThrow( + 'Datasource at index 0 has template name "Test", not "NonExistent"' + ); + }); + + it('throws error when destroying already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('allows creating new datasource after destroying existing one', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + expect((service as any)._datasourceParams).toEqual([testParam1]); + + // Destroy by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); + + const newParam = {templateName: 'Test', startBlock: 60}; + await service.createDynamicDatasource(newParam); + + const finalParams = (service as any)._datasourceParams; + const destroyedCount = finalParams.filter((p) => p.endBlock !== undefined).length; + const activeCount = finalParams.filter((p) => p.endBlock === undefined).length; + + expect(destroyedCount).toBeGreaterThanOrEqual(1); + expect(activeCount).toBeGreaterThanOrEqual(1); + + const destroyedParam = finalParams.find((p) => p.startBlock === 1 && p.endBlock === 50); + expect(destroyedParam).toBeDefined(); + + const newParamFound = finalParams.find((p) => p.startBlock === 60 && !p.endBlock); + expect(newParamFound).toBeDefined(); + }); + it('resets dynamic datasources', async () => { const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); await service.init(meta); @@ -83,6 +147,26 @@ describe('DynamicDsService', () => { ]); }); + it('handles reset after datasource destruction correctly', async () => { + const params = [testParam1, testParam2, testParam3, testParam4]; + const meta = mockMetadata(params); + await service.init(meta); + + // Destroy only the first datasource by index + await service.destroyDynamicDatasource('Test', 25, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); + + // Reset to block 2 (should keep testParam1 and testParam2) + await service.resetDynamicDatasource(2, null as any); + + const paramsAfterReset = (service as any)._datasourceParams; + expect(paramsAfterReset).toHaveLength(2); + expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25}); + expect(paramsAfterReset[1]).toEqual(testParam2); + }); + it('getDynamicDatasources with force reloads from metadata', async () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); @@ -107,6 +191,30 @@ describe('DynamicDsService', () => { ]); }); + it('loads destroyed datasources with endBlock correctly', async () => { + const destroyedParam = {...testParam1, endBlock: 100}; + const meta = mockMetadata([destroyedParam, testParam2]); + await service.init(meta); + + const datasources = await service.getDynamicDatasources(); + expect(datasources).toHaveLength(2); + expect((datasources[0] as any).endBlock).toBe(100); + expect((datasources[1] as any).endBlock).toBeUndefined(); + }); + + it('updates metadata correctly when destroying datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy first datasource by index + await service.destroyDynamicDatasource('Test', 75, 0); + + const metadataParams = await meta.find('dynamicDatasources'); + expect(metadataParams).toBeDefined(); + expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75}); + expect(metadataParams![1]).toEqual(testParam2); + }); + it('can find a template and cannot mutate the template', () => { const template1 = service.getTemplate('Test', 1); const template2 = service.getTemplate('Test', 2); @@ -119,4 +227,232 @@ describe('DynamicDsService', () => { expect(project.templates![0]).toEqual({name: 'Test'}); }); + + it('can create template with endBlock', () => { + const template = service.getTemplate('Test', 1, 100); + + expect(template.startBlock).toBe(1); + expect((template as any).endBlock).toBe(100); + expect((template as any).name).toBeUndefined(); + }); + + it('handles multiple templates with same name during destruction', async () => { + const param1 = {templateName: 'Test', startBlock: 1}; + const param2 = {templateName: 'Test', startBlock: 5}; + const param3 = {templateName: 'Other', startBlock: 3}; + + const meta = mockMetadata([param1, param2, param3]); + await service.init(meta); + + // Should destroy the first matching one by index + await service.destroyDynamicDatasource('Test', 10, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); + expect(updatedParams[1]).toEqual(param2); // Not destroyed + expect(updatedParams[2]).toEqual(param3); // Not destroyed + }); + + it('throws error when service not initialized for destruction', async () => { + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'DynamicDsService has not been initialized' + ); + }); + + describe('getDynamicDatasourcesByTemplate', () => { + it('returns list of active datasources for a template', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + const testDatasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(testDatasources).toHaveLength(3); + expect(testDatasources[0]).toEqual({ + index: 0, + templateName: 'Test', + startBlock: 1, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[1]).toEqual({ + index: 1, + templateName: 'Test', + startBlock: 2, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[2]).toEqual({ + index: 2, + templateName: 'Test', + startBlock: 3, + endBlock: undefined, + args: undefined, + }); + }); + + it('excludes destroyed datasources from list', async () => { + const destroyedParam = {...testParam1, endBlock: 50}; + const meta = mockMetadata([destroyedParam, testParam2, testParam3]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(2); + expect(datasources[0].index).toBe(1); // Global index + expect(datasources[0].startBlock).toBe(2); + expect(datasources[1].index).toBe(2); // Global index + expect(datasources[1].startBlock).toBe(3); + }); + + it('returns empty array when no datasources match template', async () => { + const meta = mockMetadata([testParamOther]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toEqual([]); + }); + + it('includes args in datasource info when present', async () => { + const paramWithArgs = {...testParam1, args: {address: '0x123', tokenId: 1}}; + const meta = mockMetadata([paramWithArgs]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(1); + expect(datasources[0].args).toEqual({address: '0x123', tokenId: 1}); + }); + + it('throws error when service not initialized', () => { + expect(() => service.getDynamicDatasourcesByTemplate('Test')).toThrow( + 'DynamicDsService has not been initialized' + ); + }); + }); + + describe('destroyDynamicDatasource with index', () => { + it('destroys specific datasource by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 1); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual(testParam1); // Not destroyed + expect(updatedParams[1]).toEqual({...testParam2, endBlock: 50}); // Destroyed + expect(updatedParams[2]).toEqual(testParam3); // Not destroyed + expect(updatedParams[3]).toEqual(testParamOther); // Not destroyed + }); + + it('throws error when index is out of bounds', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 5)).rejects.toThrow( + 'Index 5 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when index is negative', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, -1)).rejects.toThrow( + 'Index -1 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when trying to destroy already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('correctly handles global index after some datasources are destroyed', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); + await service.init(meta); + + // Destroy the first one using global index 0 + await service.destroyDynamicDatasource('Test', 40, 0); + + // Now only 3 active datasources for 'Test' template, with global indices 1, 2, 3 + const activeDatasources = service.getDynamicDatasourcesByTemplate('Test'); + expect(activeDatasources).toHaveLength(3); + expect(activeDatasources[0].index).toBe(1); // Global index + expect(activeDatasources[0].startBlock).toBe(2); + expect(activeDatasources[1].index).toBe(2); // Global index + expect(activeDatasources[1].startBlock).toBe(3); + expect(activeDatasources[2].index).toBe(3); // Global index + expect(activeDatasources[2].startBlock).toBe(4); + + // Destroy using global index 2 (testParam3) + await service.destroyDynamicDatasource('Test', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 40}); + expect(updatedParams[1]).toEqual(testParam2); // Still active + expect(updatedParams[2]).toEqual({...testParam3, endBlock: 60}); + expect(updatedParams[3]).toEqual(testParam4); // Still active + }); + + it('updates datasources in memory correctly when destroying by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 100, 1); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBeUndefined(); + expect(datasources[1].endBlock).toBe(100); + expect(datasources[2].endBlock).toBeUndefined(); + }); + + it('allows destroying datasources from different templates independently', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 0); + await service.destroyDynamicDatasource('Other', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); // Not destroyed + expect(updatedParams[2]).toEqual({...testParamOther, endBlock: 60}); + }); + + it('throws error when template name does not match global index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + // Try to destroy 'Test' template with index 2, which is 'Other' template + await expect(service.destroyDynamicDatasource('Test', 50, 2)).rejects.toThrow( + 'Datasource at index 2 has template name "Other", not "Test"' + ); + }); + + it('sets endBlock correctly allowing in-place removal during block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + // Destroy datasource at index 1 at block 50 + await service.destroyDynamicDatasource('Test', 50, 1); + + // Verify the datasource has endBlock set + const dsParam = service.getDatasourceParamByIndex(1); + expect(dsParam).toBeDefined(); + expect(dsParam?.endBlock).toBe(50); + expect(dsParam?.startBlock).toBe(2); + expect(dsParam?.templateName).toBe('Test'); + + // Verify the internal _datasources array also has endBlock set + const datasources = (service as any)._datasources; + expect(datasources[1]).toBeDefined(); + expect((datasources[1] as any).endBlock).toBe(50); + }); + }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 708ab5f3bb..00422ad395 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {Inject, Injectable} from '@nestjs/common'; -import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource} from '@subql/types-core'; +import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource, DynamicDatasourceInfo} from '@subql/types-core'; import {Transaction} from '@subql/x-sequelize'; import {cloneDeep} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; @@ -19,12 +19,16 @@ export interface DatasourceParams { templateName: string; args?: Record; startBlock: number; + endBlock?: number; } export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; + getDatasourceParamByIndex(index: number): DatasourceParams | undefined; } @Injectable() @@ -91,6 +95,90 @@ export class DynamicDsService ({params, globalIndex})) + .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + + return matchingDatasources.map(({globalIndex, params}) => ({ + index: globalIndex, + templateName: params.templateName, + startBlock: params.startBlock, + endBlock: params.endBlock, + args: params.args, + })); + } + + /** + * Get datasource parameters by global index. + * + * @param index - Global index in the internal datasource parameters array + * @returns DatasourceParams if found, undefined otherwise + */ + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this._datasourceParams?.[index]; + } + + async destroyDynamicDatasource( + templateName: string, + currentBlockHeight: number, + index: number, + tx?: Transaction + ): Promise { + if (!this._datasources || !this._datasourceParams) { + throw new Error('DynamicDsService has not been initialized'); + } + + // Get the datasource at the global index + const dsParam = this._datasourceParams[index]; + + // Validate datasource exists + if (!dsParam) { + throw new Error( + `Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total` + ); + } + + // Validate it matches the template name and is not already destroyed + if (dsParam.templateName !== templateName) { + throw new Error( + `Datasource at index ${index} has template name "${dsParam.templateName}", not "${templateName}"` + ); + } + + if (dsParam.endBlock !== undefined) { + throw new Error(`Dynamic datasource at index ${index} is already destroyed`); + } + + // Update the datasource params + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[index] = updatedParams; + + // Update the datasource object if it exists + // Note: _datasources and _datasourceParams arrays should always be in sync. + // If the index is valid for params, it must also be valid for datasources. + if (!this._datasources[index]) { + throw new Error(`Datasources array out of sync with params at index ${index}`); + } + (this._datasources[index] as any).endBlock = currentBlockHeight; + + await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); + + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + } + // Not force only seems to be used for project changes async getDynamicDatasources(forceReload?: boolean): Promise { // Workers should not cache this result in order to keep in sync @@ -117,19 +205,19 @@ export class DynamicDsService t.name === templateName); if (!t) { throw new Error(`Unable to find matching template in project for name: "${templateName}"`); } const {name, ...template} = cloneDeep(t); - return {...template, startBlock} as DS; + return {...template, startBlock, endBlock} as DS; } private async getDatasource(params: DatasourceParams): Promise { - const dsObj = this.getTemplate(params.templateName, params.startBlock); + const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock); try { await this.blockchainService.updateDynamicDs(params, dsObj); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index c484eeab0f..10bfc2303b 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -116,6 +116,32 @@ export abstract class BaseIndexerManager< dynamicDsCreated = true; }, 'createDynamicDatasource'); + // Inject function to get dynamic datasources by template into vm + vm.freeze((templateName: string) => { + return this.dynamicDsService.getDynamicDatasourcesByTemplate(templateName); + }, 'getDynamicDatasources'); + + // Inject function to destroy ds into vm + vm.freeze(async (templateName: string, index: number) => { + await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); + + // Remove the destroyed datasource from the current processing array + // Find the datasource by matching the global index stored in the service + const destroyedDsParam = this.dynamicDsService.getDatasourceParamByIndex(index); + if (destroyedDsParam) { + const dsIndex = filteredDataSources.findIndex((fds) => { + return ( + fds.startBlock === destroyedDsParam.startBlock && + JSON.stringify((fds as any).options || (fds as any).processor?.options || {}) === + JSON.stringify(destroyedDsParam.args || {}) + ); + }); + if (dsIndex !== -1) { + filteredDataSources.splice(dsIndex, 1); + } + } + }, 'destroyDynamicDatasource'); + return vm; }); } diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 3c6e456a08..88193f7a06 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -3,16 +3,21 @@ import {isMainThread} from 'node:worker_threads'; import {Injectable} from '@nestjs/common'; +import {DynamicDatasourceInfo} from '@subql/types-core'; import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; + dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number, index: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; + dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsCreateDynamicDatasource', + 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', + 'dynamicDsGetDynamicDatasourcesByTemplate', ]; @Injectable() @@ -32,14 +37,24 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise { + return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight, index); + } + async getDynamicDatasources(): Promise { return this.host.dynamicDsGetDynamicDatasources(); } + + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { + return this.host.dynamicDsGetDynamicDatasourcesByTemplate(templateName); + } } export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { return { dynamicDsCreateDynamicDatasource: dynamicDsService.createDynamicDatasource.bind(dynamicDsService), + dynamicDsDestroyDynamicDatasource: dynamicDsService.destroyDynamicDatasource.bind(dynamicDsService), dynamicDsGetDynamicDatasources: dynamicDsService.getDynamicDatasources.bind(dynamicDsService), + dynamicDsGetDynamicDatasourcesByTemplate: dynamicDsService.getDynamicDatasourcesByTemplate.bind(dynamicDsService), }; } diff --git a/packages/types-core/src/global.ts b/packages/types-core/src/global.ts index cbaf1be173..654999c971 100644 --- a/packages/types-core/src/global.ts +++ b/packages/types-core/src/global.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import type Pino from 'pino'; -import {Cache, DynamicDatasourceCreator} from './interfaces'; +import {Cache, DynamicDatasourceCreator, DynamicDatasourceDestructor, DynamicDatasourceGetter} from './interfaces'; import {Store} from './store'; // base global @@ -12,4 +12,6 @@ declare global { const cache: Cache; const chainId: string; const createDynamicDatasource: DynamicDatasourceCreator; + const destroyDynamicDatasource: DynamicDatasourceDestructor; + const getDynamicDatasources: DynamicDatasourceGetter; } diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index bde78e77ac..7930692ad8 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -2,6 +2,28 @@ // SPDX-License-Identifier: GPL-3.0 export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; +export type DynamicDatasourceDestructor = (name: string, index: number) => Promise; + +/** + * Information about a dynamic datasource instance. + */ +export interface DynamicDatasourceInfo { + /** + * Global index of the datasource in the internal storage array. + * Use this value when calling destroyDynamicDatasource(). + */ + index: number; + /** Template name this datasource was created from */ + templateName: string; + /** Block height where this datasource starts processing */ + startBlock: number; + /** Block height where this datasource stops processing (if destroyed) */ + endBlock?: number; + /** Arguments passed when creating this datasource */ + args?: Record; +} + +export type DynamicDatasourceGetter = (templateName: string) => DynamicDatasourceInfo[]; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise;