sui_graphql_rpc/server/
watermark_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::data::{Db, DbConnection, QueryExecutor};
5use crate::error::Error;
6use crate::metrics::Metrics;
7use crate::types::chain_identifier::ChainIdentifier;
8use async_graphql::ServerError;
9use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl};
10use diesel_async::scoped_futures::ScopedFutureExt;
11use std::mem;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_indexer::schema::{checkpoints, watermarks};
15use tokio::sync::{watch, RwLock};
16use tokio::time::Interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{error, info};
19
20/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and
21/// epoch values.
22pub(crate) struct WatermarkTask {
23    /// Thread-safe watermark that avoids writer starvation
24    watermark: WatermarkLock,
25    chain_identifier: ChainIdentifierLock,
26    db: Db,
27    metrics: Metrics,
28    sleep: Duration,
29    cancel: CancellationToken,
30    sender: watch::Sender<u64>,
31    receiver: watch::Receiver<u64>,
32}
33
34#[derive(Clone, Default)]
35pub(crate) struct ChainIdentifierLock(pub(crate) Arc<RwLock<ChainIdentifier>>);
36
37pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
38
/// Snapshot of the queryable data range. GraphQL queries use it to stay consistent with each
/// other and to detect epoch-boundary changes.
#[derive(Clone, Copy, Default)]
pub(crate) struct Watermark {
    /// Current epoch.
    pub epoch: u64,
    /// Timestamp (in milliseconds) of the checkpoint at the inclusive upper bound `hi_cp`. Used
    /// for the health check.
    pub hi_cp_timestamp_ms: u64,
    /// Inclusive upper bound on checkpoints visible to the query.
    pub hi_cp: u64,
    /// Inclusive upper bound on tx_sequence_numbers visible to the query.
    pub hi_tx: u64,
    /// Lowest queryable checkpoint; anything below this value has been pruned.
    pub lo_cp: u64,
    /// Lowest queryable tx_sequence_number; anything below this value has been pruned.
    pub lo_tx: u64,
}
57
58/// Starts an infinite loop that periodically updates the watermark.
59impl WatermarkTask {
60    pub(crate) fn new(
61        db: Db,
62        metrics: Metrics,
63        sleep: Duration,
64        cancel: CancellationToken,
65    ) -> Self {
66        let (sender, receiver) = watch::channel(0);
67
68        Self {
69            watermark: Default::default(),
70            chain_identifier: Default::default(),
71            db,
72            metrics,
73            sleep,
74            cancel,
75            sender,
76            receiver,
77        }
78    }
79
80    pub(crate) async fn run(&self) {
81        let mut interval = tokio::time::interval(self.sleep);
82        // We start the task by first finding & setting the chain identifier
83        // so that it can be used in all requests.
84        self.get_and_cache_chain_identifier(&mut interval).await;
85
86        loop {
87            tokio::select! {
88                _ = self.cancel.cancelled() => {
89                    info!("Shutdown signal received, terminating watermark update task");
90                    return;
91                },
92                _ = interval.tick() => {
93                    let Watermark {epoch, hi_cp_timestamp_ms, hi_cp, hi_tx, lo_cp, lo_tx } = match Watermark::query(&self.db).await {
94                        Ok(Some(watermark)) => watermark,
95                        Ok(None) => continue,
96                        Err(e) => {
97                            error!("Failed to fetch chain identifier: {}", e);
98                            self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
99                            continue;
100                        }
101                    };
102
103                    // Write the watermark as follows to limit how long we hold the lock
104                    let prev_epoch = {
105                        let mut w = self.watermark.write().await;
106                        w.hi_cp = hi_cp;
107                        w.hi_tx = hi_tx;
108                        w.hi_cp_timestamp_ms = hi_cp_timestamp_ms;
109                        w.lo_cp = lo_cp;
110                        w.lo_tx = lo_tx;
111                        mem::replace(&mut w.epoch, epoch)
112                    };
113
114                    // On epoch boundary, notify subscribers
115                    if epoch > prev_epoch {
116                        self.sender.send(epoch).unwrap();
117                    }
118                }
119            }
120        }
121    }
122
123    pub(crate) fn lock(&self) -> WatermarkLock {
124        self.watermark.clone()
125    }
126
127    pub(crate) fn chain_id_lock(&self) -> ChainIdentifierLock {
128        self.chain_identifier.clone()
129    }
130
131    /// Receiver for subscribing to epoch changes.
132    pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
133        self.receiver.clone()
134    }
135
136    // Fetch the chain identifier (once) from the database and cache it.
137    async fn get_and_cache_chain_identifier(&self, interval: &mut Interval) {
138        loop {
139            tokio::select! {
140                _ = self.cancel.cancelled() => {
141                    info!("Shutdown signal received, terminating attempt to get chain identifier");
142                    return;
143                },
144                _ = interval.tick() => {
145                    // we only set the chain_identifier once.
146                    let chain = match ChainIdentifier::query(&self.db).await  {
147                        Ok(Some(chain)) => chain,
148                        Ok(None) => continue,
149                        Err(e) => {
150                            error!("{}", e);
151                            self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
152                            continue;
153                        }
154                    };
155
156                    let mut chain_id_lock = self.chain_identifier.0.write().await;
157                    *chain_id_lock = chain.into();
158                    return;
159                }
160            }
161        }
162    }
163}
164
165impl Watermark {
166    pub(crate) async fn new(lock: WatermarkLock) -> Self {
167        let w = lock.read().await;
168        Self {
169            hi_cp: w.hi_cp,
170            hi_cp_timestamp_ms: w.hi_cp_timestamp_ms,
171            hi_tx: w.hi_tx,
172            epoch: w.epoch,
173            lo_cp: w.lo_cp,
174            lo_tx: w.lo_tx,
175        }
176    }
177
178    /// Queries the watermarks table for the `checkpoints` pipeline to determine the available range
179    /// of checkpoints and tx_sequence_numbers. We don't query tables directly as pruning may be in
180    /// progress, which means the lower bound of data will constantly change. The watermarks table
181    /// has a `tx_hi` value, but not a `tx_lo` value, so the query also joins on the `checkpoints`
182    /// table to get the `min_tx_sequence_number` for that lower bound.
183    #[allow(clippy::type_complexity)]
184    pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
185        let (reader_lo_to_tx, cp_hi_to_timestamp) = diesel::alias!(
186            checkpoints as reader_lo_to_tx,
187            checkpoints as cp_hi_to_timestamp
188        );
189
190        let Some(result): Option<(i64, i64, i64, i64, i64, Option<i64>)> = db
191            .execute(move |conn| {
192                async move {
193                    conn.result(move || {
194                        watermarks::table
195                            // Join for reader_lo -> checkpoints (as cp_reader) to get min_tx_sequence_number
196                            .inner_join(
197                                reader_lo_to_tx.on(watermarks::reader_lo
198                                    .eq(reader_lo_to_tx.field(checkpoints::sequence_number))),
199                            )
200                            // Join for checkpoint_hi_inclusive -> checkpoints (as cp_hi) to get
201                            // timestamp_ms of cp_hi
202                            .inner_join(
203                                cp_hi_to_timestamp.on(watermarks::checkpoint_hi_inclusive
204                                    .eq(cp_hi_to_timestamp.field(checkpoints::sequence_number))),
205                            )
206                            .filter(watermarks::pipeline.eq("checkpoints"))
207                            .select((
208                                watermarks::epoch_hi_inclusive,
209                                cp_hi_to_timestamp.field(checkpoints::timestamp_ms),
210                                watermarks::checkpoint_hi_inclusive,
211                                watermarks::tx_hi,
212                                watermarks::reader_lo,
213                                reader_lo_to_tx.field(checkpoints::min_tx_sequence_number),
214                            ))
215                    })
216                    .await
217                    .optional()
218                }
219                .scope_boxed()
220            })
221            .await
222            .map_err(|e| Error::Internal(format!("Failed to fetch watermark data: {e}")))?
223        else {
224            // An empty response from the db is valid when indexer has not committed data to the db
225            // yet.
226            return Ok(None);
227        };
228
229        if let (epoch, hi_cp_timestamp_ms, hi_cp, hi_tx, lo_cp, Some(lo_tx)) = result {
230            Ok(Some(Watermark {
231                hi_cp: hi_cp as u64,
232                hi_cp_timestamp_ms: hi_cp_timestamp_ms as u64,
233                hi_tx: hi_tx as u64,
234                epoch: epoch as u64,
235                lo_cp: lo_cp as u64,
236                lo_tx: lo_tx as u64,
237            }))
238        } else {
239            Err(Error::Internal(
240                "Expected entry for tx lower bound and min_tx_sequence_number to be non-null"
241                    .to_string(),
242            ))
243        }
244    }
245}
246
247impl ChainIdentifierLock {
248    pub(crate) async fn read(&self) -> ChainIdentifier {
249        let w = self.0.read().await;
250        w.0.into()
251    }
252}