1use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_futures::service::Service;
9use sui_futures::stream::Break;
10use sui_futures::stream::TrySpawnStreamExt;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18use crate::metrics::CheckpointLagMetricReporter;
19use crate::metrics::IndexerMetrics;
20use crate::pipeline::CommitterConfig;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::BatchedRows;
23use crate::pipeline::concurrent::Handler;
24use crate::store::Store;
25
/// Starting delay between attempts when committing a batch fails.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the exponential backoff delay between commit retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
31
32pub(super) fn committer<H: Handler>(
42 handler: Arc<H>,
43 config: CommitterConfig,
44 rx: mpsc::Receiver<BatchedRows<H>>,
45 tx: mpsc::Sender<Vec<WatermarkPart>>,
46 db: H::Store,
47 metrics: Arc<IndexerMetrics>,
48) -> Service {
49 Service::new().spawn_aborting(async move {
50 info!(pipeline = H::NAME, "Starting committer");
51 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
52 &metrics.partially_committed_checkpoint_timestamp_lag,
53 &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
54 &metrics.latest_partially_committed_checkpoint,
55 );
56
57 match ReceiverStream::new(rx)
58 .try_for_each_spawned(
59 config.write_concurrency,
60 |BatchedRows {
61 batch,
62 batch_len,
63 watermark,
64 }| {
65 let batch = Arc::new(batch);
66 let handler = handler.clone();
67 let tx = tx.clone();
68 let db = db.clone();
69 let metrics = metrics.clone();
70 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
71
72 let backoff = ExponentialBackoff {
76 initial_interval: INITIAL_RETRY_INTERVAL,
77 current_interval: INITIAL_RETRY_INTERVAL,
78 max_interval: MAX_RETRY_INTERVAL,
79 max_elapsed_time: None,
80 ..Default::default()
81 };
82
83 let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
84 let highest_checkpoint_timestamp =
85 watermark.iter().map(|w| w.timestamp_ms()).max();
86
87 use backoff::Error as BE;
88 let commit = move || {
89 let batch = batch.clone();
90 let handler = handler.clone();
91 let db = db.clone();
92 let metrics = metrics.clone();
93 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
94 async move {
95 if batch_len == 0 {
96 return Ok(());
97 }
98
99 metrics
100 .total_committer_batches_attempted
101 .with_label_values(&[H::NAME])
102 .inc();
103
104 let guard = metrics
105 .committer_commit_latency
106 .with_label_values(&[H::NAME])
107 .start_timer();
108
109 let mut conn = db.connect().await.map_err(|e| {
110 warn!(
111 pipeline = H::NAME,
112 "Committed failed to get connection for DB"
113 );
114
115 metrics
116 .total_committer_batches_failed
117 .with_label_values(&[H::NAME])
118 .inc();
119
120 BE::transient(Break::Err(e))
121 })?;
122
123 let affected = handler.commit(&batch, &mut conn).await;
124 let elapsed = guard.stop_and_record();
125
126 match affected {
127 Ok(affected) => {
128 debug!(
129 pipeline = H::NAME,
130 elapsed_ms = elapsed * 1000.0,
131 affected,
132 committed = batch_len,
133 "Wrote batch",
134 );
135
136 checkpoint_lag_reporter.report_lag(
137 highest_checkpoint.unwrap(),
139 highest_checkpoint_timestamp.unwrap(),
140 );
141
142 metrics
143 .total_committer_batches_succeeded
144 .with_label_values(&[H::NAME])
145 .inc();
146
147 metrics
148 .total_committer_rows_committed
149 .with_label_values(&[H::NAME])
150 .inc_by(batch_len as u64);
151
152 metrics
153 .total_committer_rows_affected
154 .with_label_values(&[H::NAME])
155 .inc_by(affected as u64);
156
157 metrics
158 .committer_tx_rows
159 .with_label_values(&[H::NAME])
160 .observe(affected as f64);
161
162 Ok(())
163 }
164
165 Err(e) => {
166 warn!(
167 pipeline = H::NAME,
168 elapsed_ms = elapsed * 1000.0,
169 committed = batch_len,
170 "Error writing batch: {e}",
171 );
172
173 metrics
174 .total_committer_batches_failed
175 .with_label_values(&[H::NAME])
176 .inc();
177
178 Err(BE::transient(Break::Err(e)))
179 }
180 }
181 }
182 };
183
184 async move {
185 backoff::future::retry(backoff, commit).await?;
189 if tx.send(watermark).await.is_err() {
190 info!(pipeline = H::NAME, "Watermark closed channel");
191 return Err(Break::<anyhow::Error>::Break);
192 }
193
194 Ok(())
195 }
196 },
197 )
198 .await
199 {
200 Ok(()) => {
201 info!(pipeline = H::NAME, "Batches done, stopping committer");
202 Ok(())
203 }
204
205 Err(Break::Break) => {
206 info!(pipeline = H::NAME, "Channels closed, stopping committer");
207 Ok(())
208 }
209
210 Err(Break::Err(e)) => {
211 error!(pipeline = H::NAME, "Error from committer: {e}");
212 Err(e.context(format!("Error from committer {}", H::NAME)))
213 }
214 }
215 })
216}
217
218#[cfg(test)]
219mod tests {
220 use std::sync::Arc;
221 use std::sync::Mutex;
222 use std::sync::atomic::AtomicUsize;
223 use std::sync::atomic::Ordering;
224
225 use anyhow::ensure;
226 use async_trait::async_trait;
227 use sui_types::full_checkpoint_content::Checkpoint;
228 use tokio::sync::mpsc;
229
230 use crate::FieldCount;
231 use crate::metrics::IndexerMetrics;
232 use crate::mocks::store::ConnectionFailure;
233 use crate::mocks::store::FallibleMockConnection;
234 use crate::mocks::store::FallibleMockStore;
235 use crate::pipeline::Processor;
236 use crate::pipeline::WatermarkPart;
237 use crate::pipeline::concurrent::BatchStatus;
238 use crate::pipeline::concurrent::BatchedRows;
239 use crate::pipeline::concurrent::Handler;
240 use crate::store::CommitterWatermark;
241
242 use super::*;
243
    /// Row type written by the test pipeline. Carries knobs for injecting
    /// commit failures and artificial latency into the mock store.
    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        /// Checkpoint this row belongs to.
        pub cp_sequence_number: u64,
        /// Transaction sequence numbers recorded for the checkpoint.
        pub tx_sequence_numbers: Vec<u64>,
        /// Number of times committing this row should fail before succeeding.
        pub commit_failure_remaining: Arc<AtomicUsize>,
        /// Artificial delay applied before each commit attempt, in milliseconds.
        pub commit_delay_ms: u64,
    }
254
255 pub struct DataPipeline;
256
257 #[async_trait]
258 impl Processor for DataPipeline {
259 const NAME: &'static str = "data";
260
261 type Value = StoredData;
262
263 async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
264 Ok(vec![])
265 }
266 }
267
268 #[async_trait]
269 impl Handler for DataPipeline {
270 type Store = FallibleMockStore;
271 type Batch = Vec<Self::Value>;
272
273 fn batch(
274 &self,
275 batch: &mut Self::Batch,
276 values: &mut std::vec::IntoIter<Self::Value>,
277 ) -> BatchStatus {
278 batch.extend(values);
279 BatchStatus::Pending
280 }
281
282 async fn commit<'a>(
283 &self,
284 batch: &Self::Batch,
285 conn: &mut FallibleMockConnection<'a>,
286 ) -> anyhow::Result<usize> {
287 for value in batch {
288 if value.commit_delay_ms > 0 {
290 tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
291 }
292
293 {
295 let remaining = value
296 .commit_failure_remaining
297 .fetch_sub(1, Ordering::Relaxed);
298 ensure!(
299 remaining == 0,
300 "Commit failed, remaining failures: {}",
301 remaining - 1
302 );
303 }
304
305 conn.0
306 .commit_data(
307 DataPipeline::NAME,
308 value.cp_sequence_number,
309 value.tx_sequence_numbers.clone(),
310 )
311 .await?;
312 }
313 Ok(batch.len())
314 }
315 }
316
    /// Handles shared by the tests: the mock store, the channel endpoints the
    /// test drives and observes, and the running committer service.
    struct TestSetup {
        /// Mock store the committer writes to; inspected by assertions.
        store: FallibleMockStore,
        /// Sends batches into the committer.
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        /// Receives the watermark parts the committer forwards after commits.
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        /// The running committer service, used to drive shutdown.
        committer: Service,
    }
323
324 async fn setup_test(store: FallibleMockStore) -> TestSetup {
331 let config = CommitterConfig::default();
332 let metrics = IndexerMetrics::new(None, &Default::default());
333
334 let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
335 let (watermark_tx, watermark_rx) = mpsc::channel(10);
336
337 let store_clone = store.clone();
338 let handler = Arc::new(DataPipeline);
339 let committer = committer(
340 handler,
341 config,
342 batch_rx,
343 watermark_tx,
344 store_clone,
345 metrics,
346 );
347
348 TestSetup {
349 store,
350 batch_tx,
351 watermark_rx,
352 committer,
353 }
354 }
355
    /// Two batches sent back-to-back should both be committed, and each
    /// batch's watermark parts should arrive as a single message.
    #[tokio::test]
    async fn test_concurrent_batch_processing() {
        let mut setup = setup_test(FallibleMockStore::default()).await;

        // First batch: two rows with two watermark parts, forwarded together
        // once the batch is written.
        let batch1 = BatchedRows::from_vec(
            vec![
                StoredData {
                    cp_sequence_number: 1,
                    tx_sequence_numbers: vec![1, 2, 3],
                    ..Default::default()
                },
                StoredData {
                    cp_sequence_number: 2,
                    tx_sequence_numbers: vec![4, 5, 6],
                    ..Default::default()
                },
            ],
            vec![
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 1,
                        tx_hi: 3,
                        timestamp_ms_hi_inclusive: 1000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 2,
                        tx_hi: 6,
                        timestamp_ms_hi_inclusive: 2000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
            ],
        );

        // Second batch: a single row with its own watermark part.
        let batch2 = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 3,
                tx_sequence_numbers: vec![7, 8, 9],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 3,
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: 3000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch1).await.unwrap();
        setup.batch_tx.send(batch2).await.unwrap();

        // Each committed batch forwards all of its watermark parts in one
        // message: two parts for batch1, one for batch2.
        let watermark1 = setup.watermark_rx.recv().await.unwrap();
        let watermark2 = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark1.len(), 2);
        assert_eq!(watermark2.len(), 1);

        // All three rows from both batches should now be in the store.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.len(), 3);
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
        }
    }
434
    /// A batch whose commit fails once should be retried by the backoff loop
    /// and eventually succeed; no data or watermark should be visible before
    /// the retry completes.
    #[tokio::test]
    async fn test_commit_with_retries_for_commit_failure() {
        let mut setup = setup_test(FallibleMockStore::default()).await;

        // One injected commit failure plus a 1s per-attempt delay: the first
        // attempt fails at ~1s, and the backoff retry succeeds at ~2s.
        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
                commit_delay_ms: 1_000,
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // At ~1.5s the first attempt has failed and the retry is in flight.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // By ~3s the retry should have completed successfully.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }
490
    /// A failure to acquire a DB connection is transient: the committer should
    /// retry and eventually commit the batch once a connection is available.
    #[tokio::test]
    async fn test_commit_with_retries_for_connection_failure() {
        // The store refuses the first connection attempt and delays each
        // connection by 1s: the first attempt fails at ~1s, the retry
        // succeeds at ~2s.
        let store = FallibleMockStore {
            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
                connection_failure_attempts: 1,
                connection_delay_ms: 1_000,
                ..Default::default()
            })),
            ..Default::default()
        };
        let mut setup = setup_test(store).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // At ~1.5s the first connection attempt has failed and the retry is
        // still in flight.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // By ~3s the retry should have connected and committed the batch.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }
552
553 #[tokio::test]
554 async fn test_empty_batch_handling() {
555 let mut setup = setup_test(FallibleMockStore::default()).await;
556
557 let empty_batch = BatchedRows::from_vec(
558 vec![], vec![WatermarkPart {
560 watermark: CommitterWatermark {
561 epoch_hi_inclusive: 0,
562 checkpoint_hi_inclusive: 1,
563 tx_hi: 0,
564 timestamp_ms_hi_inclusive: 1000,
565 },
566 batch_rows: 0,
567 total_rows: 0,
568 }],
569 );
570
571 setup.batch_tx.send(empty_batch).await.unwrap();
573
574 let watermark = setup.watermark_rx.recv().await.unwrap();
576 assert_eq!(watermark.len(), 1);
577 assert_eq!(watermark[0].batch_rows, 0);
578 assert_eq!(watermark[0].total_rows, 0);
579
580 {
582 let data = setup.store.data.get(DataPipeline::NAME);
583 assert!(
584 data.is_none(),
585 "No data should be committed for empty batch"
586 );
587 }
588 }
589
    /// Dropping the watermark receiver should make the committer shut down
    /// gracefully (not with an error), after its in-flight batch has been
    /// committed to the store.
    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(FallibleMockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Give the committer time to write the batch.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Close the watermark channel; the committer should treat the failed
        // send as a graceful-stop signal, not an error.
        drop(setup.watermark_rx);

        tokio::time::sleep(Duration::from_millis(200)).await;

        // The data was committed even though the watermark could not be sent.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

        drop(setup.batch_tx);

        // Shutdown must succeed, proving the committer exited without error.
        setup.committer.shutdown().await.unwrap();
    }
637}