sui_indexer/handlers/
committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, HashMap};
5
6use sui_types::messages_checkpoint::CheckpointSequenceNumber;
7use tap::tap::TapFallible;
8use tokio_util::sync::CancellationToken;
9use tracing::instrument;
10use tracing::{error, info};
11
12use crate::metrics::IndexerMetrics;
13use crate::models::raw_checkpoints::StoredRawCheckpoint;
14use crate::store::IndexerStore;
15use crate::types::IndexerResult;
16
17use super::{CheckpointDataToCommit, CommitterTables, CommitterWatermark, EpochToCommit};
18
/// Default number of checkpoints committed to the database in a single batch.
///
/// Overridable at runtime through the `CHECKPOINT_COMMIT_BATCH_SIZE` environment variable, which
/// the checkpoint commit task reads on startup.
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
20
21pub async fn start_tx_checkpoint_commit_task<S>(
22    state: S,
23    metrics: IndexerMetrics,
24    tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
25    cancel: CancellationToken,
26    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
27    end_checkpoint_opt: Option<CheckpointSequenceNumber>,
28) -> IndexerResult<()>
29where
30    S: IndexerStore + Clone + Sync + Send + 'static,
31{
32    use futures::StreamExt;
33
34    info!("Indexer checkpoint commit task started...");
35    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
36        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
37        .parse::<usize>()
38        .unwrap();
39    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
40
41    let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
42        .ready_chunks(checkpoint_commit_batch_size);
43
44    let mut unprocessed = HashMap::new();
45    let mut batch = vec![];
46
47    while let Some(indexed_checkpoint_batch) = stream.next().await {
48        if cancel.is_cancelled() {
49            break;
50        }
51
52        // split the batch into smaller batches per epoch to handle partitioning
53        for checkpoint in indexed_checkpoint_batch {
54            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
55        }
56        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
57            let epoch = checkpoint.epoch.clone();
58            batch.push(checkpoint);
59            next_checkpoint_sequence_number += 1;
60            let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch_id());
61            // The batch will consist of contiguous checkpoints and at most one epoch boundary at
62            // the end.
63            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
64                commit_checkpoints(&state, batch, epoch, &metrics).await;
65                batch = vec![];
66            }
67            if let Some(epoch_number) = epoch_number_option {
68                state.upload_display(epoch_number).await.tap_err(|e| {
69                    error!(
70                        "Failed to upload display table before epoch {} with error: {}",
71                        epoch_number,
72                        e.to_string()
73                    );
74                })?;
75            }
76            // stop adding to the commit batch if we've reached the end checkpoint
77            if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt
78                && next_checkpoint_sequence_number > end_checkpoint_sequence_number
79            {
80                break;
81            }
82        }
83        if !batch.is_empty() {
84            commit_checkpoints(&state, batch, None, &metrics).await;
85            batch = vec![];
86        }
87
88        // stop the commit task if we've reached the end checkpoint
89        if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt
90            && next_checkpoint_sequence_number > end_checkpoint_sequence_number
91        {
92            break;
93        }
94    }
95    Ok(())
96}
97
98/// Writes indexed checkpoint data to the database, and then update watermark upper bounds and
99/// metrics. Expects `indexed_checkpoint_batch` to be non-empty, and contain contiguous checkpoints.
100/// There can be at most one epoch boundary at the end. If an epoch boundary is detected,
101/// epoch-partitioned tables must be advanced.
102// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
103#[instrument(skip_all, fields(
104    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
105    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
106))]
107async fn commit_checkpoints<S>(
108    state: &S,
109    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
110    epoch: Option<EpochToCommit>,
111    metrics: &IndexerMetrics,
112) where
113    S: IndexerStore + Clone + Sync + Send + 'static,
114{
115    let mut checkpoint_batch = vec![];
116    let mut tx_batch = vec![];
117    let mut events_batch = vec![];
118    let mut tx_indices_batch = vec![];
119    let mut event_indices_batch = vec![];
120    let mut display_updates_batch = BTreeMap::new();
121    let mut object_changes_batch = vec![];
122    let mut object_history_changes_batch = vec![];
123    let mut object_versions_batch = vec![];
124    let mut packages_batch = vec![];
125
126    for indexed_checkpoint in indexed_checkpoint_batch {
127        let CheckpointDataToCommit {
128            checkpoint,
129            transactions,
130            events,
131            event_indices,
132            tx_indices,
133            display_updates,
134            object_changes,
135            object_history_changes,
136            object_versions,
137            packages,
138            epoch: _,
139        } = indexed_checkpoint;
140
141        tx_batch.push(transactions);
142        events_batch.push(events);
143        tx_indices_batch.push(tx_indices);
144        event_indices_batch.push(event_indices);
145        display_updates_batch.extend(display_updates.into_iter());
146        object_changes_batch.push(object_changes);
147        object_versions_batch.push(object_versions);
148        object_history_changes_batch.push(object_history_changes);
149        checkpoint_batch.push(checkpoint);
150        packages_batch.push(packages);
151    }
152
153    let first_checkpoint_seq = checkpoint_batch.first().unwrap().sequence_number;
154    let last_checkpoint = checkpoint_batch.last().unwrap();
155    let committer_watermark = CommitterWatermark::from(last_checkpoint);
156
157    let guard = metrics.checkpoint_db_commit_latency.start_timer();
158    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
159    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
160    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
161    let event_indices_batch = event_indices_batch
162        .into_iter()
163        .flatten()
164        .collect::<Vec<_>>();
165    let object_versions_batch = object_versions_batch
166        .into_iter()
167        .flatten()
168        .collect::<Vec<_>>();
169    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
170    let checkpoint_num = checkpoint_batch.len();
171    let tx_count = tx_batch.len();
172    let raw_checkpoints_batch = checkpoint_batch
173        .iter()
174        .map(|c| c.into())
175        .collect::<Vec<StoredRawCheckpoint>>();
176
177    {
178        let _step_1_guard: prometheus::HistogramTimer =
179            metrics.checkpoint_db_commit_latency_step_1.start_timer();
180
181        let mut persist_tasks = vec![
182            state.persist_packages(packages_batch),
183            state.persist_object_history(object_history_changes_batch.clone()),
184            state.persist_transactions(tx_batch),
185            state.persist_tx_indices(tx_indices_batch),
186            state.persist_events(events_batch),
187            state.persist_event_indices(event_indices_batch),
188            state.persist_displays(display_updates_batch),
189            state.persist_objects(object_changes_batch.clone()),
190            state.persist_full_objects_history(object_history_changes_batch.clone()),
191            state.persist_objects_version(object_versions_batch.clone()),
192            state.persist_raw_checkpoints(raw_checkpoints_batch),
193        ];
194
195        if let Some(epoch_data) = epoch.clone() {
196            persist_tasks.push(state.persist_epoch(epoch_data));
197        }
198        futures::future::join_all(persist_tasks)
199            .await
200            .into_iter()
201            .map(|res| {
202                if res.is_err() {
203                    error!("Failed to persist data with error: {:?}", res);
204                }
205                res
206            })
207            .collect::<IndexerResult<Vec<_>>>()
208            .expect("Persisting data into DB should not fail.");
209    }
210
211    let is_epoch_end = epoch.is_some();
212
213    // On epoch boundary, we need to modify the existing partitions' upper bound, and introduce a
214    // new partition for incoming data for the upcoming epoch.
215    if let Some(epoch_data) = epoch {
216        state
217            .advance_epoch(epoch_data)
218            .await
219            .tap_err(|e| {
220                error!("Failed to advance epoch with error: {}", e.to_string());
221            })
222            .expect("Advancing epochs in DB should not fail.");
223        metrics.total_epoch_committed.inc();
224    }
225
226    state
227        .persist_checkpoints(checkpoint_batch)
228        .await
229        .tap_err(|e| {
230            error!(
231                "Failed to persist checkpoint data with error: {}",
232                e.to_string()
233            );
234        })
235        .expect("Persisting data into DB should not fail.");
236
237    if is_epoch_end {
238        // The epoch has advanced so we update the configs for the new protocol version, if it has changed.
239        let chain_id = state
240            .get_chain_identifier()
241            .await
242            .expect("Failed to get chain identifier")
243            .expect("Chain identifier should have been indexed at this point");
244        let _ = state
245            .persist_protocol_configs_and_feature_flags(chain_id)
246            .await;
247    }
248
249    state
250        .update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
251        .await
252        .tap_err(|e| {
253            error!(
254                "Failed to update watermark upper bound with error: {}",
255                e.to_string()
256            );
257        })
258        .expect("Updating watermark upper bound in DB should not fail.");
259
260    let elapsed = guard.stop_and_record();
261
262    info!(
263        elapsed,
264        "Checkpoint {}-{} committed with {} transactions.",
265        first_checkpoint_seq,
266        committer_watermark.checkpoint_hi_inclusive,
267        tx_count,
268    );
269    metrics
270        .latest_tx_checkpoint_sequence_number
271        .set(committer_watermark.checkpoint_hi_inclusive as i64);
272    metrics
273        .total_tx_checkpoint_committed
274        .inc_by(checkpoint_num as u64);
275    metrics.total_transaction_committed.inc_by(tx_count as u64);
276    metrics.transaction_per_checkpoint.observe(
277        tx_count as f64
278            / (committer_watermark.checkpoint_hi_inclusive - first_checkpoint_seq + 1) as f64,
279    );
280    // 1000.0 is not necessarily the batch size, it's to roughly map average tx commit latency to [0.1, 1] seconds,
281    // which is well covered by DB_COMMIT_LATENCY_SEC_BUCKETS.
282    metrics
283        .thousand_transaction_avg_db_commit_latency
284        .observe(elapsed * 1000.0 / tx_count as f64);
285}