Skip to content

Commit eeb89d5

Browse files
committed
indexer-common,-agent: Various robustness and UX improvements
- Do not add action to queue if identical action was recently successful - Do not add action to queue if identical action failed within last 15 minutes - Use retries for epoch subgraph queries (5 attempts) - Improve clarity of code to add offchain subgraphs to targetDeployments - Improve ActionManager.fetchActions - Improve code clarity and logging on failed indexing status query
1 parent 781a2dc commit eeb89d5

File tree

10 files changed

+364
-81
lines changed

10 files changed

+364
-81
lines changed

packages/indexer-agent/src/agent.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -310,25 +310,28 @@ class Agent {
310310
indexingRules,
311311
networkDeploymentAllocationDecisions,
312312
}).tryMap(
313-
async target => {
314-
const rules = await indexingRules.value()
313+
async ({ indexingRules, networkDeploymentAllocationDecisions }) => {
314+
const rules = indexingRules
315315
const targetDeploymentIDs = new Set(
316-
target.networkDeploymentAllocationDecisions
316+
networkDeploymentAllocationDecisions
317317
.filter(decision => decision.toAllocate === true)
318318
.map(decision => decision.deployment),
319319
)
320320

321321
// add offchain subgraphs to the deployment list
322+
// from rules
322323
rules
323324
.filter(
324325
rule => rule?.decisionBasis === IndexingDecisionBasis.OFFCHAIN,
325326
)
326327
.map(rule => {
327328
targetDeploymentIDs.add(new SubgraphDeploymentID(rule.identifier))
328329
})
329-
return rules.length === 0
330-
? []
331-
: [...targetDeploymentIDs, ...this.offchainSubgraphs]
330+
// from startup args
331+
this.offchainSubgraphs.map(deployment => {
332+
targetDeploymentIDs.add(deployment)
333+
})
334+
return [...targetDeploymentIDs]
332335
},
333336
{
334337
onError: error =>
@@ -505,9 +508,12 @@ class Agent {
505508
maxAllocationEpochs,
506509
)
507510
} catch (err) {
508-
this.logger.warn(`Failed to reconcile indexer and network`, {
509-
err: indexerError(IndexerErrorCode.IE005, err),
510-
})
511+
this.logger.warn(
512+
`Exited early while reconciling deployments/allocations`,
513+
{
514+
err: indexerError(IndexerErrorCode.IE005, err),
515+
},
516+
)
511517
}
512518
},
513519
)

packages/indexer-agent/src/indexer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import {
1818
SubgraphIdentifierType,
1919
parseGraphQLIndexingStatus,
2020
CostModelAttributes,
21-
ActionFilter,
2221
ActionResult,
2322
ActionItem,
2423
Action,
@@ -28,6 +27,7 @@ import {
2827
ActionType,
2928
Allocation,
3029
AllocationDecision,
30+
ActionFilter,
3131
} from '@graphprotocol/indexer-common'
3232
import { CombinedError } from '@urql/core'
3333
import pMap from 'p-map'

packages/indexer-common/src/actions.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { ActionManager, NetworkMonitor } from './indexer-management'
22
import { AllocationStatus } from './allocations'
3-
import { WhereOperators } from 'sequelize'
3+
import { WhereOperators, WhereOptions } from 'sequelize'
4+
import { Op } from 'sequelize'
5+
import { WhereAttributeHashValue } from 'sequelize/types/model'
46

57
export interface ActionParamsInput {
68
deploymentID?: string
@@ -123,11 +125,25 @@ export const validateActionInputs = async (
123125
}
124126

125127
export interface ActionFilter {
126-
type?: ActionType | undefined
127-
status?: ActionStatus | undefined
128-
source?: string | undefined
129-
reason?: string | undefined
130-
updatedAt?: WhereOperators | undefined
128+
type?: ActionType
129+
status?: ActionStatus
130+
source?: string
131+
reason?: string
132+
updatedAt?: WhereOperators
133+
}
134+
135+
export const actionFilterToWhereOptions = (filter: ActionFilter): WhereOptions => {
136+
const whereOptions = [] as WhereAttributeHashValue<any>[]
137+
138+
Object.entries(filter).forEach(([key, value]) => {
139+
if (value) {
140+
const obj: { [key: string]: any } = {}
141+
obj[key] = value
142+
whereOptions.push(obj)
143+
}
144+
})
145+
146+
return whereOptions.length == 0 ? {} : { [Op.and]: whereOptions }
131147
}
132148

133149
export interface ActionResult {

packages/indexer-common/src/errors.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export enum IndexerErrorCode {
8282
IE069 = 'IE069',
8383
IE070 = 'IE070',
8484
IE071 = 'IE071',
85+
IE072 = 'IE072',
8586
}
8687

8788
export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
@@ -157,6 +158,7 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
157158
IE069: 'Failed to query Epoch Block Oracle Subgraph',
158159
IE070: 'Failed to query latest valid epoch and block hash',
159160
IE071: 'Add Epoch subgraph support for non-protocol chains',
161+
IE072: 'Failed to execute batch tx (contract: staking)',
160162
}
161163

162164
export type IndexerErrorCause = unknown

packages/indexer-common/src/indexer-management/__tests__/helpers.ts

Lines changed: 92 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import {
44
IndexerManagementModels,
55
IndexingDecisionBasis,
66
IndexingRuleAttributes,
7-
SubgraphIdentifierType,
8-
fetchIndexingRules,
9-
upsertIndexingRule,
10-
} from '@graphprotocol/indexer-common'
11-
import { Sequelize } from 'sequelize'
7+
} from '../models'
8+
import { fetchIndexingRules, upsertIndexingRule } from '../rules'
9+
import { SubgraphIdentifierType } from '../../subgraphs'
10+
import { ActionManager } from '../actions'
11+
import { actionFilterToWhereOptions, ActionStatus, ActionType } from '../../actions'
12+
import { literal, Op, Sequelize } from 'sequelize'
1213

1314
// Make global Jest variable available
1415
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -29,7 +30,7 @@ describe('Indexing Rules', () => {
2930
beforeAll(setupModels)
3031
test('Insert and fetch indexing rule', async () => {
3132
const logger = createLogger({
32-
name: 'POI dispute tests',
33+
name: 'Indexing rule helpers tests',
3334
async: false,
3435
level: __LOG_LEVEL__ ?? 'error',
3536
})
@@ -59,3 +60,88 @@ describe('Indexing Rules', () => {
5960
await expect(fetchIndexingRules(models, false)).resolves.toHaveLength(1)
6061
})
6162
})
63+
64+
describe('Actions', () => {
65+
beforeAll(setupModels)
66+
67+
test('Generate where options', async () => {
68+
const ActionFilter = {
69+
status: ActionStatus.FAILED,
70+
type: ActionType.ALLOCATE,
71+
}
72+
expect(actionFilterToWhereOptions(ActionFilter)).toEqual({
73+
[Op.and]: [{ status: 'failed' }, { type: 'allocate' }],
74+
})
75+
76+
const yesterday = literal("NOW() - INTERVAL '1d'")
77+
const ActionFilter2 = {
78+
status: ActionStatus.FAILED,
79+
type: ActionType.ALLOCATE,
80+
updatedAt: { [Op.gte]: yesterday },
81+
}
82+
83+
const where = actionFilterToWhereOptions(ActionFilter2)
84+
expect(where).toEqual({
85+
[Op.and]: [
86+
{ status: 'failed' },
87+
{ type: 'allocate' },
88+
{ updatedAt: { [Op.gte]: yesterday } },
89+
],
90+
})
91+
92+
await expect(
93+
models.Action.findAll({
94+
where,
95+
}),
96+
).resolves.toHaveLength(0)
97+
})
98+
99+
test('Insert and fetch actions', async () => {
100+
const action = {
101+
status: ActionStatus.FAILED,
102+
type: ActionType.ALLOCATE,
103+
deploymentID: 'QmQ44hgrWWt3Qf2X9XEX2fPyTbmQbChxwNm5c1t4mhKpGt',
104+
amount: '10000',
105+
force: false,
106+
source: 'indexerAgent',
107+
reason: 'indexingRule',
108+
priority: 0,
109+
}
110+
111+
await models.Action.upsert(action)
112+
113+
const filterOptions = {
114+
status: ActionStatus.FAILED,
115+
type: ActionType.ALLOCATE,
116+
}
117+
118+
const whereOptions = actionFilterToWhereOptions(filterOptions)
119+
expect(whereOptions).toEqual({
120+
[Op.and]: [{ status: 'failed' }, { type: 'allocate' }],
121+
})
122+
123+
await expect(ActionManager.fetchActions(models, filterOptions)).resolves.toHaveLength(
124+
1,
125+
)
126+
127+
await expect(ActionManager.fetchActions(models, filterOptions)).resolves.toHaveLength(
128+
1,
129+
)
130+
131+
await expect(
132+
ActionManager.fetchActions(models, {
133+
status: ActionStatus.FAILED,
134+
type: ActionType.ALLOCATE,
135+
updatedAt: { [Op.gte]: literal("NOW() - INTERVAL '1d'") },
136+
}),
137+
).resolves.toHaveLength(1)
138+
139+
await expect(
140+
ActionManager.fetchActions(models, {
141+
status: ActionStatus.FAILED,
142+
type: ActionType.ALLOCATE,
143+
updatedAt: { [Op.lte]: literal("NOW() - INTERVAL '1d'") },
144+
}),
145+
).resolves.toHaveLength(0)
146+
})
147+
})

packages/indexer-common/src/indexer-management/__tests__/resolvers/actions.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,4 +828,104 @@ describe('Actions', () => {
828828
.toPromise(),
829829
).resolves.toHaveProperty('data.actions', [])
830830
})
831+
832+
test('Reject queueing for action that has recently failed', async () => {
833+
const failedAction = {
834+
status: ActionStatus.FAILED,
835+
type: ActionType.ALLOCATE,
836+
deploymentID: subgraphDeployment1,
837+
amount: '10000',
838+
force: false,
839+
source: 'indexerAgent',
840+
reason: 'indexingRule',
841+
priority: 0,
842+
} as ActionInput
843+
844+
const proposedAction = {
845+
status: ActionStatus.QUEUED,
846+
type: ActionType.ALLOCATE,
847+
deploymentID: subgraphDeployment1,
848+
amount: '10000',
849+
source: 'indexerAgent',
850+
reason: 'indexingRule',
851+
priority: 0,
852+
} as ActionInput
853+
854+
await managementModels.Action.create(failedAction, {
855+
validate: true,
856+
returning: true,
857+
})
858+
859+
await expect(
860+
client.mutation(QUEUE_ACTIONS_MUTATION, { actions: [proposedAction] }).toPromise(),
861+
).resolves.toHaveProperty(
862+
'error',
863+
new CombinedError({
864+
graphQLErrors: [
865+
new GraphQLError(
866+
"Recently executed 'allocate' action found in queue targeting 'QmQ44hgrWWt3Qf2X9XEX2fPyTbmQbChxwNm5c1t4mhKpGt', ignoring.",
867+
),
868+
],
869+
}),
870+
)
871+
await expect(
872+
client
873+
.query(ACTIONS_QUERY, {
874+
filter: { source: 'indexerAgent' },
875+
})
876+
.toPromise(),
877+
).resolves.toHaveProperty('data.actions', [
878+
await actionInputToExpected(failedAction, 1),
879+
])
880+
})
881+
882+
test('Reject queueing for action that has recently succeeded', async () => {
883+
const successfulAction = {
884+
status: ActionStatus.SUCCESS,
885+
type: ActionType.ALLOCATE,
886+
deploymentID: subgraphDeployment1,
887+
amount: '10000',
888+
force: false,
889+
source: 'indexerAgent',
890+
reason: 'indexingRule',
891+
priority: 0,
892+
} as ActionInput
893+
894+
const proposedAction = {
895+
status: ActionStatus.QUEUED,
896+
type: ActionType.ALLOCATE,
897+
deploymentID: subgraphDeployment1,
898+
amount: '10000',
899+
source: 'indexerAgent',
900+
reason: 'indexingRule',
901+
priority: 0,
902+
} as ActionInput
903+
904+
await managementModels.Action.create(successfulAction, {
905+
validate: true,
906+
returning: true,
907+
})
908+
909+
await expect(
910+
client.mutation(QUEUE_ACTIONS_MUTATION, { actions: [proposedAction] }).toPromise(),
911+
).resolves.toHaveProperty(
912+
'error',
913+
new CombinedError({
914+
graphQLErrors: [
915+
new GraphQLError(
916+
"Recently executed 'allocate' action found in queue targeting 'QmQ44hgrWWt3Qf2X9XEX2fPyTbmQbChxwNm5c1t4mhKpGt', ignoring.",
917+
),
918+
],
919+
}),
920+
)
921+
await expect(
922+
client
923+
.query(ACTIONS_QUERY, {
924+
filter: { source: 'indexerAgent' },
925+
})
926+
.toPromise(),
927+
).resolves.toHaveProperty('data.actions', [
928+
await actionInputToExpected(successfulAction, 1),
929+
])
930+
})
831931
})

0 commit comments

Comments
 (0)