From 8e536615e48fea77e233ca950a8a55a958193ac7 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 2 Oct 2025 14:39:39 -0700 Subject: [PATCH] chore: add tracing --- Cargo.lock | 1 + README.md | 2 +- docker/dev/grafana/dashboards/gasoline.json | 2 +- .../src/driver/postgres/database.rs | 2 +- packages/core/actor-kv/Cargo.toml | 1 + packages/core/actor-kv/src/lib.rs | 13 +++++++++++- packages/core/pegboard-serverless/src/lib.rs | 1 + .../epoxy/src/ops/explicit_prepare.rs | 8 ++++++-- .../services/epoxy/src/ops/kv/get_local.rs | 1 + .../epoxy/src/ops/kv/get_optimistic.rs | 2 ++ packages/services/epoxy/src/ops/propose.rs | 20 ++++++++++++++----- .../epoxy/src/ops/read_cluster_config.rs | 1 + packages/services/epoxy/src/replica/ballot.rs | 3 +++ .../services/epoxy/src/replica/commit_kv.rs | 1 + .../epoxy/src/replica/lead_consensus.rs | 1 + packages/services/epoxy/src/replica/log.rs | 1 + .../epoxy/src/replica/message_request.rs | 19 ++++++++++++------ .../epoxy/src/replica/messages/accept.rs | 1 + .../epoxy/src/replica/messages/accepted.rs | 1 + .../epoxy/src/replica/messages/commit.rs | 1 + .../epoxy/src/replica/messages/committed.rs | 1 + .../replica/messages/download_instances.rs | 1 + .../epoxy/src/replica/messages/pre_accept.rs | 1 + .../epoxy/src/replica/messages/prepare.rs | 1 + packages/services/epoxy/src/replica/utils.rs | 2 ++ .../src/workflows/coordinator/reconfigure.rs | 1 + .../coordinator/replica_status_change.rs | 1 + .../epoxy/src/workflows/replica/setup.rs | 9 +++++++-- .../services/namespace/src/ops/get_local.rs | 2 +- .../src/ops/resolve_for_name_local.rs | 2 +- .../namespace/src/ops/runner_config/delete.rs | 2 +- .../namespace/src/ops/runner_config/list.rs | 2 +- .../services/pegboard/src/ops/actor/get.rs | 2 +- .../services/pegboard/src/ops/runner/get.rs | 1 + .../pegboard/src/ops/runner/get_by_key.rs | 1 + .../pegboard/src/ops/runner/list_for_ns.rs | 1 + .../src/workflows/actor/actor_keys.rs | 1 + .../pegboard/src/workflows/actor/destroy.rs | 1 + .../pegboard/src/workflows/actor/runtime.rs | 3 +++ .../services/pegboard/src/workflows/runner.rs | 2 ++ scripts/tests/actor_e2e.ts | 9 ++++----- scripts/tests/spam_actors.ts | 9 ++++----- 42 files changed, 103 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3b695d857..94bc4c424a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3276,6 +3276,7 @@ version = "25.7.3" dependencies = [ "anyhow", "futures-util", + "gasoline", "pegboard", "rivet-runner-protocol", "rivet-util-id", diff --git a/README.md b/README.md index 33ce35c45d..a769416bd7 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ Projects powering Rivet Engine: - **[Pegboard](packages/services/pegboard/)**: Actor orchestrator - **[Guard](packages/core/guard/)**: Proxy for routing traffic to Rivet Actors -- **[Chirp](packages/common/chirp-workflow/)**: Core workflow engine that powers Rivet +- **[Gasoline](packages/common/gasoline/)**: Core durable execution engine that powers Rivet ## Get Started diff --git a/docker/dev/grafana/dashboards/gasoline.json b/docker/dev/grafana/dashboards/gasoline.json index bac8c7b6da..0aedb3490b 100644 --- a/docker/dev/grafana/dashboards/gasoline.json +++ b/docker/dev/grafana/dashboards/gasoline.json @@ -2580,4 +2580,4 @@ "uid": "636d22f9-d18f-4086-8b45-7c50886a105c", "version": 1, "weekStart": "" -} +} \ No newline at end of file diff --git a/packages/common/universaldb/src/driver/postgres/database.rs b/packages/common/universaldb/src/driver/postgres/database.rs index 1588835a1e..5bd24c8670 100644 --- a/packages/common/universaldb/src/driver/postgres/database.rs +++ b/packages/common/universaldb/src/driver/postgres/database.rs @@ -53,7 +53,7 @@ impl PostgresDatabaseDriver { .context("failed to create btree_gist extension")?; conn.execute( - "CREATE SEQUENCE global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1", + "CREATE SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1", &[], ) .await diff --git a/packages/core/actor-kv/Cargo.toml b/packages/core/actor-kv/Cargo.toml index 8c92c45a1f..4e5d17df4e 100644 --- a/packages/core/actor-kv/Cargo.toml +++ b/packages/core/actor-kv/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true futures-util.workspace = true +gas.workspace = true rivet-runner-protocol.workspace = true rivet-util-id.workspace = true serde_bare.workspace = true diff --git a/packages/core/actor-kv/src/lib.rs b/packages/core/actor-kv/src/lib.rs index f42eb14280..65b1ca8c55 100644 --- a/packages/core/actor-kv/src/lib.rs +++ b/packages/core/actor-kv/src/lib.rs @@ -3,9 +3,9 @@ use std::result::Result::{Err, Ok}; use anyhow::*; use entry::{EntryBaseKey, EntryBuilder, EntryMetadataKey, EntryValueChunkKey}; use futures_util::{StreamExt, TryStreamExt}; +use gas::prelude::*; use key::{KeyWrapper, ListKeyWrapper}; use rivet_runner_protocol as rp; -use rivet_util_id::Id; use universaldb::prelude::*; use universaldb::tuple::Subspace; use utils::{validate_entries, validate_keys}; @@ -27,6 +27,7 @@ fn subspace(actor_id: Id) -> universaldb::utils::Subspace { } /// Returns estimated size of the given subspace. +#[tracing::instrument(skip_all)] pub async fn get_subspace_size(db: &universaldb::Database, subspace: &Subspace) -> Result { let (start, end) = subspace.range(); @@ -38,6 +39,7 @@ pub async fn get_subspace_size(db: &universaldb::Database, subspace: &Subspace) } /// Gets keys from the KV store. +#[tracing::instrument(skip_all)] pub async fn get( db: &universaldb::Database, actor_id: Id, @@ -120,11 +122,13 @@ pub async fn get( Ok((keys, values, metadata)) } }) + .custom_instrument(tracing::info_span!("kv_get_tx")) .await .map_err(Into::::into) } /// Gets keys from the KV store. +#[tracing::instrument(skip_all)] pub async fn list( db: &universaldb::Database, actor_id: Id, @@ -210,11 +214,13 @@ pub async fn list( Ok((keys, values, metadata)) } }) + .custom_instrument(tracing::info_span!("kv_list_tx")) .await .map_err(Into::::into) } /// Puts keys into the KV store. +#[tracing::instrument(skip_all)] pub async fn put( db: &universaldb::Database, actor_id: Id, @@ -273,11 +279,13 @@ pub async fn put( .await } }) + .custom_instrument(tracing::info_span!("kv_put_tx")) .await .map_err(Into::into) } /// Deletes keys from the KV store. Cannot be undone. +#[tracing::instrument(skip_all)] pub async fn delete(db: &universaldb::Database, actor_id: Id, keys: Vec) -> Result<()> { validate_keys(&keys)?; @@ -293,16 +301,19 @@ pub async fn delete(db: &universaldb::Database, actor_id: Id, keys: Vec Result<()> { db.run(|tx| async move { tx.clear_subspace_range(&subspace(actor_id)); Ok(()) }) + .custom_instrument(tracing::info_span!("kv_delete_all_tx")) .await .map_err(Into::into) } diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index a30d4b23e5..267979a7cd 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -91,6 +91,7 @@ async fn tick( .try_collect::>() .await }) + .custom_instrument(tracing::info_span!("tick_tx")) .await?; let runner_configs = ctx diff --git a/packages/services/epoxy/src/ops/explicit_prepare.rs b/packages/services/epoxy/src/ops/explicit_prepare.rs index b75070d3f7..40c36a0a30 100644 --- a/packages/services/epoxy/src/ops/explicit_prepare.rs +++ b/packages/services/epoxy/src/ops/explicit_prepare.rs @@ -36,13 +36,15 @@ pub async fn epoxy_explicit_prepare( // Read config let config = ctx .udb()? - .run(move |tx| async move { utils::read_config(&tx, replica_id).await }) + .run(|tx| async move { utils::read_config(&tx, replica_id).await }) + .custom_instrument(tracing::info_span!("read_config_tx")) .await?; // EPaxos Step 25: Increment ballot number let new_ballot = ctx .udb()? - .run(move |tx| async move { replica::ballot::increment_ballot(&tx, replica_id).await }) + .run(|tx| async move { replica::ballot::increment_ballot(&tx, replica_id).await }) + .custom_instrument(tracing::info_span!("increment_ballot_tx")) .await?; // Get quorum members @@ -248,6 +250,7 @@ fn compare_ballots(a: &protocol::Ballot, b: &protocol::Ballot) -> std::cmp::Orde } } +#[tracing::instrument(skip_all)] async fn send_prepares( ctx: &OperationCtx, config: &protocol::ClusterConfig, @@ -300,6 +303,7 @@ async fn send_prepares( Ok(responses) } +#[tracing::instrument(skip_all)] async fn restart_phase1( ctx: &OperationCtx, _config: &protocol::ClusterConfig, diff --git a/packages/services/epoxy/src/ops/kv/get_local.rs b/packages/services/epoxy/src/ops/kv/get_local.rs index ef314689a3..64b3f95116 100644 --- a/packages/services/epoxy/src/ops/kv/get_local.rs +++ b/packages/services/epoxy/src/ops/kv/get_local.rs @@ -41,6 +41,7 @@ pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result Resul .await } }) + .custom_instrument(tracing::info_span!("get_optimistic_tx")) .await?; if value.is_some() { @@ -144,6 +145,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul .await } }) + .custom_instrument(tracing::info_span!("cache_value_tx")) .await?; return Ok(Output { value: Some(value) }); diff --git a/packages/services/epoxy/src/ops/propose.rs b/packages/services/epoxy/src/ops/propose.rs index cc8a0fe7bc..c6f32940e5 100644 --- a/packages/services/epoxy/src/ops/propose.rs +++ b/packages/services/epoxy/src/ops/propose.rs @@ -33,16 +33,18 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result Result Result Res let replica_id = input.replica_id; async move { utils::read_config(&tx, replica_id).await } }) + .custom_instrument(tracing::info_span!("read_cluster_config_tx")) .await?; Ok(Output { config }) diff --git a/packages/services/epoxy/src/replica/ballot.rs b/packages/services/epoxy/src/replica/ballot.rs index ae52749299..f91855e253 100644 --- a/packages/services/epoxy/src/replica/ballot.rs +++ b/packages/services/epoxy/src/replica/ballot.rs @@ -6,6 +6,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*}; use crate::keys; /// Get the current ballot for this replica +#[tracing::instrument(skip_all)] pub async fn get_ballot( tx: &Transaction, replica_id: protocol::ReplicaId, @@ -31,6 +32,7 @@ pub async fn get_ballot( } /// Increment the ballot number and return the new ballot +#[tracing::instrument(skip_all)] pub async fn increment_ballot( tx: &Transaction, replica_id: protocol::ReplicaId, @@ -67,6 +69,7 @@ pub fn compare_ballots( /// ballot if needed. /// /// Returns true if the ballot is valid (higher than previously seen). +#[tracing::instrument(skip_all)] pub async fn validate_and_update_ballot_for_instance( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/commit_kv.rs b/packages/services/epoxy/src/replica/commit_kv.rs index 7a9fcfa450..139ec64a92 100644 --- a/packages/services/epoxy/src/replica/commit_kv.rs +++ b/packages/services/epoxy/src/replica/commit_kv.rs @@ -6,6 +6,7 @@ use universaldb::prelude::*; use crate::{keys, ops::propose::CommandError, replica::utils}; /// Commits a proposal to KV store. +#[tracing::instrument(skip_all)] pub async fn commit_kv( tx: &Transaction, replica_id: ReplicaId, diff --git a/packages/services/epoxy/src/replica/lead_consensus.rs b/packages/services/epoxy/src/replica/lead_consensus.rs index 96de6e75a3..8103e78868 100644 --- a/packages/services/epoxy/src/replica/lead_consensus.rs +++ b/packages/services/epoxy/src/replica/lead_consensus.rs @@ -6,6 +6,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*}; use crate::keys; use crate::replica::{ballot, messages, utils}; +#[tracing::instrument(skip_all)] pub async fn lead_consensus( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/log.rs b/packages/services/epoxy/src/replica/log.rs index ab9ba95d13..05f3b63eb7 100644 --- a/packages/services/epoxy/src/replica/log.rs +++ b/packages/services/epoxy/src/replica/log.rs @@ -15,6 +15,7 @@ pub fn state_order(state: &protocol::State) -> u8 { } } +#[tracing::instrument(skip_all)] pub async fn update_log( tx: &Transaction, replica_id: ReplicaId, diff --git a/packages/services/epoxy/src/replica/message_request.rs b/packages/services/epoxy/src/replica/message_request.rs index 518df80bb6..dd63e9d4b3 100644 --- a/packages/services/epoxy/src/replica/message_request.rs +++ b/packages/services/epoxy/src/replica/message_request.rs @@ -5,6 +5,7 @@ use rivet_api_builder::prelude::*; use crate::{ops, replica}; +#[tracing::instrument(skip_all)] pub async fn message_request( ctx: &ApiCtx, replica_id: ReplicaId, @@ -20,10 +21,11 @@ pub async fn message_request( // Store the configuration ctx.udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::update_config::update_config(&*tx, replica_id, req) } }) + .custom_instrument(tracing::info_span!("update_config_tx")) .await?; protocol::ResponseKind::UpdateConfigResponse @@ -31,33 +33,36 @@ pub async fn message_request( protocol::RequestKind::PreAcceptRequest(req) => { let response = ctx .udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::messages::pre_accept(&*tx, replica_id, req).await } }) + .custom_instrument(tracing::info_span!("pre_accept_tx")) .await?; protocol::ResponseKind::PreAcceptResponse(response) } protocol::RequestKind::AcceptRequest(req) => { let response = ctx .udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::messages::accept(&*tx, replica_id, req).await } }) + .custom_instrument(tracing::info_span!("accept_tx")) .await?; protocol::ResponseKind::AcceptResponse(response) } protocol::RequestKind::CommitRequest(req) => { // Commit and update KV store ctx.udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::messages::commit(&*tx, replica_id, req, true).await?; Result::Ok(()) } }) + .custom_instrument(tracing::info_span!("commit_tx")) .await?; protocol::ResponseKind::CommitResponse @@ -65,10 +70,11 @@ pub async fn message_request( protocol::RequestKind::PrepareRequest(req) => { let response = ctx .udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::messages::prepare(&*tx, replica_id, req).await } }) + .custom_instrument(tracing::info_span!("prepare_tx")) .await?; protocol::ResponseKind::PrepareResponse(response) } @@ -76,10 +82,11 @@ pub async fn message_request( // Handle download instances request - read from UDB and return instances let instances = ctx .udb()? - .run(move |tx| { + .run(|tx| { let req = req.clone(); async move { replica::messages::download_instances(&*tx, replica_id, req).await } }) + .custom_instrument(tracing::info_span!("download_instances_tx")) .await?; protocol::ResponseKind::DownloadInstancesResponse(protocol::DownloadInstancesResponse { diff --git a/packages/services/epoxy/src/replica/messages/accept.rs b/packages/services/epoxy/src/replica/messages/accept.rs index 0f570e892e..f123796d0d 100644 --- a/packages/services/epoxy/src/replica/messages/accept.rs +++ b/packages/services/epoxy/src/replica/messages/accept.rs @@ -4,6 +4,7 @@ use universaldb::Transaction; use crate::replica::{ballot, messages}; +#[tracing::instrument(skip_all)] pub async fn accept( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/accepted.rs b/packages/services/epoxy/src/replica/messages/accepted.rs index 5314ef88c2..bcbb0a0147 100644 --- a/packages/services/epoxy/src/replica/messages/accepted.rs +++ b/packages/services/epoxy/src/replica/messages/accepted.rs @@ -5,6 +5,7 @@ use universaldb::Transaction; use crate::replica::{ballot, messages, utils}; // EPaxos Step 16 +#[tracing::instrument(skip_all)] pub async fn accepted( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/commit.rs b/packages/services/epoxy/src/replica/messages/commit.rs index fd52d64833..e85aff3d1a 100644 --- a/packages/services/epoxy/src/replica/messages/commit.rs +++ b/packages/services/epoxy/src/replica/messages/commit.rs @@ -5,6 +5,7 @@ use universaldb::Transaction; use crate::replica::ballot; // EPaxos Step 24 +#[tracing::instrument(skip_all)] pub async fn commit( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/committed.rs b/packages/services/epoxy/src/replica/messages/committed.rs index 444c7681d9..3ad726157d 100644 --- a/packages/services/epoxy/src/replica/messages/committed.rs +++ b/packages/services/epoxy/src/replica/messages/committed.rs @@ -5,6 +5,7 @@ use universaldb::Transaction; use crate::replica::ballot; // EPaxos Steps 21-22 +#[tracing::instrument(skip_all)] pub async fn committed( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/download_instances.rs b/packages/services/epoxy/src/replica/messages/download_instances.rs index aa997f1d04..2f7c9b37e7 100644 --- a/packages/services/epoxy/src/replica/messages/download_instances.rs +++ b/packages/services/epoxy/src/replica/messages/download_instances.rs @@ -6,6 +6,7 @@ use universaldb::{KeySelector, RangeOption, Transaction, options::StreamingMode} use crate::keys; +#[tracing::instrument(skip_all)] pub async fn download_instances( tx: &Transaction, replica_id: ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/pre_accept.rs b/packages/services/epoxy/src/replica/messages/pre_accept.rs index 23f33755a6..452f9d51ba 100644 --- a/packages/services/epoxy/src/replica/messages/pre_accept.rs +++ b/packages/services/epoxy/src/replica/messages/pre_accept.rs @@ -5,6 +5,7 @@ use universaldb::Transaction; use crate::replica::{ballot, messages, utils}; +#[tracing::instrument(skip_all)] pub async fn pre_accept( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/messages/prepare.rs b/packages/services/epoxy/src/replica/messages/prepare.rs index df3581eb3b..1c9df0f47d 100644 --- a/packages/services/epoxy/src/replica/messages/prepare.rs +++ b/packages/services/epoxy/src/replica/messages/prepare.rs @@ -5,6 +5,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*}; use crate::{keys, replica::ballot}; +#[tracing::instrument(skip_all)] pub async fn prepare( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/replica/utils.rs b/packages/services/epoxy/src/replica/utils.rs index 870d3db7ef..f867c78f0a 100644 --- a/packages/services/epoxy/src/replica/utils.rs +++ b/packages/services/epoxy/src/replica/utils.rs @@ -8,6 +8,7 @@ use universaldb::{KeySelector, RangeOption, Transaction, options::StreamingMode} use crate::keys; // Helper function to find interference for a key +#[tracing::instrument(skip_all)] pub async fn find_interference( tx: &Transaction, replica_id: ReplicaId, @@ -54,6 +55,7 @@ pub async fn find_interference( } // Helper function to find max sequence number from interference set +#[tracing::instrument(skip_all)] pub async fn find_max_seq( tx: &Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs index 1e738cdc47..c82ee4ac2e 100644 --- a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs +++ b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs @@ -8,6 +8,7 @@ use crate::types; use super::State; +#[tracing::instrument(skip_all)] pub async fn reconfigure(ctx: &mut WorkflowCtx) -> Result<()> { // Check for config changes let config_change = ctx.activity(CheckConfigChangesInput {}).await?; diff --git a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs index d35f2c6706..c2cd732996 100644 --- a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs +++ b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use super::State; use crate::types; +#[tracing::instrument(skip_all)] pub async fn replica_status_change( ctx: &mut WorkflowCtx, signal: super::ReplicaStatusChange, diff --git a/packages/services/epoxy/src/workflows/replica/setup.rs b/packages/services/epoxy/src/workflows/replica/setup.rs index 8d9b43b2f6..662a1f0524 100644 --- a/packages/services/epoxy/src/workflows/replica/setup.rs +++ b/packages/services/epoxy/src/workflows/replica/setup.rs @@ -14,6 +14,7 @@ use crate::types; // `BeginLearning`. This is because the value of `read_cluster_config` may change between // activities which can cause the learning process to enter an invalid state. +#[tracing::instrument(skip_all)] pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Result<()> { // Wait for cooridinator to send begin learning signal let begin_learning = ctx.listen::().await?; @@ -232,6 +233,7 @@ pub async fn download_instances_chunk( /// If we committed values during this phase, we might incorrectly apply an older value when /// a newer one exists but hasn't been downloaded yet, or apply a value that should be superseded /// by another concurrent operation. +#[tracing::instrument(skip_all)] async fn apply_log_entry( ctx: &ActivityCtx, log_entry: &protocol::LogEntry, @@ -247,7 +249,7 @@ async fn apply_log_entry( // Replay the log entry ctx.udb()? - .run(move |tx| { + .run(|tx| { let log_entry = log_entry.clone(); let instance = instance.clone(); @@ -301,6 +303,7 @@ async fn apply_log_entry( Result::Ok(()) } }) + .custom_instrument(tracing::info_span!("apply_log_entry_tx")) .await?; tracing::info!( @@ -364,7 +367,7 @@ pub async fn recover_keys_chunk( let (last_key, recovered_count) = ctx .udb()? - .run(move |tx| { + .run(|tx| { let after_key = input.after_key.clone(); let count = input.count; @@ -491,6 +494,7 @@ pub async fn recover_keys_chunk( Result::Ok((last_processed_key, recovered_count)) } }) + .custom_instrument(tracing::info_span!("recover_keys_chunk_tx")) .await?; Ok(RecoverKeysChunkOutput { @@ -623,6 +627,7 @@ fn topological_sort_entries( /// /// This is why we can't commit values during `apply_log_entry` - we need to see all /// instances and their dependencies first to correctly determine the execution order. +#[tracing::instrument(skip_all)] async fn recover_key_value_with_instances( tx: &universaldb::Transaction, replica_id: protocol::ReplicaId, diff --git a/packages/services/namespace/src/ops/get_local.rs b/packages/services/namespace/src/ops/get_local.rs index 82ec33b84b..e72a25c5da 100644 --- a/packages/services/namespace/src/ops/get_local.rs +++ b/packages/services/namespace/src/ops/get_local.rs @@ -30,7 +30,7 @@ pub async fn namespace_get_local(ctx: &OperationCtx, input: &Input) -> Result>() .await }) - .custom_instrument(tracing::info_span!("namespace_get_tx")) + .custom_instrument(tracing::info_span!("namespace_get_local_tx")) .await?; Ok(namespaces) diff --git a/packages/services/namespace/src/ops/resolve_for_name_local.rs b/packages/services/namespace/src/ops/resolve_for_name_local.rs index cd212abb82..5aaff209cf 100644 --- a/packages/services/namespace/src/ops/resolve_for_name_local.rs +++ b/packages/services/namespace/src/ops/resolve_for_name_local.rs @@ -35,7 +35,7 @@ pub async fn namespace_resolve_for_name_local( get_inner(namespace_id, &tx).await } }) - .custom_instrument(tracing::info_span!("namespace_resolve_for_name_tx")) + .custom_instrument(tracing::info_span!("namespace_resolve_for_name_local_tx")) .await .map_err(Into::into) } diff --git a/packages/services/namespace/src/ops/runner_config/delete.rs b/packages/services/namespace/src/ops/runner_config/delete.rs index 1600d780b1..ca15c28bf3 100644 --- a/packages/services/namespace/src/ops/runner_config/delete.rs +++ b/packages/services/namespace/src/ops/runner_config/delete.rs @@ -36,7 +36,7 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - Ok(()) }) - .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) + .custom_instrument(tracing::info_span!("runner_config_delete_tx")) .await?; // Bump autoscaler diff --git a/packages/services/namespace/src/ops/runner_config/list.rs b/packages/services/namespace/src/ops/runner_config/list.rs index 12501d8160..cab782523e 100644 --- a/packages/services/namespace/src/ops/runner_config/list.rs +++ b/packages/services/namespace/src/ops/runner_config/list.rs @@ -91,7 +91,7 @@ pub async fn namespace_runner_config_list( .try_collect() .await }) - .custom_instrument(tracing::info_span!("runner_config_get_local_tx")) + .custom_instrument(tracing::info_span!("runner_config_list_tx")) .await?; Ok(runner_configs) diff --git a/packages/services/pegboard/src/ops/actor/get.rs b/packages/services/pegboard/src/ops/actor/get.rs index 22c9b55f17..237453c19f 100644 --- a/packages/services/pegboard/src/ops/actor/get.rs +++ b/packages/services/pegboard/src/ops/actor/get.rs @@ -44,7 +44,7 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> Result>() .await }) - .custom_instrument(tracing::info_span!("actor_list_wf_tx")) + .custom_instrument(tracing::info_span!("actor_get_tx")) .await?; let wfs = ctx diff --git a/packages/services/pegboard/src/ops/runner/get.rs b/packages/services/pegboard/src/ops/runner/get.rs index 3c588170fb..3dbf83a5c0 100644 --- a/packages/services/pegboard/src/ops/runner/get.rs +++ b/packages/services/pegboard/src/ops/runner/get.rs @@ -37,6 +37,7 @@ pub async fn pegboard_runner_get(ctx: &OperationCtx, input: &Input) -> Result Re } } }) + .custom_instrument(tracing::info_span!("runner_get_by_key_tx")) .await?; Ok(Output { runner }) diff --git a/packages/services/pegboard/src/ops/runner/list_for_ns.rs b/packages/services/pegboard/src/ops/runner/list_for_ns.rs index 0947a8d1c6..b99ae83b0a 100644 --- a/packages/services/pegboard/src/ops/runner/list_for_ns.rs +++ b/packages/services/pegboard/src/ops/runner/list_for_ns.rs @@ -202,6 +202,7 @@ pub async fn pegboard_runner_list_for_ns(ctx: &OperationCtx, input: &Input) -> R .await } }) + .custom_instrument(tracing::info_span!("runner_list_for_ns_tx")) .await?; Ok(Output { runners }) diff --git a/packages/services/pegboard/src/workflows/actor/actor_keys.rs b/packages/services/pegboard/src/workflows/actor/actor_keys.rs index 7bb8bcf851..28aae688fe 100644 --- a/packages/services/pegboard/src/workflows/actor/actor_keys.rs +++ b/packages/services/pegboard/src/workflows/actor/actor_keys.rs @@ -276,6 +276,7 @@ pub async fn reserve_actor_key( Ok(ReserveActorKeyOutput::Success) }) + .custom_instrument(tracing::info_span!("actor_reserve_key_tx")) .await?; Ok(res) diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs index 0a18fa6cf0..67043d6aba 100644 --- a/packages/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/services/pegboard/src/workflows/actor/destroy.rs @@ -179,6 +179,7 @@ async fn clear_kv(ctx: &ActivityCtx, input: &ClearKvInput) -> Result Result<( Ok(()) }) + .custom_instrument(tracing::info_span!("actor_set_started_tx")) .await?; Ok(()) @@ -752,6 +754,7 @@ pub async fn set_sleeping(ctx: &ActivityCtx, input: &SetSleepingInput) -> Result Ok(()) }) + .custom_instrument(tracing::info_span!("actor_set_sleeping_tx")) .await?; Ok(()) diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs index 2bd68a981f..abed09a15f 100644 --- a/packages/services/pegboard/src/workflows/runner.rs +++ b/packages/services/pegboard/src/workflows/runner.rs @@ -508,6 +508,7 @@ async fn init(ctx: &ActivityCtx, input: &InitInput) -> Result { Ok(evict_workflow_id) }) + .custom_instrument(tracing::info_span!("runner_init_tx")) .await?; Ok(InitOutput { evict_workflow_id }) @@ -728,6 +729,7 @@ async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> { Ok(()) }) + .custom_instrument(tracing::info_span!("runner_clear_tx")) .await?; // Does not clear the data keys like last ping ts, just the allocation idx diff --git a/scripts/tests/actor_e2e.ts b/scripts/tests/actor_e2e.ts index d350c4c549..0003902155 100755 --- a/scripts/tests/actor_e2e.ts +++ b/scripts/tests/actor_e2e.ts @@ -1,7 +1,6 @@ #!/usr/bin/env tsx import { RIVET_ENDPOINT, createActor, destroyActor } from "./utils"; -import WebSocket from "ws"; async function main() { try { @@ -74,7 +73,7 @@ function testWebSocket(actorId: string): Promise { resolve(); }, 2000); - ws.on("open", () => { + ws.addEventListener("open", () => { console.log("WebSocket connected"); // Test ping-pong @@ -82,7 +81,7 @@ function testWebSocket(actorId: string): Promise { ws.send("ping"); }); - ws.on("message", (data) => { + ws.addEventListener("message", (data) => { const message = data.toString(); console.log(`WebSocket received raw data:`, data); console.log(`WebSocket received message: "${message}"`); @@ -108,12 +107,12 @@ function testWebSocket(actorId: string): Promise { } }); - ws.on("error", (error) => { + ws.addEventListener("error", (error) => { clearTimeout(timeout); reject(new Error(`WebSocket error: ${error.message}`)); }); - ws.on("close", () => { + ws.addEventListener("close", () => { clearTimeout(timeout); if (!pingReceived || !echoReceived) { reject(new Error("WebSocket closed before completing tests")); diff --git a/scripts/tests/spam_actors.ts b/scripts/tests/spam_actors.ts index 7cad3cc2ff..dd00625128 100755 --- a/scripts/tests/spam_actors.ts +++ b/scripts/tests/spam_actors.ts @@ -1,7 +1,6 @@ #!/usr/bin/env tsx import { RIVET_ENDPOINT, createActor, destroyActor } from "./utils"; -import WebSocket from "ws"; const ACTORS = parseInt(process.argv[2]) || 15; @@ -81,7 +80,7 @@ function testWebSocket(actorId: string): Promise { resolve(); }, 2000); - ws.on("open", () => { + ws.addEventListener("open", () => { console.log("WebSocket connected"); // Test ping-pong @@ -89,7 +88,7 @@ function testWebSocket(actorId: string): Promise { ws.send("ping"); }); - ws.on("message", (data) => { + ws.addEventListener("message", (data) => { const message = data.toString(); console.log(`WebSocket received raw data:`, data); console.log(`WebSocket received message: "${message}"`); @@ -115,12 +114,12 @@ function testWebSocket(actorId: string): Promise { } }); - ws.on("error", (error) => { + ws.addEventListener("error", (error) => { clearTimeout(timeout); reject(new Error(`WebSocket error: ${error.message}`)); }); - ws.on("close", () => { + ws.addEventListener("close", () => { clearTimeout(timeout); if (!pingReceived || !echoReceived) { reject(new Error("WebSocket closed before completing tests"));