sui_bridge/sui_bridge_watchdog/
mod.rs1use anyhow::Result;
9use async_trait::async_trait;
10use mysten_metrics::spawn_logged_monitored_task;
11use tokio::time::Duration;
12use tokio::time::MissedTickBehavior;
13use tracing::{Instrument, error_span, info};
14
15pub mod eth_bridge_status;
16pub mod eth_vault_balance;
17pub mod metrics;
18pub mod sui_bridge_status;
19pub mod total_supplies;
20
21pub struct BridgeWatchDog {
22    observables: Vec<Box<dyn Observable + Send + Sync>>,
23}
24
25impl BridgeWatchDog {
26    pub fn new(observables: Vec<Box<dyn Observable + Send + Sync>>) -> Self {
27        Self { observables }
28    }
29
30    pub async fn run(self) {
31        let mut handles = vec![];
32        for observable in self.observables.into_iter() {
33            let handle = spawn_logged_monitored_task!(Self::run_observable(observable));
34            handles.push(handle);
35        }
36        futures::future::try_join_all(handles).await.unwrap();
38        unreachable!("watch dog tasks should not exit");
39    }
40
41    async fn run_observable(observable: Box<dyn Observable + Send + Sync>) -> Result<()> {
42        let mut interval = tokio::time::interval(observable.interval());
43        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
44        let name = observable.name();
45        let span = error_span!("observable", name);
46        loop {
47            info!("Running observable {}", name);
48            observable
49                .observe_and_report()
50                .instrument(span.clone())
51                .await;
52            interval.tick().await;
53        }
54    }
55}
56
57#[async_trait]
58pub trait Observable {
59    fn name(&self) -> &str;
60    async fn observe_and_report(&self);
61    fn interval(&self) -> Duration;
62}