Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit fe3d655

Browse files
authored
Add functionality to stop and restart processes (#25)
Add functionality to stop processes, which in turn enables restarting them when pods are recreated. Also contains a change that adds the pod UID to the config directory name.
1 parent 44522b5 commit fe3d655

File tree

6 files changed

+77
-27
lines changed

6 files changed

+77
-27
lines changed

src/provider/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ pub enum StackableError {
4343
"The following config maps were specified in a pod but not found: {missing_config_maps:?}"
4444
)]
4545
MissingConfigMapsError { missing_config_maps: Vec<String> },
46+
#[error("Object is missing key: {key}")]
47+
MissingObjectKey { key: &'static str },
4648
}

src/provider/mod.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use kubelet::provider::Provider;
1313
use log::{debug, error};
1414

1515
use crate::provider::error::StackableError;
16-
use crate::provider::error::StackableError::{CrdMissing, KubeError, PodValidationError};
16+
use crate::provider::error::StackableError::{
17+
CrdMissing, KubeError, MissingObjectKey, PodValidationError,
18+
};
1719
use crate::provider::repository::package::Package;
1820
use crate::provider::states::downloading::Downloading;
1921
use crate::provider::states::terminated::Terminated;
@@ -40,13 +42,15 @@ pub struct PodState {
4042
log_directory: PathBuf,
4143
package_download_backoff_strategy: ExponentialBackoffStrategy,
4244
service_name: String,
45+
service_uid: String,
4346
package: Package,
4447
process_handle: Option<Child>,
4548
}
4649

4750
impl PodState {
4851
pub fn get_service_config_directory(&self) -> PathBuf {
49-
self.config_directory.join(&self.service_name)
52+
self.config_directory
53+
.join(format!("{}-{}", &self.service_name, &self.service_uid))
5054
}
5155

5256
pub fn get_service_package_directory(&self) -> PathBuf {
@@ -153,6 +157,17 @@ impl Provider for StackableProvider {
153157

154158
async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result<Self::PodState> {
155159
let service_name = pod.name();
160+
161+
// Extract uid from pod object, if this fails we return an error -
162+
// this should not happen, as all objects we get from Kubernetes should have
163+
// a uid set!
164+
let service_uid = if let Some(uid) = pod.as_kube_pod().metadata.uid.as_ref() {
165+
uid.to_string()
166+
} else {
167+
return Err(anyhow::Error::new(MissingObjectKey {
168+
key: ".metadata.uid",
169+
}));
170+
};
156171
let parcel_directory = self.parcel_directory.clone();
157172
// TODO: make this configurable
158173
let download_directory = parcel_directory.join("_download");
@@ -175,6 +190,7 @@ impl Provider for StackableProvider {
175190
config_directory: self.config_directory.clone(),
176191
package_download_backoff_strategy: ExponentialBackoffStrategy::default(),
177192
service_name: String::from(service_name),
193+
service_uid,
178194
package,
179195
process_handle: None,
180196
})

src/provider/states/running.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::process::Child;
2-
1+
use k8s_openapi::api::core::v1::{
2+
ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus,
3+
};
34
use kubelet::pod::Pod;
45
use kubelet::state::prelude::*;
56
use kubelet::state::{State, Transition};
@@ -9,15 +10,10 @@ use crate::provider::states::failed::Failed;
910
use crate::provider::states::installing::Installing;
1011
use crate::provider::states::stopping::Stopping;
1112
use crate::provider::PodState;
12-
use k8s_openapi::api::core::v1::{
13-
ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus,
14-
};
1513

1614
#[derive(Debug, TransitionTo)]
1715
#[transition_to(Stopping, Failed, Running, Installing)]
18-
pub struct Running {
19-
pub process_handle: Option<Child>,
20-
}
16+
pub struct Running {}
2117

2218
#[async_trait::async_trait]
2319
impl State<PodState> for Running {
@@ -26,17 +22,32 @@ impl State<PodState> for Running {
2622
pod_state: &mut PodState,
2723
_pod: &Pod,
2824
) -> Transition<PodState> {
29-
debug!("waiting");
30-
let mut handle = std::mem::replace(&mut self.process_handle, None).unwrap();
31-
3225
loop {
3326
tokio::select! {
3427
_ = tokio::time::delay_for(std::time::Duration::from_secs(1)) => {
3528
trace!("Checking if service {} is still running.", &pod_state.service_name);
3629
}
3730
}
38-
match handle.try_wait() {
39-
Ok(None) => debug!("Service {} is still running", &pod_state.service_name),
31+
32+
// Obtain a mutable reference to the process handle
33+
let child = if let Some(testproc) = pod_state.process_handle.as_mut() {
34+
testproc
35+
} else {
36+
return Transition::next(
37+
self,
38+
Failed {
39+
message: "Unable to obtain process handle from podstate!".to_string(),
40+
},
41+
);
42+
};
43+
44+
// Check if an exit code is available for the process - if yes, it exited
45+
match child.try_wait() {
46+
Ok(None) => debug!(
47+
"Service {} is still running with pid {}",
48+
&pod_state.service_name,
49+
child.id()
50+
),
4051
_ => {
4152
error!(
4253
"Service {} died unexpectedly, moving to failed state",

src/provider/states/starting.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,10 @@ impl State<PodState> for Starting {
115115
);
116116
}
117117
}
118-
//pod_state.process_handle = Some(child);
119-
return Transition::next(
120-
self,
121-
Running {
122-
process_handle: Some(child),
123-
},
124-
);
118+
// Store the child handle in the podstate so that later states
119+
// can use it
120+
pod_state.process_handle = Some(child);
121+
return Transition::next(self, Running {});
125122
}
126123
Err(error) => {
127124
let error_message = format!("Failed to start process with error {}", error);

src/provider/states/stopping.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ pub struct Stopping;
1515
impl State<PodState> for Stopping {
1616
async fn next(self: Box<Self>, pod_state: &mut PodState, _pod: &Pod) -> Transition<PodState> {
1717
if let Some(child) = &pod_state.process_handle {
18-
let pid = child.id();
1918
info!(
2019
"Received stop command for service {}, stopping process with pid {}",
21-
pod_state.service_name, pid
20+
pod_state.service_name,
21+
child.id()
2222
);
2323
}
2424
Transition::next(self, Stopped)

src/provider/states/terminated.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use anyhow::anyhow;
12
use kubelet::state::prelude::*;
2-
use log::info;
3+
use log::{debug, error, info};
34

45
use crate::provider::PodState;
56

@@ -12,8 +13,31 @@ pub struct Terminated {
1213
#[async_trait::async_trait]
1314
impl State<PodState> for Terminated {
1415
async fn next(self: Box<Self>, pod_state: &mut PodState, _pod: &Pod) -> Transition<PodState> {
15-
info!("Service {} was terminated!", &pod_state.service_name);
16-
Transition::Complete(Ok(()))
16+
info!(
17+
"Pod {} was terminated, stopping process!",
18+
&pod_state.service_name
19+
);
20+
// Obtain a mutable reference to the process handle
21+
let child = if let Some(testproc) = pod_state.process_handle.as_mut() {
22+
testproc
23+
} else {
24+
return Transition::Complete(Err(anyhow!("Unable to retrieve process handle")));
25+
};
26+
27+
return match child.kill() {
28+
Ok(()) => {
29+
debug!("Successfully killed process {}", pod_state.service_name);
30+
Transition::Complete(Ok(()))
31+
}
32+
Err(e) => {
33+
error!(
34+
"Failed to stop process with pid {} due to: {:?}",
35+
child.id(),
36+
e
37+
);
38+
Transition::Complete(Err(anyhow::Error::new(e)))
39+
}
40+
};
1741
}
1842

1943
async fn json_status(

0 commit comments

Comments
 (0)