Skip to content

Commit 2475631

Browse files
committed
chore(epoxy): add replica reconfigure endpoint
1 parent cd6e39d commit 2475631

File tree

4 files changed

+51
-3
lines changed

4 files changed

+51
-3
lines changed

packages/core/api-peer/src/internal.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ pub struct CachePurgeRequest {
1111
}
1212

1313
#[derive(Serialize)]
14+
#[serde(deny_unknown_fields)]
1415
pub struct CachePurgeResponse {}
1516

16-
#[tracing::instrument(skip_all)]
1717
pub async fn cache_purge(
1818
ctx: ApiCtx,
1919
_path: (),
@@ -30,9 +30,9 @@ pub async fn cache_purge(
3030
}
3131

3232
#[derive(Serialize)]
33+
#[serde(deny_unknown_fields)]
3334
pub struct BumpServerlessAutoscalerResponse {}
3435

35-
#[tracing::instrument(skip_all)]
3636
pub async fn bump_serverless_autoscaler(
3737
ctx: ApiCtx,
3838
_path: (),
@@ -82,3 +82,24 @@ pub async fn set_tracing_config(
8282

8383
Ok(SetTracingConfigResponse {})
8484
}
85+
86+
#[derive(Deserialize)]
87+
#[serde(deny_unknown_fields)]
88+
pub struct ReplicaReconfigureRequest {}
89+
90+
#[derive(Serialize)]
91+
#[serde(deny_unknown_fields)]
92+
pub struct ReplicaReconfigureResponse {}
93+
94+
pub async fn epoxy_replica_reconfigure(
95+
ctx: ApiCtx,
96+
_path: (),
97+
_query: (),
98+
_body: ReplicaReconfigureRequest,
99+
) -> Result<ReplicaReconfigureResponse> {
100+
ctx.signal(epoxy::workflows::coordinator::ReplicaReconfigure {})
101+
.send()
102+
.await?;
103+
104+
Ok(ReplicaReconfigureResponse {})
105+
}

packages/core/api-peer/src/router.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ pub async fn router(
3535
"/bump-serverless-autoscaler",
3636
post(internal::bump_serverless_autoscaler),
3737
)
38-
// MARK: Debug
38+
.route(
39+
"/epoxy/replica-reconfigure",
40+
post(internal::epoxy_replica_reconfigure),
41+
)
3942
.route("/debug/tracing/config", put(internal::set_tracing_config))
4043
})
4144
.await

packages/services/epoxy/src/workflows/coordinator/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub async fn epoxy_coordinator(ctx: &mut WorkflowCtx, _input: &Input) -> Result<
3737
Main::ReplicaStatusChange(sig) => {
3838
replica_status_change::replica_status_change(ctx, sig).await?;
3939
}
40+
Main::ReplicaReconfigure(_) => {
41+
replica_status_change::replica_reconfigure(ctx).await?;
42+
}
4043
}
4144

4245
Ok(Loop::<()>::Continue)
@@ -81,7 +84,11 @@ pub struct ReplicaStatusChange {
8184
pub status: types::ReplicaStatus,
8285
}
8386

87+
#[signal("epoxy_coordinator_replica_reconfigure")]
88+
pub struct ReplicaReconfigure {}
89+
8490
join_signal!(Main {
8591
Reconfigure,
8692
ReplicaStatusChange,
93+
ReplicaReconfigure,
8794
});

packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,23 @@ pub async fn replica_status_change(
3737
Ok(())
3838
}
3939

40+
#[tracing::instrument(skip_all)]
41+
pub async fn replica_reconfigure(
42+
ctx: &mut WorkflowCtx,
43+
) -> Result<()> {
44+
let notify_out = ctx.activity(NotifyAllReplicasInput {}).await?;
45+
46+
let replica_id = ctx.config().epoxy_replica_id();
47+
ctx.msg(super::ConfigChangeMessage {
48+
config: notify_out.config,
49+
})
50+
.tag("replica", replica_id)
51+
.send()
52+
.await?;
53+
54+
Ok(())
55+
}
56+
4057
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
4158
pub struct UpdateReplicaStatusInput {
4259
pub replica_id: protocol::ReplicaId,

0 commit comments

Comments
 (0)