Skip to content

Commit 24806c0

Browse files
committed
feat(serverless): pass runner name, namespace to runner
1 parent 63e1f81 commit 24806c0

File tree

1 file changed

+26
-2
lines changed
  • packages/core/pegboard-serverless/src

1 file changed

+26
-2
lines changed

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use universaldb::utils::IsolationLevel::*;
2222
use vbare::OwnedVersionedData;
2323

2424
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
25-
const X_RIVETKIT_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivetkit-total-slots");
25+
const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots");
26+
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
27+
const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id");
2628

2729
struct OutboundConnection {
2830
handle: JoinHandle<()>,
@@ -107,6 +109,12 @@ async fn tick(
107109
.find(|rc| rc.namespace_id == *ns_id)
108110
.context("runner config not found")?;
109111

112+
let namespace = ctx.op(namespace::ops::get_global::Input {
113+
namespace_ids: vec![ns_id.clone()]
114+
}).await.context("runner namespace not found")?;
115+
let namespace = namespace.first().context("runner namespace not found")?;
116+
let namespace_name = &namespace.name;
117+
110118
let RunnerConfig::Serverless {
111119
url,
112120
headers,
@@ -160,6 +168,8 @@ async fn tick(
160168
headers.clone(),
161169
Duration::from_secs(*request_lifespan as u64),
162170
*slots_per_runner,
171+
runner_name.clone(),
172+
namespace_name.clone(),
163173
)
164174
})
165175
.take(start_count);
@@ -186,6 +196,8 @@ fn spawn_connection(
186196
headers: HashMap<String, String>,
187197
request_lifespan: Duration,
188198
slots_per_runner: u32,
199+
runner_name: String,
200+
namespace_id_string: String,
189201
) -> OutboundConnection {
190202
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
191203
let draining = Arc::new(AtomicBool::new(false));
@@ -198,6 +210,8 @@ fn spawn_connection(
198210
headers,
199211
request_lifespan,
200212
slots_per_runner,
213+
runner_name,
214+
namespace_id_string,
201215
shutdown_rx,
202216
draining2,
203217
)
@@ -229,6 +243,8 @@ async fn outbound_handler(
229243
headers: HashMap<String, String>,
230244
request_lifespan: Duration,
231245
slots_per_runner: u32,
246+
runner_name: String,
247+
namespace_id_string: String,
232248
shutdown_rx: oneshot::Receiver<()>,
233249
draining: Arc<AtomicBool>,
234250
) -> Result<()> {
@@ -243,9 +259,17 @@ async fn outbound_handler(
243259
))
244260
})
245261
.chain(std::iter::once((
246-
X_RIVETKIT_TOTAL_SLOTS,
262+
X_RIVET_TOTAL_SLOTS,
247263
HeaderValue::try_from(slots_per_runner)?,
248264
)))
265+
.chain(std::iter::once((
266+
X_RIVET_RUNNER_NAME,
267+
HeaderValue::try_from(runner_name)?,
268+
)))
269+
.chain(std::iter::once((
270+
X_RIVET_NAMESPACE_ID,
271+
HeaderValue::try_from(namespace_id_string)?
272+
)))
249273
// Add token if auth is enabled
250274
.chain(
251275
ctx.config()

0 commit comments

Comments
 (0)