sui_graphql_rpc/server/
exchange_rates_task.rs1use sui_indexer::apis::{governance_api::exchange_rates, GovernanceReadApi};
5use tokio::sync::watch;
6use tokio_util::sync::CancellationToken;
7use tracing::{error, info};
8
9use crate::data::pg::PgExecutor;
10
11pub(crate) struct TriggerExchangeRatesTask {
14 cancel: CancellationToken,
15 db: PgExecutor,
16 epoch_rx: watch::Receiver<u64>,
17}
18
19impl TriggerExchangeRatesTask {
20 pub(crate) fn new(
21 db: PgExecutor,
22 epoch_rx: watch::Receiver<u64>,
23 cancel: CancellationToken,
24 ) -> Self {
25 Self {
26 db,
27 epoch_rx,
28 cancel,
29 }
30 }
31
32 pub(crate) async fn run(&mut self) {
33 loop {
34 tokio::select! {
35 _ = self.cancel.cancelled() => {
36 info!("Shutdown signal received, terminating trigger exchange rates task");
37 return;
38 }
39
40 _ = self.epoch_rx.changed() => {
41 info!("Detected epoch boundary, triggering call to exchange rates");
42 let latest_sui_system_state = self.db.inner
43 .get_latest_sui_system_state()
44 .await.map_err(|_| error!("Failed to fetch latest Sui system state"));
45
46 if let Ok(latest_sui_system_state) = latest_sui_system_state {
47 let db = self.db.clone();
48 let governance_api = GovernanceReadApi::new(db.inner) ;
49 exchange_rates(&governance_api, &latest_sui_system_state)
50 .await
51 .map_err(|e| error!("Failed to fetch exchange rates: {:?}", e))
52 .ok();
53 info!("Finished fetching exchange rates");
54 }
55 }
56 }
57 }
58 }
59}