Skip to content

Commit fc05219

Browse files
committed
feat(pegboard): add metadata to runner configs
1 parent 34c7008 commit fc05219

File tree

10 files changed

+159
-75
lines changed

10 files changed

+159
-75
lines changed

packages/common/api-types/src/namespaces/runner_configs.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@ use std::collections::HashMap;
33
use gas::prelude::*;
44
use utoipa::ToSchema;
55

6+
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
7+
pub struct RunnerConfig {
8+
#[serde(flatten)]
9+
pub kind: RunnerConfigKind,
10+
#[serde(default, skip_serializing_if = "Option::is_none")]
11+
pub metadata: Option<serde_json::Value>,
12+
}
13+
614
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
715
#[serde(rename_all = "snake_case")]
8-
pub enum RunnerConfig {
16+
pub enum RunnerConfigKind {
917
Normal {},
1018
Serverless {
1119
url: String,
@@ -21,17 +29,18 @@ pub enum RunnerConfig {
2129

2230
impl Into<rivet_types::runner_configs::RunnerConfig> for RunnerConfig {
2331
fn into(self) -> rivet_types::runner_configs::RunnerConfig {
24-
match self {
25-
RunnerConfig::Normal {} => rivet_types::runner_configs::RunnerConfig::Normal {},
26-
RunnerConfig::Serverless {
32+
let RunnerConfig { kind, metadata } = self;
33+
let kind = match kind {
34+
RunnerConfigKind::Normal {} => rivet_types::runner_configs::RunnerConfigKind::Normal {},
35+
RunnerConfigKind::Serverless {
2736
url,
2837
headers,
2938
request_lifespan,
3039
slots_per_runner,
3140
min_runners,
3241
max_runners,
3342
runners_margin,
34-
} => rivet_types::runner_configs::RunnerConfig::Serverless {
43+
} => rivet_types::runner_configs::RunnerConfigKind::Serverless {
3544
url,
3645
headers: headers.unwrap_or_default(),
3746
request_lifespan,
@@ -40,6 +49,7 @@ impl Into<rivet_types::runner_configs::RunnerConfig> for RunnerConfig {
4049
max_runners,
4150
runners_margin: runners_margin.unwrap_or_default(),
4251
},
43-
}
52+
};
53+
rivet_types::runner_configs::RunnerConfig { kind, metadata }
4454
}
4555
}

packages/common/config/src/config/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,13 @@ impl Root {
167167

168168
pub fn validate_and_set_defaults(&mut self) -> Result<()> {
169169
// Set default pubsub to Postgres if configured for database
170-
if self.pubsub.is_none()
171-
&& let Some(Database::Postgres(pg)) = &self.database
172-
{
173-
self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {
174-
url: pg.url.clone(),
175-
memory_optimization: true,
176-
}));
170+
if self.pubsub.is_none() {
171+
if let Some(Database::Postgres(pg)) = &self.database {
172+
self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {
173+
url: pg.url.clone(),
174+
memory_optimization: true,
175+
}));
176+
}
177177
}
178178

179179
Ok(())

packages/common/types/src/runner_configs.rs

Lines changed: 89 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@ use std::collections::HashMap;
33
use gas::prelude::*;
44
use utoipa::ToSchema;
55

6+
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
7+
pub struct RunnerConfig {
8+
#[serde(flatten)]
9+
pub kind: RunnerConfigKind,
10+
#[serde(default, skip_serializing_if = "Option::is_none")]
11+
pub metadata: Option<serde_json::Value>,
12+
}
13+
614
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
715
#[serde(rename_all = "snake_case")]
8-
pub enum RunnerConfig {
16+
pub enum RunnerConfigKind {
917
Normal {},
1018
Serverless {
1119
url: String,
@@ -19,15 +27,74 @@ pub enum RunnerConfig {
1927
},
2028
}
2129

30+
impl From<RunnerConfig> for rivet_data::generated::namespace_runner_config_v2::RunnerConfig {
31+
fn from(value: RunnerConfig) -> Self {
32+
let RunnerConfig { kind, metadata } = value;
33+
rivet_data::generated::namespace_runner_config_v2::RunnerConfig {
34+
metadata: metadata.and_then(|value| serde_json::to_string(&value).ok()),
35+
kind: match kind {
36+
RunnerConfigKind::Normal {} => {
37+
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal
38+
}
39+
RunnerConfigKind::Serverless {
40+
url,
41+
headers,
42+
request_lifespan,
43+
slots_per_runner,
44+
min_runners,
45+
max_runners,
46+
runners_margin,
47+
} => {
48+
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless(
49+
rivet_data::generated::namespace_runner_config_v2::Serverless {
50+
url,
51+
headers: headers.into(),
52+
request_lifespan,
53+
slots_per_runner,
54+
min_runners,
55+
max_runners,
56+
runners_margin,
57+
},
58+
)
59+
}
60+
},
61+
}
62+
}
63+
}
64+
65+
impl From<rivet_data::generated::namespace_runner_config_v2::RunnerConfig> for RunnerConfig {
66+
fn from(value: rivet_data::generated::namespace_runner_config_v2::RunnerConfig) -> Self {
67+
let rivet_data::generated::namespace_runner_config_v2::RunnerConfig { metadata, kind } =
68+
value;
69+
RunnerConfig {
70+
metadata: metadata.and_then(|raw| serde_json::from_str(&raw).ok()),
71+
kind: match kind {
72+
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal => {
73+
RunnerConfigKind::Normal {}
74+
}
75+
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless(
76+
o,
77+
) => RunnerConfigKind::Serverless {
78+
url: o.url,
79+
headers: o.headers.into(),
80+
request_lifespan: o.request_lifespan,
81+
slots_per_runner: o.slots_per_runner,
82+
min_runners: o.min_runners,
83+
max_runners: o.max_runners,
84+
runners_margin: o.runners_margin,
85+
},
86+
},
87+
}
88+
}
89+
}
90+
2291
impl From<RunnerConfig> for rivet_data::generated::namespace_runner_config_v1::Data {
2392
fn from(value: RunnerConfig) -> Self {
24-
match value {
25-
RunnerConfig::Normal {} => {
26-
rivet_data::generated::namespace_runner_config_v1::Data::Normal(
27-
rivet_data::generated::namespace_runner_config_v1::Normal {},
28-
)
93+
match value.kind {
94+
RunnerConfigKind::Normal { .. } => {
95+
unreachable!("Normal runner configs do not have a v1 representation",)
2996
}
30-
RunnerConfig::Serverless {
97+
RunnerConfigKind::Serverless {
3198
url,
3299
headers,
33100
request_lifespan,
@@ -52,31 +119,28 @@ impl From<RunnerConfig> for rivet_data::generated::namespace_runner_config_v1::D
52119

53120
impl From<rivet_data::generated::namespace_runner_config_v1::Data> for RunnerConfig {
54121
fn from(value: rivet_data::generated::namespace_runner_config_v1::Data) -> Self {
55-
match value {
56-
rivet_data::generated::namespace_runner_config_v1::Data::Normal(_) => {
57-
RunnerConfig::Normal {}
58-
}
59-
rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => {
60-
RunnerConfig::Serverless {
61-
url: o.url,
62-
headers: o.headers.into(),
63-
request_lifespan: o.request_lifespan,
64-
slots_per_runner: o.slots_per_runner,
65-
min_runners: o.min_runners,
66-
max_runners: o.max_runners,
67-
runners_margin: o.runners_margin,
122+
RunnerConfig {
123+
metadata: None,
124+
kind: match value {
125+
rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => {
126+
RunnerConfigKind::Serverless {
127+
url: o.url,
128+
headers: o.headers.into(),
129+
request_lifespan: o.request_lifespan,
130+
slots_per_runner: o.slots_per_runner,
131+
min_runners: o.min_runners,
132+
max_runners: o.max_runners,
133+
runners_margin: o.runners_margin,
134+
}
68135
}
69-
}
136+
},
70137
}
71138
}
72139
}
73140

74141
impl RunnerConfig {
75142
/// If updates to this run config affects the autoscaler.
76143
pub fn affects_autoscaler(&self) -> bool {
77-
match self {
78-
Self::Serverless { .. } => true,
79-
Self::Normal { .. } => false,
80-
}
144+
matches!(self.kind, RunnerConfigKind::Serverless { .. })
81145
}
82146
}

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use pegboard::keys;
1515
use reqwest::header::{HeaderName, HeaderValue};
1616
use reqwest_eventsource as sse;
1717
use rivet_runner_protocol as protocol;
18-
use rivet_types::runner_configs::RunnerConfig;
18+
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
1919
use tokio::{sync::oneshot, task::JoinHandle, time::Duration};
2020
use universaldb::options::StreamingMode;
2121
use universaldb::utils::IsolationLevel::*;
@@ -119,15 +119,15 @@ async fn tick(
119119
let namespace = namespace.first().context("runner namespace not found")?;
120120
let namespace_name = &namespace.name;
121121

122-
let RunnerConfig::Serverless {
122+
let RunnerConfigKind::Serverless {
123123
url,
124124
headers,
125125
request_lifespan,
126126
slots_per_runner,
127127
min_runners,
128128
max_runners,
129129
runners_margin,
130-
} = &runner_config.config
130+
} = &runner_config.config.kind
131131
else {
132132
tracing::warn!(
133133
?ns_id,

packages/services/namespace/src/ops/runner_config/upsert.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use gas::prelude::*;
2-
use rivet_types::runner_configs::RunnerConfig;
2+
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
33
use universaldb::options::MutationType;
44

55
use crate::{errors, keys, utils::runner_config_variant};
@@ -33,9 +33,9 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
3333
input.config.clone(),
3434
)?;
3535

36-
match &input.config {
37-
RunnerConfig::Normal {} => {}
38-
RunnerConfig::Serverless {
36+
match &input.config.kind {
37+
RunnerConfigKind::Normal { .. } => {}
38+
RunnerConfigKind::Serverless {
3939
url,
4040
headers,
4141
slots_per_runner,
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use rivet_types::{
2-
keys::namespace::runner_config::RunnerConfigVariant, runner_configs::RunnerConfig,
2+
keys::namespace::runner_config::RunnerConfigVariant,
3+
runner_configs::{RunnerConfig, RunnerConfigKind},
34
};
45

56
pub fn runner_config_variant(runner_config: &RunnerConfig) -> RunnerConfigVariant {
6-
match runner_config {
7-
RunnerConfig::Normal { .. } => RunnerConfigVariant::Normal,
8-
RunnerConfig::Serverless { .. } => RunnerConfigVariant::Serverless,
7+
match runner_config.kind {
8+
RunnerConfigKind::Normal { .. } => RunnerConfigVariant::Normal,
9+
RunnerConfigKind::Serverless { .. } => RunnerConfigVariant::Serverless,
910
}
1011
}

packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures_util::{FutureExt, StreamExt, TryFutureExt, stream::FuturesUnordered}
55
use gas::prelude::*;
66
use rivet_api_types::{runner_configs::list as runner_configs_list, runners::list as runners_list};
77
use rivet_api_util::{HeaderMap, Method, request_remote_datacenter};
8-
use rivet_types::runner_configs::RunnerConfig;
8+
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
99
use serde::de::DeserializeOwned;
1010

1111
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -79,8 +79,8 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<
7979
})
8080
.await?;
8181
if let Some(runner) = res.first() {
82-
match &runner.config {
83-
RunnerConfig::Serverless { max_runners, .. } => {
82+
match &runner.config.kind {
83+
RunnerConfigKind::Serverless { max_runners, .. } => {
8484
if *max_runners != 0 {
8585
return Ok(Some(ctx.config().dc_label()));
8686
}
@@ -143,10 +143,8 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<
143143
|res| {
144144
res.runner_configs
145145
.iter()
146-
.filter(|(_, rc)| match rc {
147-
rivet_types::runner_configs::RunnerConfig::Serverless {
148-
max_runners, ..
149-
} => *max_runners != 0,
146+
.filter(|(_, rc)| match rc.kind {
147+
RunnerConfigKind::Serverless { max_runners, .. } => max_runners != 0,
150148
_ => false,
151149
})
152150
.count() != 0

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use gas::prelude::*;
66
use rivet_metrics::KeyValue;
77
use rivet_runner_protocol as protocol;
88
use rivet_types::{
9-
actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant,
10-
runner_configs::RunnerConfig,
9+
actors::CrashPolicy,
10+
keys::namespace::runner_config::RunnerConfigVariant,
11+
runner_configs::{RunnerConfig, RunnerConfigKind},
1112
};
1213
use std::time::Instant;
1314
use universaldb::options::{ConflictRangeType, MutationType, StreamingMode};
@@ -145,8 +146,8 @@ async fn allocate_actor(
145146
.await?;
146147
let has_valid_serverless = runner_config_res
147148
.first()
148-
.map(|runner| match &runner.config {
149-
RunnerConfig::Serverless { max_runners, .. } => *max_runners != 0,
149+
.map(|runner| match &runner.config.kind {
150+
RunnerConfigKind::Serverless { max_runners, .. } => *max_runners != 0,
150151
_ => false,
151152
})
152153
.unwrap_or_default();

sdks/rust/data/src/versioned.rs

Lines changed: 17 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)