use std::{sync::Arc, time::Duration};

use backoff::ExponentialBackoff;
use sui_futures::{
    service::Service,
    stream::{Break, TrySpawnStreamExt},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, warn};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{CommitterConfig, WatermarkPart},
    store::Store,
};

use super::{BatchedRows, Handler};

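/// Initial delay between retries of a failed batch write.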
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

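/// Maximum delay between retries of a failed batch write.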
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);

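/// The committer task gathers batches from `rx` and writes them out to `db`, retrying failed
/// writes with exponential backoff. Watermark parts for each written batch are forwarded on
/// `tx`, and up to `config.write_concurrency` batches are written concurrently.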
pub(super) fn committer<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    rx: mpsc::Receiver<BatchedRows<H>>,
    tx: mpsc::Sender<Vec<WatermarkPart>>,
    db: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = H::NAME, "Starting committer");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.partially_committed_checkpoint_timestamp_lag,
            &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
            &metrics.latest_partially_committed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(
                config.write_concurrency,
                |BatchedRows {
                     batch,
                     batch_len,
                     watermark,
                 }| {
                    let batch = Arc::new(batch);
                    let handler = handler.clone();
                    let tx = tx.clone();
                    let db = db.clone();
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();

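                    // Retry failed writes indefinitely (`max_elapsed_time: None`), backing
                    // off exponentially up to `MAX_RETRY_INTERVAL`.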
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None,
                        ..Default::default()
                    };

                    let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
                    let highest_checkpoint_timestamp =
                        watermark.iter().map(|w| w.timestamp_ms()).max();

                    use backoff::Error as BE;
                    let commit = move || {
                        let batch = batch.clone();
                        let handler = handler.clone();
                        let db = db.clone();
                        let metrics = metrics.clone();
                        let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                        async move {
                            if batch_len == 0 {
                                return Ok(());
                            }

                            metrics
                                .total_committer_batches_attempted
                                .with_label_values(&[H::NAME])
                                .inc();

                            let guard = metrics
                                .committer_commit_latency
                                .with_label_values(&[H::NAME])
                                .start_timer();

                            let mut conn = db.connect().await.map_err(|e| {
                                warn!(
                                    pipeline = H::NAME,
                                    "Committer failed to get connection to DB"
                                );

                                metrics
                                    .total_committer_batches_failed
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                BE::transient(Break::Err(e))
                            })?;

                            let affected = handler.commit(&batch, &mut conn).await;
                            let elapsed = guard.stop_and_record();

                            match affected {
                                Ok(affected) => {
                                    debug!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        affected,
                                        committed = batch_len,
                                        "Wrote batch",
                                    );

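                                    // Assuming a non-empty batch always carries at least
                                    // one watermark part, these unwraps cannot fail.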
                                    checkpoint_lag_reporter.report_lag(
                                        highest_checkpoint.unwrap(),
                                        highest_checkpoint_timestamp.unwrap(),
                                    );

                                    metrics
                                        .total_committer_batches_succeeded
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    metrics
                                        .total_committer_rows_committed
                                        .with_label_values(&[H::NAME])
                                        .inc_by(batch_len as u64);

                                    metrics
                                        .total_committer_rows_affected
                                        .with_label_values(&[H::NAME])
                                        .inc_by(affected as u64);

                                    metrics
                                        .committer_tx_rows
                                        .with_label_values(&[H::NAME])
                                        .observe(affected as f64);

                                    Ok(())
                                }

                                Err(e) => {
                                    warn!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        committed = batch_len,
                                        "Error writing batch: {e}",
                                    );

                                    metrics
                                        .total_committer_batches_failed
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    Err(BE::transient(Break::Err(e)))
                                }
                            }
                        }
                    };

                    async move {
                        backoff::future::retry(backoff, commit).await?;
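
                        // Forward this batch's watermark parts to the watermark task; a
                        // closed channel means the pipeline is shutting down.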
                        if tx.send(watermark).await.is_err() {
                            info!(pipeline = H::NAME, "Watermark channel closed");
                            return Err(Break::<anyhow::Error>::Break);
                        }

                        Ok(())
                    }
                },
            )
            .await
        {
            Ok(()) => {
                info!(pipeline = H::NAME, "Batches done, stopping committer");
                Ok(())
            }

            Err(Break::Break) => {
                info!(pipeline = H::NAME, "Channels closed, stopping committer");
                Ok(())
            }

            Err(Break::Err(e)) => {
                error!(pipeline = H::NAME, "Error from committer: {e}");
                Err(e.context(format!("Error from committer {}", H::NAME)))
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicUsize, Ordering},
    };

    use anyhow::bail;
    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{
            Processor, WatermarkPart,
            concurrent::{BatchStatus, BatchedRows, Handler},
        },
        store::CommitterWatermark,
    };

    use super::*;

    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        pub cp_sequence_number: u64,
        pub tx_sequence_numbers: Vec<u64>,
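        /// Number of further commit attempts that should fail before one succeeds.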
        pub commit_failure_remaining: Arc<AtomicUsize>,
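        /// Artificial delay applied to every commit attempt, in milliseconds.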
        pub commit_delay_ms: u64,
    }

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
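                // Simulate a slow commit.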
                if value.commit_delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
                }

                // Simulate a failed commit while the failure counter is non-zero,
                // decrementing the counter without underflowing once it reaches zero.
                if let Ok(remaining) = value.commit_failure_remaining.fetch_update(
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                    |n| n.checked_sub(1),
                ) {
                    bail!("Commit failed, remaining failures: {}", remaining - 1);
                }

                conn.0
                    .commit_data(
                        DataPipeline::NAME,
                        value.cp_sequence_number,
                        value.tx_sequence_numbers.clone(),
                    )
                    .await?;
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        committer: Service,
    }

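    /// Spawns a committer for `DataPipeline` over `store` with default configuration and
    /// fresh metrics, returning the store along with the committer's channel endpoints and
    /// service handle.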
    async fn setup_test(store: MockStore) -> TestSetup {
        let config = CommitterConfig::default();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
        let (watermark_tx, watermark_rx) = mpsc::channel(10);

        let store_clone = store.clone();
        let handler = Arc::new(DataPipeline);
        let committer = committer(
            handler,
            config,
            batch_rx,
            watermark_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            batch_tx,
            watermark_rx,
            committer,
        }
    }

    #[tokio::test]
    async fn test_concurrent_batch_processing() {
        let mut setup = setup_test(MockStore::default()).await;

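        // Two batches: the first covers checkpoints 1 and 2, the second covers checkpoint 3.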
        let batch1 = BatchedRows::from_vec(
            vec![
                StoredData {
                    cp_sequence_number: 1,
                    tx_sequence_numbers: vec![1, 2, 3],
                    ..Default::default()
                },
                StoredData {
                    cp_sequence_number: 2,
                    tx_sequence_numbers: vec![4, 5, 6],
                    ..Default::default()
                },
            ],
            vec![
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 1,
                        tx_hi: 3,
                        timestamp_ms_hi_inclusive: 1000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 2,
                        tx_hi: 6,
                        timestamp_ms_hi_inclusive: 2000,
                    },
                    batch_rows: 1,
                    total_rows: 1,
                },
            ],
        );

        let batch2 = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 3,
                tx_sequence_numbers: vec![7, 8, 9],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 3,
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: 3000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch1).await.unwrap();
        setup.batch_tx.send(batch2).await.unwrap();

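        // Both batches should be committed and their watermark parts forwarded.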
        let watermark1 = setup.watermark_rx.recv().await.unwrap();
        let watermark2 = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark1.len(), 2);
        assert_eq!(watermark2.len(), 1);

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.len(), 3);
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
        }
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_commit_failure() {
        let mut setup = setup_test(MockStore::default()).await;

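        // A batch whose first commit attempt fails, with a 1s delay on every attempt.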
        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
                commit_delay_ms: 1_000,
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

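        // Wait long enough for the first (failing) commit attempt to complete.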
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

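        // Wait for the backoff to retry the commit, this time successfully.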
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_connection_failure() {
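        // A store whose first connection attempt fails, and whose connections take 1s to
        // establish.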
        let store = MockStore {
            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
                connection_failure_attempts: 1,
                connection_delay_ms: 1_000,
                ..Default::default()
            })),
            ..Default::default()
        };
        let mut setup = setup_test(store).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Wait long enough for the failed connection attempt to complete.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retried connection and the commit to complete.
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_empty_batch_handling() {
        let mut setup = setup_test(MockStore::default()).await;

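        // A batch with no rows that still carries a watermark part.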
        let empty_batch = BatchedRows::from_vec(
            vec![],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 0,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 0,
                total_rows: 0,
            }],
        );

        setup.batch_tx.send(empty_batch).await.unwrap();

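        // The empty batch skips the write but still flows through to the watermark.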
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
        assert_eq!(watermark[0].batch_rows, 0);
        assert_eq!(watermark[0].total_rows, 0);

        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "No data should be committed for empty batch"
            );
        }
    }

    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(MockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        setup.batch_tx.send(batch).await.unwrap();

        // Give the committer time to write the batch.
        tokio::time::sleep(Duration::from_millis(100)).await;

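        // Close the watermark channel; the committer should treat this as a shutdown signal.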
        drop(setup.watermark_rx);

        tokio::time::sleep(Duration::from_millis(200)).await;

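        // The batch was still written, even though its watermark could not be forwarded.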
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

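        // Close the batch channel and wait for the committer to wind down cleanly.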
        drop(setup.batch_tx);

        setup.committer.shutdown().await.unwrap();
    }
}