Skip to content

Commit e2155cd

Browse files
committed
feat(serverless): pass runner name, namespace to runner (#3056)
Fixes KIT-332 (merge with rivet-dev/rivetkit#1335)
1 parent e410867 commit e2155cd

File tree

1 file changed

+29
-2
lines changed
  • packages/core/pegboard-serverless/src

1 file changed

+29
-2
lines changed

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use universaldb::utils::IsolationLevel::*;
2222
use vbare::OwnedVersionedData;
2323

2424
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
25-
const X_RIVETKIT_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivetkit-total-slots");
25+
const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots");
26+
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
27+
const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id");
2628

2729
struct OutboundConnection {
2830
handle: JoinHandle<()>,
@@ -107,6 +109,15 @@ async fn tick(
107109
.find(|rc| rc.namespace_id == *ns_id)
108110
.context("runner config not found")?;
109111

112+
let namespace = ctx
113+
.op(namespace::ops::get_global::Input {
114+
namespace_ids: vec![ns_id.clone()],
115+
})
116+
.await
117+
.context("runner namespace not found")?;
118+
let namespace = namespace.first().context("runner namespace not found")?;
119+
let namespace_name = &namespace.name;
120+
110121
let RunnerConfig::Serverless {
111122
url,
112123
headers,
@@ -160,6 +171,8 @@ async fn tick(
160171
headers.clone(),
161172
Duration::from_secs(*request_lifespan as u64),
162173
*slots_per_runner,
174+
runner_name.clone(),
175+
namespace_name.clone(),
163176
)
164177
})
165178
.take(start_count);
@@ -186,6 +199,8 @@ fn spawn_connection(
186199
headers: HashMap<String, String>,
187200
request_lifespan: Duration,
188201
slots_per_runner: u32,
202+
runner_name: String,
203+
namespace_name: String,
189204
) -> OutboundConnection {
190205
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
191206
let draining = Arc::new(AtomicBool::new(false));
@@ -198,6 +213,8 @@ fn spawn_connection(
198213
headers,
199214
request_lifespan,
200215
slots_per_runner,
216+
runner_name,
217+
namespace_name,
201218
shutdown_rx,
202219
draining2,
203220
)
@@ -229,6 +246,8 @@ async fn outbound_handler(
229246
headers: HashMap<String, String>,
230247
request_lifespan: Duration,
231248
slots_per_runner: u32,
249+
runner_name: String,
250+
namespace_name: String,
232251
shutdown_rx: oneshot::Receiver<()>,
233252
draining: Arc<AtomicBool>,
234253
) -> Result<()> {
@@ -243,9 +262,17 @@ async fn outbound_handler(
243262
))
244263
})
245264
.chain(std::iter::once((
246-
X_RIVETKIT_TOTAL_SLOTS,
265+
X_RIVET_TOTAL_SLOTS,
247266
HeaderValue::try_from(slots_per_runner)?,
248267
)))
268+
.chain(std::iter::once((
269+
X_RIVET_RUNNER_NAME,
270+
HeaderValue::try_from(runner_name)?,
271+
)))
272+
.chain(std::iter::once((
273+
X_RIVET_NAMESPACE_ID,
274+
HeaderValue::try_from(namespace_name)?,
275+
)))
249276
// Add token if auth is enabled
250277
.chain(
251278
ctx.config()

0 commit comments

Comments
 (0)