use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoff;
use sui_futures::service::Service;
use sui_futures::stream::Break;
use sui_futures::stream::TrySpawnStreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::BatchedRows;
use crate::pipeline::concurrent::Handler;
use crate::store::Store;

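/// Initial delay before retrying a failed commit. Subsequent retries back off exponentially
/// from this value.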
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

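/// Cap on the exponential backoff interval between commit retries.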
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);

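/// The committer consumes batches of rows from `rx` and writes them to `db`, processing up to
/// `config.write_concurrency` batches concurrently. Failed writes are retried indefinitely,
/// with exponential backoff between `INITIAL_RETRY_INTERVAL` and `MAX_RETRY_INTERVAL`. After a
/// batch is written, its watermark parts are sent on `tx`. The service stops when `rx` closes
/// or when the watermark receiver is dropped.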
pub(super) fn committer<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    rx: mpsc::Receiver<BatchedRows<H>>,
    tx: mpsc::Sender<Vec<WatermarkPart>>,
    db: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = H::NAME, "Starting committer");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.partially_committed_checkpoint_timestamp_lag,
            &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
            &metrics.latest_partially_committed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(
                config.write_concurrency,
                |BatchedRows {
                     batch,
                     batch_len,
                     watermark,
                 }| {
                    let batch = Arc::new(batch);
                    let handler = handler.clone();
                    let tx = tx.clone();
                    let db = db.clone();
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();

                    // Retry the commit indefinitely (no `max_elapsed_time`), backing off
                    // exponentially up to `MAX_RETRY_INTERVAL`.
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None,
                        ..Default::default()
                    };

                    let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
                    let highest_checkpoint_timestamp =
                        watermark.iter().map(|w| w.timestamp_ms()).max();

                    use backoff::Error as BE;
                    let commit = move || {
                        let batch = batch.clone();
                        let handler = handler.clone();
                        let db = db.clone();
                        let metrics = metrics.clone();
                        let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                        async move {
                            if batch_len == 0 {
                                return Ok(());
                            }

                            metrics
                                .total_committer_batches_attempted
                                .with_label_values(&[H::NAME])
                                .inc();

                            let guard = metrics
                                .committer_commit_latency
                                .with_label_values(&[H::NAME])
                                .start_timer();

                            let mut conn = db.connect().await.map_err(|e| {
                                warn!(
                                    pipeline = H::NAME,
                                    "Committer failed to get connection to DB"
                                );

                                metrics
                                    .total_committer_batches_failed
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                BE::transient(Break::Err(e))
                            })?;

                            let affected = handler.commit(&batch, &mut conn).await;
                            let elapsed = guard.stop_and_record();

                            match affected {
                                Ok(affected) => {
                                    debug!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        affected,
                                        committed = batch_len,
                                        "Wrote batch",
                                    );

                                    // The batch is non-empty at this point, so it covers at
                                    // least one checkpoint and these maxes exist.
                                    checkpoint_lag_reporter.report_lag(
                                        highest_checkpoint.unwrap(),
                                        highest_checkpoint_timestamp.unwrap(),
                                    );

                                    metrics
                                        .total_committer_batches_succeeded
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    metrics
                                        .total_committer_rows_committed
                                        .with_label_values(&[H::NAME])
                                        .inc_by(batch_len as u64);

                                    metrics
                                        .total_committer_rows_affected
                                        .with_label_values(&[H::NAME])
                                        .inc_by(affected as u64);

                                    metrics
                                        .committer_tx_rows
                                        .with_label_values(&[H::NAME])
                                        .observe(affected as f64);

                                    Ok(())
                                }

                                Err(e) => {
                                    warn!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        committed = batch_len,
                                        "Error writing batch: {e}",
                                    );

                                    metrics
                                        .total_committer_batches_failed
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    Err(BE::transient(Break::Err(e)))
                                }
                            }
                        }
                    };

                    async move {
                        backoff::future::retry(backoff, commit).await?;

                        // Forward this batch's watermark parts. If the receiver is gone,
                        // the pipeline is shutting down.
                        if tx.send(watermark).await.is_err() {
                            info!(pipeline = H::NAME, "Watermark channel closed");
                            return Err(Break::<anyhow::Error>::Break);
                        }

                        Ok(())
                    }
                },
            )
            .await
        {
            Ok(()) => {
                info!(pipeline = H::NAME, "Batches done, stopping committer");
                Ok(())
            }

            Err(Break::Break) => {
                info!(pipeline = H::NAME, "Channels closed, stopping committer");
                Ok(())
            }

            Err(Break::Err(e)) => {
                error!(pipeline = H::NAME, "Error from committer: {e}");
                Err(e.context(format!("Error from committer {}", H::NAME)))
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use anyhow::bail;
    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::pipeline::concurrent::BatchedRows;
    use crate::pipeline::concurrent::Handler;
    use crate::store::CommitterWatermark;

    use super::*;

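    /// Test row type. In addition to its payload, each value carries knobs controlling how
    /// the mock commit behaves (how many times it fails, and how long it takes).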
    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        pub cp_sequence_number: u64,
        pub tx_sequence_numbers: Vec<u64>,
        /// Number of commits of this value that should fail before one succeeds.
        pub commit_failure_remaining: Arc<AtomicUsize>,
        /// Artificial delay applied to every commit of this value, simulating slow writes.
        pub commit_delay_ms: u64,
    }

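    /// A minimal pipeline over `StoredData`, used to drive the committer in tests.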
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                // Simulate a slow commit.
                if value.commit_delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
                }

                // Simulate a failed commit while failures remain. The `checked_sub` stops
                // the counter at zero instead of letting it wrap around.
                if let Ok(remaining) = value.commit_failure_remaining.fetch_update(
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                    |n| n.checked_sub(1),
                ) {
                    bail!("Commit failed, remaining failures: {}", remaining - 1);
                }

                conn.0
                    .commit_data(
                        DataPipeline::NAME,
                        value.cp_sequence_number,
                        value.tx_sequence_numbers.clone(),
                    )
                    .await?;
            }
            Ok(batch.len())
        }
    }

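    /// Handles for driving and observing a committer under test.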
    struct TestSetup {
        store: MockStore,
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        committer: Service,
    }

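    /// Spawn a committer over `store` with the default configuration, returning channels for
    /// feeding it batches and receiving its watermarks.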
    async fn setup_test(store: MockStore) -> TestSetup {
        let config = CommitterConfig::default();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
        let (watermark_tx, watermark_rx) = mpsc::channel(10);

        let store_clone = store.clone();
        let handler = Arc::new(DataPipeline);
        let committer = committer(
            handler,
            config,
            batch_rx,
            watermark_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            batch_tx,
            watermark_rx,
            committer,
        }
    }

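    /// Multiple batches are committed concurrently, and each batch's watermark parts are
    /// forwarded once it lands.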
    #[tokio::test]
    async fn test_concurrent_batch_processing() {
        let mut setup = setup_test(MockStore::default()).await;

        // Create two batches.
        let batch1 = BatchedRows::from_vec(
            vec![
                StoredData {
                    cp_sequence_number: 1,
                    tx_sequence_numbers: vec![1, 2, 3],
                    ..Default::default()
                },
                StoredData {
                    cp_sequence_number: 2,
                    tx_sequence_numbers: vec![4, 5, 6],
                    ..Default::default()
                },
            ],
            vec![
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 1,
                        tx_hi: 3,
                        timestamp_ms_hi_inclusive: 1000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 2,
                        tx_hi: 6,
                        timestamp_ms_hi_inclusive: 2000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
            ],
        );

        let batch2 = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 3,
                tx_sequence_numbers: vec![7, 8, 9],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 3,
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: 3000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch1).await.unwrap();
        setup.batch_tx.send(batch2).await.unwrap();

        // Both batches should be committed and their watermark parts forwarded.
        let watermark1 = setup.watermark_rx.recv().await.unwrap();
        let watermark2 = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark1.len(), 2);
        assert_eq!(watermark2.len(), 1);

        // Verify that all the data was committed to the store.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.len(), 3);
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
        }
    }

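    /// A commit that fails is retried with backoff; data and watermarks only appear once the
    /// retry succeeds.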
    #[tokio::test]
    async fn test_commit_with_retries_for_commit_failure() {
        let mut setup = setup_test(MockStore::default()).await;

        // This batch fails its first commit attempt, and each attempt takes 1s.
        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
                commit_delay_ms: 1_000,
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Wait out the first (failing) attempt.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

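    /// A failure to get a connection is retried with backoff, just like a failed commit.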
    #[tokio::test]
    async fn test_commit_with_retries_for_connection_failure() {
        // The first connection attempt fails, and each attempt takes 1s.
        let store = MockStore {
            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
                connection_failure_attempts: 1,
                connection_delay_ms: 1_000,
                ..Default::default()
            })),
            ..Default::default()
        };
        let mut setup = setup_test(store).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Wait out the first (failing) connection attempt.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

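    /// Empty batches write no data, but their watermark parts must still be forwarded so the
    /// watermark can progress.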
    #[tokio::test]
    async fn test_empty_batch_handling() {
        let mut setup = setup_test(MockStore::default()).await;

        let empty_batch = BatchedRows::from_vec(
            vec![],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 0,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 0,
                total_rows: 0,
            }],
        );

        setup.batch_tx.send(empty_batch).await.unwrap();

        // The watermark part should be forwarded even though no rows were written.
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
        assert_eq!(watermark[0].batch_rows, 0);
        assert_eq!(watermark[0].total_rows, 0);

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "No data should be committed for empty batch"
            );
        }
    }

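    /// If the watermark receiver is dropped, the committer still commits in-flight batches
    /// and then shuts down cleanly.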
    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(MockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Give the committer time to process the batch.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Close the watermark channel by dropping the receiver.
        drop(setup.watermark_rx);

        tokio::time::sleep(Duration::from_millis(200)).await;

        // The batch should have been committed even though its watermark could not be
        // forwarded.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

        drop(setup.batch_tx);

        // The committer should shut down cleanly.
        setup.committer.shutdown().await.unwrap();
    }
}