sui_futures/
service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::fmt;
5use std::panic;
6use std::time::Duration;
7
8use futures::future;
9use futures::future::BoxFuture;
10use futures::future::FutureExt;
11use tap::TapFallible;
12use tokio::signal;
13use tokio::task::JoinSet;
14use tokio::time::timeout;
15use tracing::error;
16use tracing::info;
17
18/// Default grace period for shutdown.
19///
20/// After shutdown signals are sent, tasks have this duration to complete gracefully before being
21/// forcefully aborted.
22pub const GRACE: Duration = Duration::from_secs(30);
23
24/// A collection of related tasks that succeed or fail together, consisting of:
25///
26/// - A set of primary tasks, which control the lifetime of the service in the happy path. When all
27///   primary tasks complete successfully or have been cancelled, the service will start a graceful
28///   shutdown.
29///
30/// - A set of secondary tasks, which run alongside the primary tasks, but do not extend the
31///   service's lifetime (The service will not wait for all secondary tasks to complete or be
32///   cancelled before triggering a shutdown).
33///
34/// - A set of exit signals, which are executed when the service wants to trigger graceful
35///   shutdown.
36///
37/// Any task (primary or secondary) failing by returning an error, or panicking, will also trigger
38/// a graceful shutdown.
39///
40/// If graceful shutdown takes longer than the grace period, or another task fails during shutdown,
41/// all remaining tasks are aborted and dropped immediately. Tasks are expected to clean-up after
42/// themselves when dropped (e.g. if a task has spawned its own sub-tasks, these should also be
43/// aborted when the parent task is dropped).
44#[must_use = "Dropping the service aborts all its tasks immediately"]
45#[derive(Default)]
46pub struct Service {
47    /// Futures that are run when the service is instructed to shutdown gracefully.
48    exits: Vec<BoxFuture<'static, ()>>,
49
50    /// Tasks that control the lifetime of the service in the happy path.
51    fsts: JoinSet<anyhow::Result<()>>,
52
53    /// Tasks that run alongside the primary tasks, but do not extend the service's lifetime.
54    snds: JoinSet<anyhow::Result<()>>,
55}
56
57#[derive(thiserror::Error, Debug)]
58pub enum Error {
59    #[error("Service has been terminated gracefully")]
60    Terminated,
61
62    #[error("Service has been aborted due to ungraceful shutdown")]
63    Aborted,
64
65    #[error(transparent)]
66    Task(anyhow::Error),
67}
68
69impl Service {
70    /// Create a new, empty service.
71    pub fn new() -> Self {
72        Self::default()
73    }
74
75    /// Add a primary task.
76    ///
77    /// The task will start running in the background immediately, once added. It is expected to
78    /// clean up after itself when it is dropped, which will happen when it is aborted
79    /// (non-graceful shutdown).
80    pub fn spawn(
81        mut self,
82        task: impl Future<Output = anyhow::Result<()>> + Send + 'static,
83    ) -> Self {
84        self.fsts.spawn(task);
85        self
86    }
87
88    /// Add a primary task that aborts immediately on graceful shutdown.
89    ///
90    /// This is useful for tasks that don't need a graceful shutdown.
91    pub fn spawn_aborting(
92        mut self,
93        task: impl Future<Output = anyhow::Result<()>> + Send + 'static,
94    ) -> Self {
95        let h = self.fsts.spawn(task);
96        self.with_shutdown_signal(async move { h.abort() })
97    }
98
99    /// Add a shutdown signal.
100    ///
101    /// This future will be executed when the service is instructed to shutdown gracefully, before
102    /// a grace period timer starts (in which the task receiving the shutdown signal is expected to
103    /// wind down and exit cleanly).
104    ///
105    /// Evaluation order of shutdown signals is non-determinate.
106    pub fn with_shutdown_signal(mut self, exit: impl Future<Output = ()> + Send + 'static) -> Self {
107        self.exits.push(exit.boxed());
108        self
109    }
110
111    /// Add the tasks and signals from `other` into `self`.
112    pub fn merge(mut self, mut other: Service) -> Self {
113        self.exits.extend(other.exits);
114
115        if !other.fsts.is_empty() {
116            self.fsts.spawn(async move { run(&mut other.fsts).await });
117        }
118
119        if !other.snds.is_empty() {
120            self.snds.spawn(async move { run(&mut other.snds).await });
121        }
122
123        self
124    }
125
126    /// Attach `other` to `self` as a secondary service.
127    ///
128    /// All its tasks (primary and secondary) will be treated as secondary tasks of `self`.
129    pub fn attach(mut self, mut other: Service) -> Self {
130        self.exits.extend(other.exits);
131
132        if !other.fsts.is_empty() {
133            self.snds.spawn(async move { run(&mut other.fsts).await });
134        }
135
136        if !other.snds.is_empty() {
137            self.snds.spawn(async move { run(&mut other.snds).await });
138        }
139
140        self
141    }
142
143    /// Runs the service, waiting for interrupt signals from the operating system to trigger
144    /// graceful shutdown, within the default grace period.
145    pub async fn main(self) -> Result<(), Error> {
146        self.wait_for_shutdown(GRACE, terminate).await
147    }
148
149    /// Waits for an exit condition to trigger shutdown, within `grace` period. Detects the
150    /// following exit conditions:
151    ///
152    /// - All primary tasks complete successfully or are cancelled (returns `Ok(())`).
153    /// - Any task (primary or secondary) fails or panics (returns `Err(Error::Task(_))`).
154    /// - The `terminate` future completes (returns `Err(Error::Terminated)`).
155    ///
156    /// Any tasks that do not complete within the grace period are aborted. Aborted tasks are not
157    /// joined, they are simply dropped (returns `Err(Error::Aborted)` regardless of the primary
158    /// reason for shutting down).
159    async fn wait_for_shutdown<T: Future<Output = ()>>(
160        mut self,
161        grace: Duration,
162        mut terminate: impl FnMut() -> T,
163    ) -> Result<(), Error> {
164        let exec = tokio::select! {
165            res = self.join() => {
166                res.map_err(Error::Task)
167            }
168
169            _ = terminate() => {
170                info!("Termination received");
171                Err(Error::Terminated)
172            }
173        };
174
175        info!("Shutting down gracefully...");
176        tokio::select! {
177            res = timeout(grace, self.shutdown()) => {
178                match res {
179                    Ok(Ok(())) => {},
180                    Ok(Err(_)) => return Err(Error::Aborted),
181                    Err(_) => {
182                        error!("Grace period elapsed, aborting...");
183                        return Err(Error::Aborted);
184                    }
185                }
186            }
187
188            _ = terminate() => {
189                error!("Termination received during shutdown, aborting...");
190                return Err(Error::Aborted);
191            },
192        }
193
194        exec
195    }
196
197    /// Wait until all primary tasks in the service either complete successfully or are cancelled,
198    /// or one task fails.
199    ///
200    /// This operation does not consume the service, so that it can be interacted with further in
201    /// case of an error.
202    pub async fn join(&mut self) -> anyhow::Result<()> {
203        tokio::select! {
204            res = run(&mut self.fsts) => {
205                res.tap_err(|e| error!("Primary task failure: {e:#}"))
206            },
207
208            res = run_secondary(&mut self.snds) => {
209                res.tap_err(|e| error!("Secondary task failure: {e:#}"))
210            }
211        }
212    }
213
214    /// Trigger a graceful shutdown of the service.
215    ///
216    /// Returns with an error if any of the constituent tasks produced an error during shutdown,
217    /// otherwise waits for all tasks (primary and secondy) to complete successfully.
218    pub async fn shutdown(mut self) -> Result<(), Error> {
219        for exit in self.exits {
220            exit.await;
221        }
222        if let Err(e) = future::try_join(run(&mut self.fsts), run(&mut self.snds)).await {
223            error!("Task failure during shutdown: {e:#}");
224            return Err(Error::Task(e));
225        }
226
227        Ok(())
228    }
229}
230
231// SAFETY: `Service` is not `Send` by default because `self.exits` is not `Sync`, but it is only
232// ever accessed through exclusive references (`&mut self` or `self`), so it cannot be accessed
233// through multiple threads simultaneously.
234unsafe impl Sync for Service {}
235
236impl fmt::Debug for Service {
237    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
238        f.debug_struct("Service")
239            .field("exits", &self.exits.len())
240            .field("fsts", &self.fsts)
241            .field("snds", &self.snds)
242            .finish()
243    }
244}
245
246/// Wait until all tasks in `tasks` complete successfully or is cancelled, or any individual task
247/// fails or panics.
248async fn run(tasks: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
249    while let Some(res) = tasks.join_next().await {
250        match res {
251            Ok(Ok(())) => continue,
252            Ok(Err(e)) => return Err(e),
253
254            Err(e) => {
255                if e.is_panic() {
256                    panic::resume_unwind(e.into_panic());
257                }
258            }
259        }
260    }
261
262    Ok(())
263}
264
265/// Like `run` but never completes successfully (only propagates errors).
266///
267/// If the secondary tasks do all complete successfully, this future holds off indefinitely, to
268/// give the primary tasks a chance to complete.
269async fn run_secondary(tasks: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
270    run(tasks).await?;
271    std::future::pending().await
272}
273
274/// Waits for various termination signals from the operating system.
275///
276/// On unix systems, this waits for either `SIGINT` or `SIGTERM`, while on other systems it will
277/// only wait for `SIGINT`.
278pub async fn terminate() {
279    tokio::select! {
280        _ = signal::ctrl_c() => {},
281
282        _ = async {
283            #[cfg(unix)]
284            let _ = signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap().recv().await;
285
286            #[cfg(not(unix))]
287            future::pending::<()>().await;
288        } => {}
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use std::sync::Arc;
295
296    use anyhow::bail;
297    use tokio::sync::Notify;
298    use tokio::sync::oneshot;
299
300    use super::*;
301
302    #[tokio::test]
303    async fn test_empty() {
304        // The empty service should exit immediately.
305        Service::new()
306            .wait_for_shutdown(GRACE, std::future::pending)
307            .await
308            .unwrap();
309    }
310
311    #[tokio::test]
312    async fn test_empty_attach_merge() {
313        // Attaching and merging empty services should work without issue.
314        Service::new()
315            .attach(Service::new())
316            .merge(Service::new())
317            .wait_for_shutdown(GRACE, std::future::pending)
318            .await
319            .unwrap();
320    }
321
322    #[tokio::test]
323    async fn test_completion() {
324        let (atx, arx) = oneshot::channel::<()>();
325        let (btx, brx) = oneshot::channel::<()>();
326
327        let svc = Service::new().spawn(async move {
328            let _brx = brx;
329            Ok(arx.await?)
330        });
331
332        // The task has not finished yet (dropping the receiver)
333        assert!(!btx.is_closed());
334
335        // Sending the signal allows the task to complete successfully, which allows the service to
336        // exit, and at that point, the second channel should be closed too.
337        atx.send(()).unwrap();
338        svc.wait_for_shutdown(GRACE, std::future::pending)
339            .await
340            .unwrap();
341        assert!(btx.is_closed());
342    }
343
344    #[tokio::test]
345    async fn test_failure() {
346        let svc = Service::new().spawn(async move { bail!("boom") });
347        let res = svc.wait_for_shutdown(GRACE, std::future::pending).await;
348        assert!(matches!(res, Err(Error::Task(_))));
349    }
350
351    #[tokio::test]
352    #[should_panic]
353    async fn test_panic() {
354        let svc = Service::new().spawn(async move { panic!("boom") });
355        let _ = svc.wait_for_shutdown(GRACE, std::future::pending).await;
356    }
357
358    #[tokio::test]
359    async fn test_graceful_shutdown() {
360        let (atx, arx) = oneshot::channel::<()>();
361        let (btx, brx) = oneshot::channel::<()>();
362
363        let srx = Arc::new(Notify::new());
364        let stx = srx.clone();
365
366        let svc = Service::new()
367            .with_shutdown_signal(async move { atx.send(()).unwrap() })
368            .spawn(async move {
369                arx.await?;
370                btx.send(()).unwrap();
371                Ok(())
372            });
373
374        // The service is now running in the background.
375        let handle =
376            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
377
378        // Send the shutdown signal, and confirm the task went through its graceful shutdwon
379        // process.
380        stx.notify_one();
381        brx.await.unwrap();
382
383        // The service should exit gracefully now, dropping the receiver it was holding.
384        let res = handle.await.unwrap();
385        assert!(matches!(res, Err(Error::Terminated)));
386    }
387
388    #[tokio::test]
389    async fn test_multiple_tasks() {
390        let (atx, arx) = oneshot::channel::<()>();
391        let (btx, brx) = oneshot::channel::<()>();
392        let (ctx, crx) = oneshot::channel::<()>();
393
394        // Three different tasks each waiting on a oneshot channel. We should be able to unblock
395        // each of them before the service exits.
396        let svc = Service::new()
397            .spawn(async move { Ok(arx.await?) })
398            .spawn(async move { Ok(brx.await?) })
399            .spawn(async move { Ok(crx.await?) });
400
401        let handle = tokio::spawn(svc.wait_for_shutdown(GRACE, std::future::pending));
402
403        atx.send(()).unwrap();
404        tokio::task::yield_now().await;
405
406        btx.send(()).unwrap();
407        tokio::task::yield_now().await;
408
409        ctx.send(()).unwrap();
410        handle.await.unwrap().unwrap();
411    }
412
413    #[tokio::test]
414    async fn test_multiple_task_failure() {
415        let (atx, arx) = oneshot::channel::<()>();
416
417        // The task waiting on the channel (that aborts on shutdown) will never get to finish because
418        // the other task errors out.
419        let svc = Service::new()
420            .spawn_aborting(async move { Ok(arx.await?) })
421            .spawn(async move { bail!("boom") });
422
423        let handle = tokio::spawn(svc.wait_for_shutdown(GRACE, std::future::pending));
424        let res = handle.await.unwrap();
425
426        // The service exits with a task error because one of the tasks failed, and this also
427        // means the other task is aborted before it can complete successfully.
428        assert!(matches!(res, Err(Error::Task(_))));
429        assert!(atx.is_closed());
430    }
431
432    #[tokio::test]
433    async fn test_secondary_stuck() {
434        let (atx, arx) = oneshot::channel::<()>();
435        let (btx, brx) = oneshot::channel::<()>();
436
437        // A secondary task and a primary task.
438        let snd = Service::new().spawn_aborting(async move { Ok(brx.await?) });
439        let svc = Service::new()
440            .spawn(async move { Ok(arx.await?) })
441            .attach(snd);
442
443        let handle = tokio::spawn(svc.wait_for_shutdown(GRACE, std::future::pending));
444
445        // Complete the primary task, and the service as a whole should wrap up.
446        atx.send(()).unwrap();
447        handle.await.unwrap().unwrap();
448        assert!(btx.is_closed());
449    }
450
451    #[tokio::test]
452    async fn test_secondary_complete() {
453        let (atx, arx) = oneshot::channel::<()>();
454        let (btx, brx) = oneshot::channel::<()>();
455        let (mut ctx, crx) = oneshot::channel::<()>();
456
457        // A secondary task and a primary task.
458        let snd = Service::new().spawn(async move {
459            let _crx = crx;
460            brx.await?;
461            Ok(())
462        });
463
464        let svc = Service::new()
465            .spawn(async move { Ok(arx.await?) })
466            .attach(snd);
467
468        let handle = tokio::spawn(svc.wait_for_shutdown(GRACE, std::future::pending));
469
470        // Complete the secondary task, and wait for it to complete (dropping the other channel).
471        btx.send(()).unwrap();
472        ctx.closed().await;
473        tokio::task::yield_now().await;
474
475        // The primary task will not have been cleaned up, so we can send to it, completing that
476        // task, and allowing the service as a whole to complete.
477        atx.send(()).unwrap();
478        handle.await.unwrap().unwrap();
479    }
480
481    #[tokio::test]
482    async fn test_secondary_failure() {
483        let (atx, arx) = oneshot::channel::<()>();
484
485        // A secondary task that fails, with a primary task.
486        let snd = Service::new().spawn(async move { bail!("boom") });
487        let svc = Service::new()
488            .spawn_aborting(async move { Ok(arx.await?) })
489            .attach(snd);
490
491        // Run the service -- it should fail immediately because the secondary task failed,
492        // cleaning up the primary task.
493        let res = svc.wait_for_shutdown(GRACE, std::future::pending).await;
494        assert!(matches!(res, Err(Error::Task(_))));
495        assert!(atx.is_closed());
496    }
497
498    #[tokio::test]
499    #[should_panic]
500    async fn test_secondary_panic() {
501        let (_atx, arx) = oneshot::channel::<()>();
502
503        // A secondary task that fails, with a primary task.
504        let snd = Service::new().spawn(async move { panic!("boom") });
505        let svc = Service::new()
506            .spawn_aborting(async move { Ok(arx.await?) })
507            .attach(snd);
508
509        // When the service runs, the panic from the secondary task will be propagated.
510        let _ = svc.wait_for_shutdown(GRACE, std::future::pending).await;
511    }
512
513    #[tokio::test]
514    async fn test_secondary_graceful_shutdown() {
515        let (atx, arx) = oneshot::channel::<()>();
516        let (btx, brx) = oneshot::channel::<()>();
517        let (ctx, crx) = oneshot::channel::<()>();
518
519        let srx = Arc::new(Notify::new());
520        let stx = srx.clone();
521
522        // A secondary task with a shutdown signal.
523        let snd = Service::new()
524            .with_shutdown_signal(async move { atx.send(()).unwrap() })
525            .spawn(async move {
526                let _crx = crx;
527                Ok(arx.await?)
528            });
529
530        // A primary task which aborts when signalled to shutdown.
531        let svc = Service::new()
532            .spawn_aborting(async move { Ok(brx.await?) })
533            .attach(snd);
534
535        // The service is now running in the background.
536        let handle =
537            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
538
539        // Confirm that each task is still waiting on their respective channels.
540        assert!(!btx.is_closed());
541        assert!(!ctx.is_closed());
542
543        // Send the shutdown signal - both tasks should be unblocked and complete gracefully.
544        stx.notify_one();
545        let res = handle.await.unwrap();
546        assert!(matches!(res, Err(Error::Terminated)));
547        assert!(btx.is_closed());
548        assert!(ctx.is_closed());
549    }
550
551    #[tokio::test]
552    async fn test_merge() {
553        let (atx, arx) = oneshot::channel::<()>();
554        let (btx, brx) = oneshot::channel::<()>();
555        let (ctx, crx) = oneshot::channel::<()>();
556        let (dtx, drx) = oneshot::channel::<()>();
557        let (etx, erx) = oneshot::channel::<()>();
558        let (ftx, frx) = oneshot::channel::<()>();
559
560        let srx = Arc::new(Notify::new());
561        let stx = srx.clone();
562
563        // Set-up two services -- each with a task that can be shutdown, and a task that's paused,
564        // which will send a message once unpaused.
565        let a = Service::new()
566            .spawn(async move { Ok(arx.await?) })
567            .with_shutdown_signal(async move { ctx.send(()).unwrap() })
568            .spawn(async move {
569                crx.await?;
570                dtx.send(()).unwrap();
571                Ok(())
572            });
573
574        let b = Service::new()
575            .spawn(async move { Ok(brx.await?) })
576            .with_shutdown_signal(async move { etx.send(()).unwrap() })
577            .spawn(async move {
578                erx.await?;
579                ftx.send(()).unwrap();
580                Ok(())
581            });
582
583        // Merge them into a larger service and run it.
584        let svc = Service::new().merge(a).merge(b);
585        let handle =
586            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
587
588        // Unblock the paused tasks, so they terminate.
589        atx.send(()).unwrap();
590        tokio::task::yield_now().await;
591
592        btx.send(()).unwrap();
593        tokio::task::yield_now().await;
594
595        // Send the shutdown signal - triggering graceful shutdown on the remaining tasks --
596        // confirm that those tasks actually go through the graceful shutdown process.
597        stx.notify_one();
598        drx.await.unwrap();
599        frx.await.unwrap();
600
601        // ...and then the service shuts down.
602        let res = handle.await.unwrap();
603        assert!(matches!(res, Err(Error::Terminated)));
604    }
605
606    #[tokio::test]
607    async fn test_drop_abort() {
608        let (mut atx, arx) = oneshot::channel::<()>();
609        let (mut btx, brx) = oneshot::channel::<()>();
610
611        let svc = Service::new()
612            .spawn(async move { Ok(arx.await?) })
613            .spawn_aborting(async move { Ok(brx.await?) });
614
615        assert!(!atx.is_closed());
616        assert!(!btx.is_closed());
617
618        // Once the service is dropped, its tasks will also be dropped, and the receivers will be
619        // dropped, closing the senders.
620        drop(svc);
621        atx.closed().await;
622        btx.closed().await;
623    }
624
625    #[tokio::test]
626    async fn test_shutdown() {
627        let (atx, arx) = oneshot::channel::<()>();
628        let (btx, brx) = oneshot::channel::<()>();
629
630        let svc = Service::new()
631            .with_shutdown_signal(async move { atx.send(()).unwrap() })
632            .spawn(async move { Ok(arx.await?) })
633            .spawn_aborting(async move { Ok(brx.await?) });
634
635        // We don't need to call `Service::run` to kick off the service's tasks -- they are now
636        // running in the background. We can call `shutdown` to trigger a graceful shutdown, which
637        // should exit cleanly and clean up any unused resources.
638        svc.shutdown().await.unwrap();
639        assert!(btx.is_closed());
640    }
641
642    #[tokio::test]
643    async fn test_error_cascade() {
644        let (atx, arx) = oneshot::channel::<()>();
645
646        // The first task errors immediately, and the second task errors during graceful shutdown.
647        let svc = Service::new()
648            .spawn(async move { bail!("boom") })
649            .with_shutdown_signal(async move { atx.send(()).unwrap() })
650            .spawn(async move {
651                arx.await?;
652                bail!("boom, again")
653            });
654
655        // The service will exit with an abort.
656        let res = svc.wait_for_shutdown(GRACE, std::future::pending).await;
657        assert!(matches!(res, Err(Error::Aborted)));
658    }
659
660    #[tokio::test]
661    async fn test_multiple_errors() {
662        // Both tasks produce an error, one will be picked up during normal execution, and the
663        // other will be picked up during shutdown, resulting in an ungraceful shutdown (abort).
664        let svc = Service::new()
665            .spawn(async move { bail!("boom") })
666            .spawn(async move { bail!("boom, again") });
667
668        // The service will exit with an abort.
669        let res = svc.wait_for_shutdown(GRACE, std::future::pending).await;
670        assert!(matches!(res, Err(Error::Aborted)));
671    }
672
673    #[tokio::test]
674    async fn test_termination_cascade() {
675        // A service with a task that ignores graceful shutdown.
676        let svc = Service::new().spawn(std::future::pending());
677
678        let srx = Arc::new(Notify::new());
679        let stx = srx.clone();
680
681        // The service is now running in the background.
682        let handle =
683            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
684
685        // Trigger the first termination, which the task will ignore.
686        stx.notify_one();
687        tokio::task::yield_now().await;
688
689        // Trigger the second termination, the service takes over.
690        stx.notify_one();
691        tokio::task::yield_now().await;
692
693        let res = handle.await.unwrap();
694        assert!(matches!(res, Err(Error::Aborted)));
695    }
696
697    #[tokio::test]
698    #[should_panic]
699    async fn test_panic_during_shutdown() {
700        let (atx, arx) = oneshot::channel::<()>();
701
702        let srx = Arc::new(Notify::new());
703        let stx = srx.clone();
704
705        let svc = Service::new()
706            .with_shutdown_signal(async move { atx.send(()).unwrap() })
707            .spawn(async move {
708                arx.await?;
709                panic!("boom")
710            });
711
712        // The service is now running in the background.
713        let handle =
714            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
715
716        // Send the shutdown signal, the panic gets resumed when the service is awaited.
717        stx.notify_one();
718        let _ = handle.await.unwrap();
719    }
720
721    #[tokio::test(start_paused = true)]
722    async fn test_graceful_shutdown_timeout() {
723        let srx = Arc::new(Notify::new());
724        let stx = srx.clone();
725
726        // A service with a task that ignores graceful shutdown.
727        let svc = Service::new().spawn(std::future::pending());
728
729        let handle =
730            tokio::spawn(svc.wait_for_shutdown(GRACE, move || srx.clone().notified_owned()));
731
732        // Trigger cancellation and then let twice the grace period pass, which should be treated
733        // as an abort.
734        stx.notify_one();
735        tokio::time::advance(GRACE * 2).await;
736
737        let res = handle.await.unwrap();
738        assert!(matches!(res, Err(Error::Aborted)));
739    }
740}