1use crate::data::{Db, DbConnection, QueryExecutor};
5use crate::error::Error;
6use crate::metrics::Metrics;
7use crate::types::chain_identifier::ChainIdentifier;
8use async_graphql::ServerError;
9use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl};
10use diesel_async::scoped_futures::ScopedFutureExt;
11use std::mem;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_indexer::schema::{checkpoints, watermarks};
15use tokio::sync::{watch, RwLock};
16use tokio::time::Interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{error, info};
19
20pub(crate) struct WatermarkTask {
23 watermark: WatermarkLock,
25 chain_identifier: ChainIdentifierLock,
26 db: Db,
27 metrics: Metrics,
28 sleep: Duration,
29 cancel: CancellationToken,
30 sender: watch::Sender<u64>,
31 receiver: watch::Receiver<u64>,
32}
33
34#[derive(Clone, Default)]
35pub(crate) struct ChainIdentifierLock(pub(crate) Arc<RwLock<ChainIdentifier>>);
36
37pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
38
/// Snapshot of the bounds read from the `watermarks` row of the `"checkpoints"` pipeline.
///
/// All fields default to `0` until the first successful poll.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct Watermark {
    /// Value of `watermarks.epoch_hi_inclusive`.
    pub epoch: u64,
    /// `timestamp_ms` of the checkpoint whose sequence number is `hi_cp`.
    pub hi_cp_timestamp_ms: u64,
    /// Value of `watermarks.checkpoint_hi_inclusive`.
    pub hi_cp: u64,
    /// Value of `watermarks.tx_hi`.
    pub hi_tx: u64,
    /// Value of `watermarks.reader_lo`.
    pub lo_cp: u64,
    /// `min_tx_sequence_number` of the checkpoint whose sequence number is `lo_cp`.
    pub lo_tx: u64,
}
57
58impl WatermarkTask {
60 pub(crate) fn new(
61 db: Db,
62 metrics: Metrics,
63 sleep: Duration,
64 cancel: CancellationToken,
65 ) -> Self {
66 let (sender, receiver) = watch::channel(0);
67
68 Self {
69 watermark: Default::default(),
70 chain_identifier: Default::default(),
71 db,
72 metrics,
73 sleep,
74 cancel,
75 sender,
76 receiver,
77 }
78 }
79
80 pub(crate) async fn run(&self) {
81 let mut interval = tokio::time::interval(self.sleep);
82 self.get_and_cache_chain_identifier(&mut interval).await;
85
86 loop {
87 tokio::select! {
88 _ = self.cancel.cancelled() => {
89 info!("Shutdown signal received, terminating watermark update task");
90 return;
91 },
92 _ = interval.tick() => {
93 let Watermark {epoch, hi_cp_timestamp_ms, hi_cp, hi_tx, lo_cp, lo_tx } = match Watermark::query(&self.db).await {
94 Ok(Some(watermark)) => watermark,
95 Ok(None) => continue,
96 Err(e) => {
97 error!("Failed to fetch chain identifier: {}", e);
98 self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
99 continue;
100 }
101 };
102
103 let prev_epoch = {
105 let mut w = self.watermark.write().await;
106 w.hi_cp = hi_cp;
107 w.hi_tx = hi_tx;
108 w.hi_cp_timestamp_ms = hi_cp_timestamp_ms;
109 w.lo_cp = lo_cp;
110 w.lo_tx = lo_tx;
111 mem::replace(&mut w.epoch, epoch)
112 };
113
114 if epoch > prev_epoch {
116 self.sender.send(epoch).unwrap();
117 }
118 }
119 }
120 }
121 }
122
123 pub(crate) fn lock(&self) -> WatermarkLock {
124 self.watermark.clone()
125 }
126
127 pub(crate) fn chain_id_lock(&self) -> ChainIdentifierLock {
128 self.chain_identifier.clone()
129 }
130
131 pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
133 self.receiver.clone()
134 }
135
136 async fn get_and_cache_chain_identifier(&self, interval: &mut Interval) {
138 loop {
139 tokio::select! {
140 _ = self.cancel.cancelled() => {
141 info!("Shutdown signal received, terminating attempt to get chain identifier");
142 return;
143 },
144 _ = interval.tick() => {
145 let chain = match ChainIdentifier::query(&self.db).await {
147 Ok(Some(chain)) => chain,
148 Ok(None) => continue,
149 Err(e) => {
150 error!("{}", e);
151 self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
152 continue;
153 }
154 };
155
156 let mut chain_id_lock = self.chain_identifier.0.write().await;
157 *chain_id_lock = chain.into();
158 return;
159 }
160 }
161 }
162 }
163}
164
165impl Watermark {
166 pub(crate) async fn new(lock: WatermarkLock) -> Self {
167 let w = lock.read().await;
168 Self {
169 hi_cp: w.hi_cp,
170 hi_cp_timestamp_ms: w.hi_cp_timestamp_ms,
171 hi_tx: w.hi_tx,
172 epoch: w.epoch,
173 lo_cp: w.lo_cp,
174 lo_tx: w.lo_tx,
175 }
176 }
177
178 #[allow(clippy::type_complexity)]
184 pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
185 let (reader_lo_to_tx, cp_hi_to_timestamp) = diesel::alias!(
186 checkpoints as reader_lo_to_tx,
187 checkpoints as cp_hi_to_timestamp
188 );
189
190 let Some(result): Option<(i64, i64, i64, i64, i64, Option<i64>)> = db
191 .execute(move |conn| {
192 async move {
193 conn.result(move || {
194 watermarks::table
195 .inner_join(
197 reader_lo_to_tx.on(watermarks::reader_lo
198 .eq(reader_lo_to_tx.field(checkpoints::sequence_number))),
199 )
200 .inner_join(
203 cp_hi_to_timestamp.on(watermarks::checkpoint_hi_inclusive
204 .eq(cp_hi_to_timestamp.field(checkpoints::sequence_number))),
205 )
206 .filter(watermarks::pipeline.eq("checkpoints"))
207 .select((
208 watermarks::epoch_hi_inclusive,
209 cp_hi_to_timestamp.field(checkpoints::timestamp_ms),
210 watermarks::checkpoint_hi_inclusive,
211 watermarks::tx_hi,
212 watermarks::reader_lo,
213 reader_lo_to_tx.field(checkpoints::min_tx_sequence_number),
214 ))
215 })
216 .await
217 .optional()
218 }
219 .scope_boxed()
220 })
221 .await
222 .map_err(|e| Error::Internal(format!("Failed to fetch watermark data: {e}")))?
223 else {
224 return Ok(None);
227 };
228
229 if let (epoch, hi_cp_timestamp_ms, hi_cp, hi_tx, lo_cp, Some(lo_tx)) = result {
230 Ok(Some(Watermark {
231 hi_cp: hi_cp as u64,
232 hi_cp_timestamp_ms: hi_cp_timestamp_ms as u64,
233 hi_tx: hi_tx as u64,
234 epoch: epoch as u64,
235 lo_cp: lo_cp as u64,
236 lo_tx: lo_tx as u64,
237 }))
238 } else {
239 Err(Error::Internal(
240 "Expected entry for tx lower bound and min_tx_sequence_number to be non-null"
241 .to_string(),
242 ))
243 }
244 }
245}
246
247impl ChainIdentifierLock {
248 pub(crate) async fn read(&self) -> ChainIdentifier {
249 let w = self.0.read().await;
250 w.0.into()
251 }
252}