Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ publish = false

[workspace.dependencies]
anyhow = "1"
bytes = "1"
clap = { version = "4", features = ["derive"] }
env_logger = "0"
itertools = "0"
Expand Down
97 changes: 97 additions & 0 deletions mgmtd/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! Interfaces and implementations for in-app interaction between tasks or threads.

mod runtime;
#[cfg(test)]
pub(crate) mod test;

use crate::StaticInfo;
use crate::license::LicensedFeature;
use anyhow::Result;
use protobuf::license::GetCertDataResult;
pub(crate) use runtime::RuntimeApp;
use rusqlite::{Connection, Transaction};
use shared::bee_msg::Msg;
use shared::bee_serde::{Deserializable, Serializable};
use shared::types::{NodeId, NodeType, Uid};
use std::fmt::Debug;
use std::future::Future;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;

pub(crate) trait App: Debug + Clone + Send + 'static {
/// Return a borrow to the applications static, immutable config and derived info
fn static_info(&self) -> &StaticInfo;

// Database access

/// DB Read transaction
fn db_read_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
&self,
op: T,
) -> impl Future<Output = Result<R>> + Send;

/// DB write transaction
fn db_write_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
&self,
op: T,
) -> impl Future<Output = Result<R>> + Send;

/// DB write transaction without fsync
fn db_write_tx_no_sync<
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
R: Send + 'static,
>(
&self,
op: T,
) -> impl Future<Output = Result<R>> + Send;

/// Provides access to a DB connection handle, no transaction
fn db_conn<T: Send + 'static + FnOnce(&mut Connection) -> Result<R>, R: Send + 'static>(
&self,
op: T,
) -> impl Future<Output = Result<R>> + Send;

// BeeMsg communication

/// Send a [Msg] to a node via TCP and receive the response
fn beemsg_request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
node_uid: Uid,
msg: &M,
) -> impl Future<Output = Result<R>> + Send;

/// Send a [Msg] to all nodes of a type via UDP
fn beemsg_send_notifications<M: Msg + Serializable>(
&self,
node_types: &'static [NodeType],
msg: &M,
) -> impl Future<Output = ()> + Send;

/// Replace all stored BeeMsg network addresses of a node in the store
fn beemsg_replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>);

// Run state

/// Check if management is in pre shutdown state
fn rs_pre_shutdown(&self) -> bool;
/// Notify the runtime control that a particular client pulled states of a particular node type
fn rs_notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId);

// Licensing control

/// Load and verify a license certificate
fn lic_load_and_verify_cert(
&self,
cert_path: &Path,
) -> impl Future<Output = Result<String>> + Send;

/// Get license certificate data
fn lic_get_cert_data(&self) -> Result<GetCertDataResult>;

/// Get licensed number of machines
fn lic_get_num_machines(&self) -> Result<u32>;

/// Verify a feature is licensed
fn lic_verify_feature(&self, feature: LicensedFeature) -> Result<()>;
}
188 changes: 188 additions & 0 deletions mgmtd/src/app/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use super::*;
use crate::ClientPulledStateNotification;
use crate::bee_msg::dispatch_request;
use crate::license::LicenseVerifier;
use anyhow::Result;
use protobuf::license::GetCertDataResult;
use rusqlite::{Connection, Transaction};
use shared::conn::msg_dispatch::{DispatchRequest, Request};
use shared::conn::outgoing::Pool;
use shared::run_state::WeakRunStateHandle;
use sqlite::Connections;
use std::fmt::Debug;
use std::ops::Deref;
use tokio::sync::mpsc;

/// A collection of Handles used for interacting and accessing the different components of the app.
///
/// This is the actual runtime object that can be shared between tasks. Interfaces should, however,
/// accept any implementation of the AppContext trait instead.
#[derive(Clone, Debug)]
pub(crate) struct RuntimeApp(Arc<InnerAppHandles>);

/// Stores the actual handles.
#[derive(Debug)]
pub(crate) struct InnerAppHandles {
pub conn: Pool,
pub db: Connections,
pub license: LicenseVerifier,
pub info: &'static StaticInfo,
pub run_state: WeakRunStateHandle,
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
}

impl RuntimeApp {
/// Creates a new AppHandles object.
///
/// Takes all the stored handles.
pub(crate) fn new(
conn: Pool,
db: Connections,
license: LicenseVerifier,
info: &'static StaticInfo,
run_state: WeakRunStateHandle,
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
) -> Self {
Self(Arc::new(InnerAppHandles {
conn,
db,
license,
info,
run_state,
shutdown_client_id,
}))
}
}

/// Derefs to InnerAppHandle which stores all the handles.
///
/// Allows transparent access.
impl Deref for RuntimeApp {
type Target = InnerAppHandles;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Adds BeeMsg dispatching functionality to AppHandles
impl DispatchRequest for RuntimeApp {
async fn dispatch_request(&self, req: impl Request) -> Result<()> {
dispatch_request(self, req).await
}
}

impl App for RuntimeApp {
fn static_info(&self) -> &StaticInfo {
self.info
}

async fn db_read_tx<
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
R: Send + 'static,
>(
&self,
op: T,
) -> Result<R> {
Connections::read_tx(&self.db, op).await
}

async fn db_write_tx<
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
R: Send + 'static,
>(
&self,
op: T,
) -> Result<R> {
Connections::write_tx(&self.db, op).await
}

async fn db_write_tx_no_sync<
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
R: Send + 'static,
>(
&self,
op: T,
) -> Result<R> {
Connections::write_tx_no_sync(&self.db, op).await
}

async fn db_conn<
T: Send + 'static + FnOnce(&mut Connection) -> Result<R>,
R: Send + 'static,
>(
&self,
op: T,
) -> Result<R> {
Connections::conn(&self.db, op).await
}

async fn beemsg_request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
node_uid: Uid,
msg: &M,
) -> Result<R> {
Pool::request(&self.conn, node_uid, msg).await
}

async fn beemsg_send_notifications<M: Msg + Serializable>(
&self,
node_types: &'static [NodeType],
msg: &M,
) {
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");

for t in node_types {
if let Err(err) = async {
let nodes = self
.db_read_tx(move |tx| crate::db::node::get_with_type(tx, *t))
.await?;

self.conn
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
.await?;

Ok(()) as Result<_>
}
.await
{
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
}
}
}

fn beemsg_replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>) {
Pool::replace_node_addrs(&self.conn, node_uid, new_addrs)
}

fn rs_pre_shutdown(&self) -> bool {
WeakRunStateHandle::pre_shutdown(&self.run_state)
}

fn rs_notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId) {
if self.run_state.pre_shutdown() {
let tx = self.shutdown_client_id.clone();

// We don't want to block the task calling this and are not interested by the results
tokio::spawn(async move {
let _ = tx.send((node_type, node_id)).await;
});
}
}

async fn lic_load_and_verify_cert(&self, cert_path: &Path) -> Result<String> {
LicenseVerifier::load_and_verify_cert(&self.license, cert_path).await
}

fn lic_get_cert_data(&self) -> Result<GetCertDataResult> {
LicenseVerifier::get_cert_data(&self.license)
}

fn lic_get_num_machines(&self) -> Result<u32> {
LicenseVerifier::get_num_machines(&self.license)
}

fn lic_verify_feature(&self, feature: LicensedFeature) -> Result<()> {
self.license.verify_feature(feature)
}
}
Loading
Loading