sui_bridge/sui_bridge_watchdog/
mod.rs1use anyhow::Result;
9use async_trait::async_trait;
10use mysten_metrics::spawn_logged_monitored_task;
11use tokio::time::Duration;
12use tokio::time::MissedTickBehavior;
13use tracing::{Instrument, error_span, info};
14
15pub mod eth_bridge_status;
16pub mod eth_vault_balance;
17pub mod metrics;
18pub mod sui_bridge_status;
19pub mod total_supplies;
20
21pub struct BridgeWatchDog {
22 observables: Vec<Box<dyn Observable + Send + Sync>>,
23}
24
25impl BridgeWatchDog {
26 pub fn new(observables: Vec<Box<dyn Observable + Send + Sync>>) -> Self {
27 Self { observables }
28 }
29
30 pub async fn run(self) {
31 let mut handles = vec![];
32 for observable in self.observables.into_iter() {
33 let handle = spawn_logged_monitored_task!(Self::run_observable(observable));
34 handles.push(handle);
35 }
36 futures::future::try_join_all(handles).await.unwrap();
38 unreachable!("watch dog tasks should not exit");
39 }
40
41 async fn run_observable(observable: Box<dyn Observable + Send + Sync>) -> Result<()> {
42 let mut interval = tokio::time::interval(observable.interval());
43 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
44 let name = observable.name();
45 let span = error_span!("observable", name);
46 loop {
47 info!("Running observable {}", name);
48 observable
49 .observe_and_report()
50 .instrument(span.clone())
51 .await;
52 interval.tick().await;
53 }
54 }
55}
56
57#[async_trait]
58pub trait Observable {
59 fn name(&self) -> &str;
60 async fn observe_and_report(&self);
61 fn interval(&self) -> Duration;
62}