Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Projects powering Rivet Engine:

- **[Pegboard](packages/services/pegboard/)**: Actor orchestrator
- **[Guard](packages/core/guard/)**: Proxy for routing traffic to Rivet Actors
- **[Chirp](packages/common/chirp-workflow/)**: Core workflow engine that powers Rivet
- **[Gasoline](packages/common/gasoline/)**: Core durable execution engine that powers Rivet

## Get Started

Expand Down
2 changes: 1 addition & 1 deletion docker/dev/grafana/dashboards/gasoline.json
Original file line number Diff line number Diff line change
Expand Up @@ -2580,4 +2580,4 @@
"uid": "636d22f9-d18f-4086-8b45-7c50886a105c",
"version": 1,
"weekStart": ""
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl PostgresDatabaseDriver {
.context("failed to create btree_gist extension")?;

conn.execute(
"CREATE SEQUENCE global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1",
"CREATE SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1",
&[],
)
.await
Expand Down
1 change: 1 addition & 0 deletions packages/core/actor-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
anyhow.workspace = true
futures-util.workspace = true
gas.workspace = true
rivet-runner-protocol.workspace = true
rivet-util-id.workspace = true
serde_bare.workspace = true
Expand Down
13 changes: 12 additions & 1 deletion packages/core/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::result::Result::{Err, Ok};
use anyhow::*;
use entry::{EntryBaseKey, EntryBuilder, EntryMetadataKey, EntryValueChunkKey};
use futures_util::{StreamExt, TryStreamExt};
use gas::prelude::*;
use key::{KeyWrapper, ListKeyWrapper};
use rivet_runner_protocol as rp;
use rivet_util_id::Id;
use universaldb::prelude::*;
use universaldb::tuple::Subspace;
use utils::{validate_entries, validate_keys};
Expand All @@ -27,6 +27,7 @@ fn subspace(actor_id: Id) -> universaldb::utils::Subspace {
}

/// Returns estimated size of the given subspace.
#[tracing::instrument(skip_all)]
pub async fn get_subspace_size(db: &universaldb::Database, subspace: &Subspace) -> Result<i64> {
let (start, end) = subspace.range();

Expand All @@ -38,6 +39,7 @@ pub async fn get_subspace_size(db: &universaldb::Database, subspace: &Subspace)
}

/// Gets keys from the KV store.
#[tracing::instrument(skip_all)]
pub async fn get(
db: &universaldb::Database,
actor_id: Id,
Expand Down Expand Up @@ -120,11 +122,13 @@ pub async fn get(
Ok((keys, values, metadata))
}
})
.custom_instrument(tracing::info_span!("kv_get_tx"))
.await
.map_err(Into::<anyhow::Error>::into)
}

/// Gets keys from the KV store.
#[tracing::instrument(skip_all)]
pub async fn list(
db: &universaldb::Database,
actor_id: Id,
Expand Down Expand Up @@ -210,11 +214,13 @@ pub async fn list(
Ok((keys, values, metadata))
}
})
.custom_instrument(tracing::info_span!("kv_list_tx"))
.await
.map_err(Into::<anyhow::Error>::into)
}

/// Puts keys into the KV store.
#[tracing::instrument(skip_all)]
pub async fn put(
db: &universaldb::Database,
actor_id: Id,
Expand Down Expand Up @@ -273,11 +279,13 @@ pub async fn put(
.await
}
})
.custom_instrument(tracing::info_span!("kv_put_tx"))
.await
.map_err(Into::into)
}

/// Deletes keys from the KV store. Cannot be undone.
#[tracing::instrument(skip_all)]
pub async fn delete(db: &universaldb::Database, actor_id: Id, keys: Vec<rp::KvKey>) -> Result<()> {
validate_keys(&keys)?;

Expand All @@ -293,16 +301,19 @@ pub async fn delete(db: &universaldb::Database, actor_id: Id, keys: Vec<rp::KvKe
Ok(())
}
})
.custom_instrument(tracing::info_span!("kv_delete_tx"))
.await
.map_err(Into::into)
}

/// Deletes all keys from the KV store. Cannot be undone.
#[tracing::instrument(skip_all)]
pub async fn delete_all(db: &universaldb::Database, actor_id: Id) -> Result<()> {
db.run(|tx| async move {
tx.clear_subspace_range(&subspace(actor_id));
Ok(())
})
.custom_instrument(tracing::info_span!("kv_delete_all_tx"))
.await
.map_err(Into::into)
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async fn tick(
.try_collect::<Vec<_>>()
.await
})
.custom_instrument(tracing::info_span!("tick_tx"))
.await?;

let runner_configs = ctx
Expand Down
8 changes: 6 additions & 2 deletions packages/services/epoxy/src/ops/explicit_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ pub async fn epoxy_explicit_prepare(
// Read config
let config = ctx
.udb()?
.run(move |tx| async move { utils::read_config(&tx, replica_id).await })
.run(|tx| async move { utils::read_config(&tx, replica_id).await })
.custom_instrument(tracing::info_span!("read_config_tx"))
.await?;

// EPaxos Step 25: Increment ballot number
let new_ballot = ctx
.udb()?
.run(move |tx| async move { replica::ballot::increment_ballot(&tx, replica_id).await })
.run(|tx| async move { replica::ballot::increment_ballot(&tx, replica_id).await })
.custom_instrument(tracing::info_span!("increment_ballot_tx"))
.await?;

// Get quorum members
Expand Down Expand Up @@ -248,6 +250,7 @@ fn compare_ballots(a: &protocol::Ballot, b: &protocol::Ballot) -> std::cmp::Orde
}
}

#[tracing::instrument(skip_all)]
async fn send_prepares(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand Down Expand Up @@ -300,6 +303,7 @@ async fn send_prepares(
Ok(responses)
}

#[tracing::instrument(skip_all)]
async fn restart_phase1(
ctx: &OperationCtx,
_config: &protocol::ClusterConfig,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Out
.await
}
})
.custom_instrument(tracing::info_span!("get_local_tx"))
.await?;

Ok(Output { value })
Expand Down
2 changes: 2 additions & 0 deletions packages/services/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
.await
}
})
.custom_instrument(tracing::info_span!("get_optimistic_tx"))
.await?;

if value.is_some() {
Expand Down Expand Up @@ -144,6 +145,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
.await
}
})
.custom_instrument(tracing::info_span!("cache_value_tx"))
.await?;

return Ok(Output { value: Some(value) });
Expand Down
20 changes: 15 additions & 5 deletions packages/services/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
// Read config
let config = ctx
.udb()?
.run(move |tx| async move { utils::read_config(&tx, replica_id).await })
.run(|tx| async move { utils::read_config(&tx, replica_id).await })
.custom_instrument(tracing::info_span!("read_config_tx"))
.await?;

// Lead consensus
let payload = ctx
.udb()?
.run(move |tx| {
.run(|tx| {
let proposal = input.proposal.clone();
async move { replica::lead_consensus::lead_consensus(&*tx, replica_id, proposal).await }
})
.custom_instrument(tracing::info_span!("lead_consensus_tx"))
.await?;

// Get quorum members (only active replicas for voting)
Expand All @@ -55,11 +57,12 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
// Decide path
let path = ctx
.udb()?
.run(move |tx| {
.run(|tx| {
let pre_accept_oks = pre_accept_oks.clone();
let payload = payload.clone();
async move { replica::decide_path::decide_path(&*tx, pre_accept_oks, &payload) }
})
.custom_instrument(tracing::info_span!("decide_path_tx"))
.await?;

match path {
Expand All @@ -72,6 +75,7 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
}
}

#[tracing::instrument(skip_all)]
pub async fn run_paxos_accept(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand All @@ -84,10 +88,11 @@ pub async fn run_paxos_accept(

// Mark as accepted
ctx.udb()?
.run(move |tx| {
.run(|tx| {
let payload = payload.clone();
async move { replica::messages::accepted(&*tx, replica_id, payload).await }
})
.custom_instrument(tracing::info_span!("accept_tx"))
.await?;

// EPaxos Step 17
Expand Down Expand Up @@ -115,6 +120,7 @@ pub async fn run_paxos_accept(
}
}

#[tracing::instrument(skip_all)]
pub async fn commit(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand All @@ -130,14 +136,15 @@ pub async fn commit(
let cmd_err = {
let payload = payload.clone();
ctx.udb()?
.run(move |tx| {
.run(|tx| {
let payload = payload.clone();
async move {
let cmd_err = replica::messages::committed(&*tx, replica_id, &payload).await?;

Result::Ok(cmd_err)
}
})
.custom_instrument(tracing::info_span!("committed_tx"))
.await?
};

Expand All @@ -163,6 +170,7 @@ pub async fn commit(
}
}

#[tracing::instrument(skip_all)]
async fn send_pre_accepts(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand Down Expand Up @@ -207,6 +215,7 @@ async fn send_pre_accepts(
Ok(responses)
}

#[tracing::instrument(skip_all)]
async fn send_accepts(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand Down Expand Up @@ -252,6 +261,7 @@ async fn send_accepts(
Ok(responses.len() + 1)
}

#[tracing::instrument(skip_all)]
async fn send_commits(
ctx: &OperationCtx,
config: &protocol::ClusterConfig,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/ops/read_cluster_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub async fn epoxy_read_cluster_config(ctx: &OperationCtx, input: &Input) -> Res
let replica_id = input.replica_id;
async move { utils::read_config(&tx, replica_id).await }
})
.custom_instrument(tracing::info_span!("read_cluster_config_tx"))
.await?;

Ok(Output { config })
Expand Down
3 changes: 3 additions & 0 deletions packages/services/epoxy/src/replica/ballot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*};
use crate::keys;

/// Get the current ballot for this replica
#[tracing::instrument(skip_all)]
pub async fn get_ballot(
tx: &Transaction,
replica_id: protocol::ReplicaId,
Expand All @@ -31,6 +32,7 @@ pub async fn get_ballot(
}

/// Increment the ballot number and return the new ballot
#[tracing::instrument(skip_all)]
pub async fn increment_ballot(
tx: &Transaction,
replica_id: protocol::ReplicaId,
Expand Down Expand Up @@ -67,6 +69,7 @@ pub fn compare_ballots(
/// ballot if needed.
///
/// Returns true if the ballot is valid (higher than previously seen).
#[tracing::instrument(skip_all)]
pub async fn validate_and_update_ballot_for_instance(
tx: &Transaction,
replica_id: protocol::ReplicaId,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/replica/commit_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use universaldb::prelude::*;
use crate::{keys, ops::propose::CommandError, replica::utils};

/// Commits a proposal to KV store.
#[tracing::instrument(skip_all)]
pub async fn commit_kv(
tx: &Transaction,
replica_id: ReplicaId,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/replica/lead_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*};
use crate::keys;
use crate::replica::{ballot, messages, utils};

#[tracing::instrument(skip_all)]
pub async fn lead_consensus(
tx: &Transaction,
replica_id: protocol::ReplicaId,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/replica/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn state_order(state: &protocol::State) -> u8 {
}
}

#[tracing::instrument(skip_all)]
pub async fn update_log(
tx: &Transaction,
replica_id: ReplicaId,
Expand Down
Loading
Loading