use std::{collections::BTreeMap, sync::Arc};

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use tokio::{
    sync::Semaphore,
    task::JoinHandle,
    time::{MissedTickBehavior, interval},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
    metrics::IndexerMetrics,
    pipeline::logging::{LoggerWatermark, WatermarkLogger},
    store::{Connection, Store},
};

use super::{Handler, PrunerConfig};
#[derive(Default)]
struct PendingRanges {
    /// Disjoint, half-open ranges `[from, to_exclusive)` that still need to be
    /// pruned, keyed by their (inclusive) lower bound.
    ranges: BTreeMap<u64, u64>,
    /// The range most recently passed to `schedule`, used to avoid scheduling
    /// the same checkpoints twice.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule the half-open range `[from, to_exclusive)` for pruning. The
    /// range is trimmed against the last scheduled range so that no checkpoint
    /// is scheduled twice; a range that falls entirely at or below the last
    /// scheduled upper bound is ignored.
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let last_scheduled_range = self.last_scheduled_range.unwrap_or((0, 0));
        if to_exclusive <= last_scheduled_range.1 {
            return;
        }
        from = from.max(last_scheduled_range.1);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    fn len(&self) -> usize {
        self.ranges.len()
    }

    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    /// Remove a range (identified by its lower bound) once it has been pruned.
    /// Panics if the range was never scheduled.
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// The watermark to report back to the store: the lower bound of the first
    /// still-pending range, or, if nothing is pending, the upper bound of the
    /// last scheduled range (zero if no range was ever scheduled).
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().cloned().unwrap_or(
            self.last_scheduled_range
                .map(|(_, t)| t)
                .unwrap_or_default(),
        )
    }
}
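
// Illustrative walkthrough (mirrors the unit tests below): after `schedule(1, 5)`
// and `schedule(3, 7)`, the pending set is {[1, 5), [5, 7)}; the second range is
// trimmed to start at 5 so nothing is pruned twice. `get_pruner_hi()` then
// reports 1; removing [1, 5) moves it to 5, and removing [5, 7) moves it to 7,
// the upper bound of the last scheduled range.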

/// The pruner task deletes data that has fallen out of the pipeline's
/// retention window. On every tick it reads the pruner watermark from the
/// store, waits out the configured delay if necessary, carves the prunable
/// region into chunks, prunes the chunks concurrently, and writes the highest
/// contiguously pruned checkpoint back to the store as the new watermark. The
/// task exits when `cancel` fires, and is skipped entirely if `config` is
/// `None`.
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return;
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut logger = WatermarkLogger::new("pruner");

        // Ranges that have been scheduled for pruning but not yet pruned
        // successfully. Failed ranges stay in this set and are retried on a
        // later tick.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            let mut watermark = tokio::select! {
                // Stop immediately if shutdown is signalled.
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    let guard = metrics
                        .watermark_pruner_read_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(pipeline = H::NAME, "Pruner failed to connect while fetching watermark");
                        continue;
                    };

                    match conn.pruner_watermark(H::NAME, config.delay()).await {
                        Ok(Some(current)) => {
                            guard.stop_and_record();
                            current
                        }

                        Ok(None) => {
                            guard.stop_and_record();
                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                            continue;
                        }

                        Err(e) => {
                            guard.stop_and_record();
                            warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                            continue;
                        }
                    }
                }
            };

            // The watermark may come with a delay before its upper bound is
            // safe to prune; wait it out, unless we are shut down first.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::select! {
                    _ = tokio::time::sleep(wait_for) => {}
                    _ = cancel.cancelled() => {
                        info!(pipeline = H::NAME, "Shutdown received");
                        break;
                    }
                }
            }

            // `highest_pruned` tracks the highest checkpoint pruned so far in
            // this process, while `highest_watermarked` tracks the highest
            // watermark committed back to the store.
            let mut highest_pruned = watermark.pruner_hi;
            let mut highest_watermarked = watermark.pruner_hi;

            // Carve the newly prunable region into chunks and merge them into
            // the pending set, deduplicated against already-scheduled work.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // Spawn a task per pending range, bounding how many prunes run at
            // once with a semaphore.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let cancel = cancel.child_token();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    let _permit = tokio::select! {
                        permit = semaphore.acquire() => {
                            permit.unwrap()
                        }
                        _ = cancel.cancelled() => {
                            return ((from, to_exclusive), Err(anyhow::anyhow!("Cancelled")));
                        }
                    };
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // Collect results as tasks finish. A successfully pruned range is
            // removed from the pending set and the local watermark advances to
            // the new lowest pending bound; a failed range stays in the set
            // and is retried on a later tick.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                // If the local watermark advanced, write it back to the store
                // and update the metrics.
                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        Ok(false) => {}
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping pruner");
    })
}

/// Prune a single chunk `[from, to_exclusive)` for the pipeline, recording
/// metrics for the attempt, the delete latency, and the number of rows
/// affected.
async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
    metrics: Arc<IndexerMetrics>,
    db: H::Store,
    handler: Arc<H>,
    from: u64,
    to_exclusive: u64,
) -> Result<(), anyhow::Error> {
    metrics
        .total_pruner_chunks_attempted
        .with_label_values(&[H::NAME])
        .inc();

    let guard = metrics
        .pruner_delete_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    let mut conn = db.connect().await?;

    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
        Ok(affected) => {
            guard.stop_and_record();
            affected
        }

        Err(e) => {
            guard.stop_and_record();
            return Err(e);
        }
    };

    metrics
        .total_pruner_chunks_deleted
        .with_label_values(&[H::NAME])
        .inc();

    metrics
        .total_pruner_rows_deleted
        .with_label_values(&[H::NAME])
        .inc_by(affected as u64);

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{
        collections::HashMap,
        time::{SystemTime, UNIX_EPOCH},
    };

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    use crate::{FieldCount, metrics::IndexerMetrics, mocks::store::*, pipeline::Processor};

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        async fn commit<'a>(
            values: &[Self::Value],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(values.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        // Overlaps the previous range, so it is trimmed to start at 5.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // Entirely covered by already-scheduled work, so it is ignored.
        ranges.schedule(2, 4);
        assert_eq!(ranges.len(), 2);
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        // Scheduling the exact same range again is a no-op.
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }
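
    // An extra check, not part of the original suite: a freshly created
    // `PendingRanges` should report a watermark of zero, since no range was
    // ever scheduled.
    #[test]
    fn test_pending_ranges_empty_watermark() {
        let ranges = PendingRanges::default();
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 0);
    }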

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10);

        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Removing a later range first must not advance the watermark past the
        // still-pending range (1, 5).
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }
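
    // A sketch of the stale-range path, exercising the early return in
    // `schedule`: a range whose upper bound does not extend past the last
    // scheduled range is dropped entirely.
    #[test]
    fn test_pending_ranges_stale_range_ignored() {
        let mut ranges = PendingRanges::default();
        ranges.schedule(10, 20);

        // Ends at the last scheduled upper bound, so there is nothing new to
        // prune.
        ranges.schedule(1, 20);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(10, 20)]);
    }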

    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // Within the configured delay, nothing should have been pruned yet.
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Once the delay has passed, data below the retention window should
        // have been pruned and the watermark advanced.
        tokio::time::sleep(Duration::from_millis(2000)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }

        cancel.cancel();
        let _ = pruner_handle.await;
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            // A pruner timestamp far enough in the past that the configured
            // delay has already elapsed.
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // Despite the 20s delay, pruning should proceed right away because the
        // stored pruner timestamp is in the past.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }

        cancel.cancel();
        let _ = pruner_handle.await;
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            // A long interval, so the retry of the failed range only happens
            // on the second tick, after the first round has been checked.
            interval_ms: 3_000,
            delay_ms: 100,
            retention: 1,
            // One checkpoint per chunk, so a single failing chunk does not
            // prevent the other chunks from being pruned.
            max_chunk_size: 1,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,
            pruner_timestamp: 0,
            pruner_hi: 1,
        };

        // Fail pruning of the range [1, 2) once before letting it succeed.
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // After the first tick: ranges [2, 3) and [3, 4) succeed, but [1, 2)
        // fails, so the watermark cannot advance past it.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // After the second tick, the failed range is retried and succeeds, so
        // the watermark can advance past it.
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        cancel.cancel();
        let _ = pruner_handle.await;
    }
}