33//! The module offers the ability to create, remove, start, stop, enable and
44//! disable systemd units.
55//!
6- use super :: systemd1_api:: { ActiveState , AsyncManagerProxy , StartMode , StopMode } ;
6+ use super :: systemd1_api:: {
7+ ActiveState , AsyncJobProxy , AsyncManagerProxy , JobRemovedResult , JobRemovedSignal ,
8+ ManagerSignals , StartMode , StopMode ,
9+ } ;
710use crate :: provider:: systemdmanager:: systemdunit:: SystemDUnit ;
811use crate :: provider:: StackableError ;
912use crate :: provider:: StackableError :: RuntimeError ;
1013use anyhow:: anyhow;
14+ use futures_util:: { future, stream:: StreamExt } ;
1115use log:: debug;
1216use std:: fs;
1317use std:: fs:: File ;
18+ use std:: future:: Future ;
1419use std:: io:: Write ;
1520use std:: path:: PathBuf ;
1621use zbus:: azync:: Connection ;
@@ -273,15 +278,17 @@ impl SystemdManager {
273278 /// systemd at the time this is called.
274279 /// To make a service known please take a look at the [`SystemdManager::enable`] function.
275280 pub async fn start ( & self , unit : & str ) -> anyhow:: Result < ( ) > {
276- debug ! ( "Attempting to start unit {} " , unit) ;
281+ debug ! ( "Trying to start unit [{}] " , unit) ;
277282
278- match self . proxy . start_unit ( unit , StartMode :: Fail ) . await {
279- Ok ( result ) => {
280- debug ! ( "Successfully started service [{}]: [{:?}]" , unit , result ) ;
281- Ok ( ( ) )
282- }
283- Err ( e ) => Err ( anyhow ! ( "Error starting service [{}]: {} " , unit, e ) ) ,
283+ let result = self
284+ . call_method ( |proxy| proxy . start_unit ( unit , StartMode :: Fail ) )
285+ . await ;
286+
287+ if result . is_ok ( ) {
288+ debug ! ( "Successfully started service [{}]" , unit) ;
284289 }
290+
291+ result. map_err ( |e| anyhow ! ( "Error starting service [{}]: {}" , unit, e) )
285292 }
286293
287294 /// Attempts to stop a systemd unit
@@ -291,12 +298,62 @@ impl SystemdManager {
291298 pub async fn stop ( & self , unit : & str ) -> anyhow:: Result < ( ) > {
292299 debug ! ( "Trying to stop systemd unit [{}]" , unit) ;
293300
294- match self . proxy . stop_unit ( unit, StopMode :: Fail ) . await {
295- Ok ( result) => {
296- debug ! ( "Successfully stopped service [{}]: [{:?}]" , unit, result) ;
297- Ok ( ( ) )
298- }
299- Err ( e) => Err ( anyhow ! ( "Error stopping service [{}]: {}" , unit, e) ) ,
301+ let result = self
302+ . call_method ( |proxy| proxy. stop_unit ( unit, StopMode :: Fail ) )
303+ . await ;
304+
305+ if result. is_ok ( ) {
306+ debug ! ( "Successfully stopped service [{}]" , unit) ;
307+ }
308+
309+ result. map_err ( |e| anyhow ! ( "Error stopping service [{}]: {}" , unit, e) )
310+ }
311+
312+ /// Calls a systemd method and waits until the dependent job is
313+ /// finished.
314+ ///
315+ /// The given method enqueues a job in systemd and returns the job
316+ /// object. Systemd sends out a `JobRemoved` signal when the job is
317+ /// dequeued. The signal contains the reason for the dequeuing like
318+ /// `"done"`, `"failed"`, or `"canceled"`.
319+ ///
320+ /// This function subscribes to `JobRemoved` signals, calls the
321+ /// given method, awaits the signal for the corresponding job, and
322+ /// returns `Ok(())` if the result is [`JobRemovedResult::Done`].
323+ /// If the signal contains another result or no signal is returned
324+ /// (which should never happen) then an error with a corresponding
325+ /// message is returned.
326+ async fn call_method < ' a , F , Fut > ( & ' a self , method : F ) -> anyhow:: Result < ( ) >
327+ where
328+ F : Fn ( & ' a AsyncManagerProxy ) -> Fut ,
329+ Fut : Future < Output = zbus:: Result < AsyncJobProxy < ' a > > > ,
330+ {
331+ let signals = self
332+ . proxy
333+ . receive_signal ( ManagerSignals :: JobRemoved . into ( ) )
334+ . await ?
335+ . map ( |message| message. body :: < JobRemovedSignal > ( ) . unwrap ( ) ) ;
336+
337+ let job = method ( & self . proxy ) . await ?;
338+
339+ let mut signals = signals
340+ . filter ( |signal| future:: ready ( & signal. job . to_owned ( ) . into_inner ( ) == job. path ( ) ) ) ;
341+
342+ let signal = signals. next ( ) . await ;
343+
344+ // Unsubscribe from receiving signals.
345+ // If `signals` goes out of scope prematurely due to an error
346+ // then the subscription is cancelled synchronously in the
347+ // destructor of `SignalStream`.
348+ let _ = signals. into_inner ( ) . into_inner ( ) . close ( ) . await ;
349+
350+ match signal {
351+ Some ( message) if message. result == JobRemovedResult :: Done => Ok ( ( ) ) ,
352+ Some ( message) => Err ( anyhow ! ( "The systemd job failed: {:?}" , message) ) ,
353+ None => Err ( anyhow ! (
354+ "No signal was returned for the systemd job: {:?}" ,
355+ job
356+ ) ) ,
300357 }
301358 }
302359
0 commit comments