sui_indexer/handlers/
committer.rs1use std::collections::{BTreeMap, HashMap};
5
6use sui_types::messages_checkpoint::CheckpointSequenceNumber;
7use tap::tap::TapFallible;
8use tokio_util::sync::CancellationToken;
9use tracing::instrument;
10use tracing::{error, info};
11
12use crate::metrics::IndexerMetrics;
13use crate::models::raw_checkpoints::StoredRawCheckpoint;
14use crate::store::IndexerStore;
15use crate::types::IndexerResult;
16
17use super::{CheckpointDataToCommit, CommitterTables, CommitterWatermark, EpochToCommit};
18
/// Default number of checkpoints committed to the store in a single batch.
///
/// The commit task uses this value when the `CHECKPOINT_COMMIT_BATCH_SIZE` environment
/// variable is not set.
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
20
21pub async fn start_tx_checkpoint_commit_task<S>(
22 state: S,
23 metrics: IndexerMetrics,
24 tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
25 cancel: CancellationToken,
26 mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
27 end_checkpoint_opt: Option<CheckpointSequenceNumber>,
28) -> IndexerResult<()>
29where
30 S: IndexerStore + Clone + Sync + Send + 'static,
31{
32 use futures::StreamExt;
33
34 info!("Indexer checkpoint commit task started...");
35 let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
36 .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
37 .parse::<usize>()
38 .unwrap();
39 info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
40
41 let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
42 .ready_chunks(checkpoint_commit_batch_size);
43
44 let mut unprocessed = HashMap::new();
45 let mut batch = vec![];
46
47 while let Some(indexed_checkpoint_batch) = stream.next().await {
48 if cancel.is_cancelled() {
49 break;
50 }
51
52 for checkpoint in indexed_checkpoint_batch {
54 unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
55 }
56 while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
57 let epoch = checkpoint.epoch.clone();
58 batch.push(checkpoint);
59 next_checkpoint_sequence_number += 1;
60 let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch_id());
61 if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
64 commit_checkpoints(&state, batch, epoch, &metrics).await;
65 batch = vec![];
66 }
67 if let Some(epoch_number) = epoch_number_option {
68 state.upload_display(epoch_number).await.tap_err(|e| {
69 error!(
70 "Failed to upload display table before epoch {} with error: {}",
71 epoch_number,
72 e.to_string()
73 );
74 })?;
75 }
76 if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt
78 && next_checkpoint_sequence_number > end_checkpoint_sequence_number
79 {
80 break;
81 }
82 }
83 if !batch.is_empty() {
84 commit_checkpoints(&state, batch, None, &metrics).await;
85 batch = vec![];
86 }
87
88 if let Some(end_checkpoint_sequence_number) = end_checkpoint_opt
90 && next_checkpoint_sequence_number > end_checkpoint_sequence_number
91 {
92 break;
93 }
94 }
95 Ok(())
96}
97
98#[instrument(skip_all, fields(
104 first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
105 last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
106))]
107async fn commit_checkpoints<S>(
108 state: &S,
109 indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
110 epoch: Option<EpochToCommit>,
111 metrics: &IndexerMetrics,
112) where
113 S: IndexerStore + Clone + Sync + Send + 'static,
114{
115 let mut checkpoint_batch = vec![];
116 let mut tx_batch = vec![];
117 let mut events_batch = vec![];
118 let mut tx_indices_batch = vec![];
119 let mut event_indices_batch = vec![];
120 let mut display_updates_batch = BTreeMap::new();
121 let mut object_changes_batch = vec![];
122 let mut object_history_changes_batch = vec![];
123 let mut object_versions_batch = vec![];
124 let mut packages_batch = vec![];
125
126 for indexed_checkpoint in indexed_checkpoint_batch {
127 let CheckpointDataToCommit {
128 checkpoint,
129 transactions,
130 events,
131 event_indices,
132 tx_indices,
133 display_updates,
134 object_changes,
135 object_history_changes,
136 object_versions,
137 packages,
138 epoch: _,
139 } = indexed_checkpoint;
140
141 tx_batch.push(transactions);
142 events_batch.push(events);
143 tx_indices_batch.push(tx_indices);
144 event_indices_batch.push(event_indices);
145 display_updates_batch.extend(display_updates.into_iter());
146 object_changes_batch.push(object_changes);
147 object_versions_batch.push(object_versions);
148 object_history_changes_batch.push(object_history_changes);
149 checkpoint_batch.push(checkpoint);
150 packages_batch.push(packages);
151 }
152
153 let first_checkpoint_seq = checkpoint_batch.first().unwrap().sequence_number;
154 let last_checkpoint = checkpoint_batch.last().unwrap();
155 let committer_watermark = CommitterWatermark::from(last_checkpoint);
156
157 let guard = metrics.checkpoint_db_commit_latency.start_timer();
158 let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
159 let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
160 let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
161 let event_indices_batch = event_indices_batch
162 .into_iter()
163 .flatten()
164 .collect::<Vec<_>>();
165 let object_versions_batch = object_versions_batch
166 .into_iter()
167 .flatten()
168 .collect::<Vec<_>>();
169 let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
170 let checkpoint_num = checkpoint_batch.len();
171 let tx_count = tx_batch.len();
172 let raw_checkpoints_batch = checkpoint_batch
173 .iter()
174 .map(|c| c.into())
175 .collect::<Vec<StoredRawCheckpoint>>();
176
177 {
178 let _step_1_guard: prometheus::HistogramTimer =
179 metrics.checkpoint_db_commit_latency_step_1.start_timer();
180
181 let mut persist_tasks = vec![
182 state.persist_packages(packages_batch),
183 state.persist_object_history(object_history_changes_batch.clone()),
184 state.persist_transactions(tx_batch),
185 state.persist_tx_indices(tx_indices_batch),
186 state.persist_events(events_batch),
187 state.persist_event_indices(event_indices_batch),
188 state.persist_displays(display_updates_batch),
189 state.persist_objects(object_changes_batch.clone()),
190 state.persist_full_objects_history(object_history_changes_batch.clone()),
191 state.persist_objects_version(object_versions_batch.clone()),
192 state.persist_raw_checkpoints(raw_checkpoints_batch),
193 ];
194
195 if let Some(epoch_data) = epoch.clone() {
196 persist_tasks.push(state.persist_epoch(epoch_data));
197 }
198 futures::future::join_all(persist_tasks)
199 .await
200 .into_iter()
201 .map(|res| {
202 if res.is_err() {
203 error!("Failed to persist data with error: {:?}", res);
204 }
205 res
206 })
207 .collect::<IndexerResult<Vec<_>>>()
208 .expect("Persisting data into DB should not fail.");
209 }
210
211 let is_epoch_end = epoch.is_some();
212
213 if let Some(epoch_data) = epoch {
216 state
217 .advance_epoch(epoch_data)
218 .await
219 .tap_err(|e| {
220 error!("Failed to advance epoch with error: {}", e.to_string());
221 })
222 .expect("Advancing epochs in DB should not fail.");
223 metrics.total_epoch_committed.inc();
224 }
225
226 state
227 .persist_checkpoints(checkpoint_batch)
228 .await
229 .tap_err(|e| {
230 error!(
231 "Failed to persist checkpoint data with error: {}",
232 e.to_string()
233 );
234 })
235 .expect("Persisting data into DB should not fail.");
236
237 if is_epoch_end {
238 let chain_id = state
240 .get_chain_identifier()
241 .await
242 .expect("Failed to get chain identifier")
243 .expect("Chain identifier should have been indexed at this point");
244 let _ = state
245 .persist_protocol_configs_and_feature_flags(chain_id)
246 .await;
247 }
248
249 state
250 .update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
251 .await
252 .tap_err(|e| {
253 error!(
254 "Failed to update watermark upper bound with error: {}",
255 e.to_string()
256 );
257 })
258 .expect("Updating watermark upper bound in DB should not fail.");
259
260 let elapsed = guard.stop_and_record();
261
262 info!(
263 elapsed,
264 "Checkpoint {}-{} committed with {} transactions.",
265 first_checkpoint_seq,
266 committer_watermark.checkpoint_hi_inclusive,
267 tx_count,
268 );
269 metrics
270 .latest_tx_checkpoint_sequence_number
271 .set(committer_watermark.checkpoint_hi_inclusive as i64);
272 metrics
273 .total_tx_checkpoint_committed
274 .inc_by(checkpoint_num as u64);
275 metrics.total_transaction_committed.inc_by(tx_count as u64);
276 metrics.transaction_per_checkpoint.observe(
277 tx_count as f64
278 / (committer_watermark.checkpoint_hi_inclusive - first_checkpoint_seq + 1) as f64,
279 );
280 metrics
283 .thousand_transaction_avg_db_commit_latency
284 .observe(elapsed * 1000.0 / tx_count as f64);
285}