Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion CHANGELOG.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
= Changelog

== 0.3.0 - unreleased
== 0.3.0 - 2021-05-27

:165: https://github.com/stackabletech/agent/pull/165[#165]
:169: https://github.com/stackabletech/agent/pull/169[#169]
:173: https://github.com/stackabletech/agent/pull/176[#173]
:176: https://github.com/stackabletech/agent/pull/176[#176]

=== Added
* Artifacts for merge requests are created ({169}, {173}).

=== Changed
* Structure of the documentation changed so that it can be incorporated
into the overall Stackable documentation ({165}).

=== Fixed
* Deadlock fixed which occurred when multiple pods were started or
stopped simultaneously ({176}).

== 0.2.0 - 2021-05-20

Expand Down
76 changes: 46 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "stackable-agent"
description = "The component of the Stackable Platform that manages installation of services on the workers"
version = "0.3.0-nightly"
version = "0.3.0"
authors = ["Sönke Liebau <[email protected]>"]
edition = "2018"
license = "Apache-2.0"
Expand Down Expand Up @@ -42,8 +42,15 @@ tar = "0.4"
thiserror = "1.0"
tokio = { version = "1.6", features = ["macros", "rt-multi-thread", "time"] }
url = "2.2"
zbus = "2.0.0-beta.3"
zvariant = "2.6"
# The current zbus version 2.0.0-beta.3 causes deadlocks (see
# https://gitlab.freedesktop.org/dbus/zbus/-/issues/150). So the
# dependency is pinned to the current HEAD of the main branch until the
# next version is released.
zbus = { git = "https://gitlab.freedesktop.org/dbus/zbus.git", rev = "9c551554e665532abc76469cdc73c1943bfb6285" }
# The current zvariant version 2.6.0 is not compatible with the pinned
# zbus version. So the dependency is pinned to the current HEAD of the
# main branch until the next version is released.
zvariant = { git = "https://gitlab.freedesktop.org/dbus/zbus.git", rev = "9c551554e665532abc76469cdc73c1943bfb6285" }

[dev-dependencies]
indoc = "1.0"
Expand Down
1 change: 1 addition & 0 deletions src/bin/stackable-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async fn main() -> anyhow::Result<()> {
agent_config.log_directory.clone(),
agent_config.session,
agent_config.pod_cidr,
krustlet_config.max_pods,
)
.await
.expect("Error initializing provider.");
Expand Down
3 changes: 2 additions & 1 deletion src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ impl StackableProvider {
log_directory: PathBuf,
session: bool,
pod_cidr: String,
max_pods: u16,
) -> Result<Self, StackableError> {
let systemd_manager = Arc::new(SystemdManager::new(session).await?);
let systemd_manager = Arc::new(SystemdManager::new(session, max_pods).await?);

let provider_state = ProviderState {
handles: Default::default(),
Expand Down
10 changes: 7 additions & 3 deletions src/provider/systemdmanager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct SystemdManager {
impl SystemdManager {
/// Creates a new instance, takes a flag whether to run within the
/// user session or manage services system-wide.
pub async fn new(user_mode: bool) -> Result<Self, StackableError> {
pub async fn new(user_mode: bool, max_pods: u16) -> Result<Self, StackableError> {
// Connect to session or system bus depending on the value of [user_mode]
let connection = if user_mode {
Connection::new_session().await.map_err(|e| RuntimeError {
Expand All @@ -57,8 +57,12 @@ impl SystemdManager {
})?
};

let proxy =
AsyncManagerProxy::new(&connection).map_err(|e| RuntimeError { msg: e.to_string() })?;
// The maximum number of queued DBus messages must be higher
// than the number of containers which can be started and
// stopped simultaneously.
let connection = connection.set_max_queued(max_pods as usize * 2);

let proxy = AsyncManagerProxy::new(&connection);

// Depending on whether we are supposed to run in user space or system-wide
// we'll pick the default directory to initialize the systemd manager with
Expand Down
24 changes: 12 additions & 12 deletions src/provider/systemdmanager/systemd1_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ macro_rules! impl_type_for_enum {
}

/// Type of an entry in a changes list
#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum ChangeType {
Symlink,
Expand All @@ -91,7 +91,7 @@ impl_deserialize_for_enum!(ChangeType);
impl_type_for_enum!(ChangeType);

/// Entry of a changes list
#[derive(Debug, Type, Deserialize)]
#[derive(Clone, Debug, Type, Deserialize)]
pub struct Change {
pub change_type: ChangeType,
pub filename: String,
Expand All @@ -102,7 +102,7 @@ pub struct Change {
type Changes = Vec<Change>;

/// Mode in which a unit will be started
#[derive(Debug, Display, AsRefStr)]
#[derive(Clone, Debug, Display, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
pub enum StartMode {
/// The unit and its dependencies will be started, possibly
Expand Down Expand Up @@ -134,7 +134,7 @@ impl_serialize_for_enum!(StartMode);
impl_type_for_enum!(StartMode);

/// Mode in which a unit will be stopped
#[derive(Debug, Display, AsRefStr)]
#[derive(Clone, Debug, Display, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
pub enum StopMode {
/// The unit and its dependencies will be stopped, possibly
Expand Down Expand Up @@ -172,7 +172,7 @@ impl_type_for_enum!(StopMode);
/// ```
/// # use stackable_agent::provider::systemdmanager::systemd1_api::*;
/// let connection = zbus::Connection::new_system().unwrap();
/// let manager = ManagerProxy::new(&connection).unwrap();
/// let manager = ManagerProxy::new(&connection);
/// let unit = manager.load_unit("dbus.service").unwrap();
/// ```
///
Expand All @@ -182,7 +182,7 @@ impl_type_for_enum!(StopMode);
/// # use stackable_agent::provider::systemdmanager::systemd1_api::*;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let connection = zbus::azync::Connection::new_system().await.unwrap();
/// let manager = AsyncManagerProxy::new(&connection).unwrap();
/// let manager = AsyncManagerProxy::new(&connection);
/// let unit = manager.load_unit("dbus.service").await.unwrap();
/// # });
/// ```
Expand Down Expand Up @@ -262,20 +262,20 @@ trait Manager {
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let connection = zbus::azync::Connection::new_system().await.unwrap();
/// let manager = AsyncManagerProxy::new(&connection).unwrap();
/// let manager = AsyncManagerProxy::new(&connection);
/// let signals = manager
/// .receive_signal(ManagerSignals::JobRemoved.into()).await.unwrap()
/// .map(|message| message.body::<JobRemovedSignal>().unwrap());
/// # });
/// ```
#[derive(Debug, Display, Eq, PartialEq, IntoStaticStr)]
#[derive(Clone, Debug, Display, Eq, PartialEq, IntoStaticStr)]
pub enum ManagerSignals {
/// Sent out each time a job is dequeued
JobRemoved,
}

/// Result in the `JobRemoved` signal.
#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum JobRemovedResult {
/// Indicates successful execution of a job
Expand Down Expand Up @@ -305,7 +305,7 @@ impl_deserialize_for_enum!(JobRemovedResult);
impl_type_for_enum!(JobRemovedResult);

/// Message body of [`ManagerSignals::JobRemoved`]
#[derive(Debug, Deserialize, Type)]
#[derive(Clone, Debug, Deserialize, Type)]
pub struct JobRemovedSignal {
/// Numeric job ID
pub id: u32,
Expand All @@ -322,7 +322,7 @@ pub struct JobRemovedSignal {

/// ActiveState contains a state value that reflects whether the unit is
/// currently active or not.
#[derive(Debug, Display, EnumString, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum ActiveState {
/// The unit is active.
Expand Down Expand Up @@ -358,7 +358,7 @@ impl TryFrom<OwnedValue> for ActiveState {
}

/// Unique ID for a runtime cycle of a unit
#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InvocationId(Vec<u8>);

impl TryFrom<OwnedValue> for InvocationId {
Expand Down