1use std::{collections::BTreeMap, sync::Arc};
5
6use futures::StreamExt;
7use futures::stream::FuturesUnordered;
8use sui_futures::service::Service;
9use tokio::{
10 sync::Semaphore,
11 time::{MissedTickBehavior, interval},
12};
13use tracing::{debug, error, info, warn};
14
15use crate::{
16 metrics::IndexerMetrics,
17 pipeline::logging::{LoggerWatermark, WatermarkLogger},
18 store::{Connection, Store},
19};
20
21use super::{Handler, PrunerConfig};
22
/// Book-keeping for prune work that has been scheduled but not yet completed.
///
/// Ranges are half-open `[from, to_exclusive)` checkpoint intervals, kept in a
/// `BTreeMap` keyed by their (clamped) start so the lowest outstanding range is
/// always cheap to find.
#[derive(Default)]
struct PendingRanges {
    // Outstanding ranges: start (inclusive) -> end (exclusive).
    ranges: BTreeMap<u64, u64>,
    // The most recently scheduled range, used to deduplicate re-scheduling of
    // work that was already handed out on a previous tick.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedules `[from, to_exclusive)` for pruning, clamping it against the
    /// most recently scheduled range so no checkpoint is ever scheduled twice.
    /// Ranges entirely covered by earlier scheduling are dropped.
    fn schedule(&mut self, from: u64, to_exclusive: u64) {
        let (_, last_hi) = self.last_scheduled_range.unwrap_or_default();

        // Nothing new to schedule: the whole range was already handed out.
        if to_exclusive <= last_hi {
            return;
        }

        // Clamp the start so only the not-yet-scheduled suffix is added.
        let start = from.max(last_hi);
        self.ranges.insert(start, to_exclusive);
        self.last_scheduled_range = Some((start, to_exclusive));
    }

    /// Number of ranges still outstanding.
    fn len(&self) -> usize {
        self.ranges.len()
    }

    /// Iterates outstanding ranges as `(from, to_exclusive)` pairs, in
    /// ascending order of `from`.
    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges.iter().map(|(&lo, &hi)| (lo, hi))
    }

    /// Marks the range starting at `from` as completed. Panics if no such
    /// range is outstanding — callers only remove ranges they scheduled.
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// The highest checkpoint (exclusive) that is safe to record as pruned:
    /// the start of the lowest outstanding range, or — when nothing is
    /// outstanding — the end of the last scheduled range (0 if none).
    fn get_pruner_hi(&self) -> u64 {
        match self.ranges.keys().next() {
            Some(&lowest_pending) => lowest_pending,
            None => self.last_scheduled_range.map_or(0, |(_, hi)| hi),
        }
    }
}
79
/// Spawns the pruner task for a pipeline and returns its [`Service`] handle.
///
/// If `config` is `None`, pruning is disabled and the task exits immediately.
/// Otherwise, on every tick of the configured interval the task:
///
/// 1. Reads the pruner watermark from the store (subject to the configured
///    delay), sleeping first if the watermark reports it is too early to
///    prune. Any read/connection failure skips the tick rather than aborting.
/// 2. Carves the prunable region into chunks of at most `max_chunk_size` and
///    schedules them into `pending_prune_ranges`, which deduplicates against
///    ranges scheduled on earlier ticks.
/// 3. Prunes all pending ranges concurrently, bounded by `prune_concurrency`.
///    A failed range stays in the pending set and is retried on a later tick.
/// 4. Persists `pruner_hi` whenever it advances. The watermark only moves up
///    to the start of the lowest still-pending range, so a failed chunk holds
///    the watermark back until it eventually succeeds — preventing data from
///    being considered pruned before it actually is.
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        // Pruning is optional: no config means nothing to do.
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // Delay (rather than burst) when a tick is missed so prune passes
        // stay spaced out even after a slow iteration.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut logger = WatermarkLogger::new("pruner");

        // Ranges scheduled but not yet successfully pruned. Kept across ticks
        // so failed chunks are retried and nothing is pruned twice.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // Fetch the current pruner watermark, timing the read. Any
            // failure skips this tick; the next tick will retry.
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect, while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // Respect the configured delay before data becomes prunable.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // `highest_pruned` is the in-memory watermark for this iteration;
            // `highest_watermarked` is what has been persisted to the store.
            let mut highest_pruned = watermark.pruner_hi;
            let mut highest_watermarked = watermark.pruner_hi;

            // Carve the newly prunable region into bounded chunks; `schedule`
            // deduplicates against ranges scheduled on previous ticks.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // One task per pending range, throttled by a semaphore so at most
            // `prune_concurrency` prunes touch the store at once.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // Drain task results as they complete, in any order.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        // Success: retire the range, then advance the
                        // watermark only up to the lowest range still pending.
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        // Failure: leave the range pending so it is retried on
                        // a later tick; the watermark cannot advance past it.
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                // Persist the watermark only when it actually advanced.
                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // A failed connect here only delays the write; the next
                    // advance (or next tick) will retry it.
                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        // NOTE(review): `Ok(false)` appears to mean the store
                        // chose not to update the row — confirm against the
                        // `Connection::set_pruner_watermark` contract.
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}
277
278async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
279 metrics: Arc<IndexerMetrics>,
280 db: H::Store,
281 handler: Arc<H>,
282 from: u64,
283 to_exclusive: u64,
284) -> Result<(), anyhow::Error> {
285 metrics
286 .total_pruner_chunks_attempted
287 .with_label_values(&[H::NAME])
288 .inc();
289
290 let guard = metrics
291 .pruner_delete_latency
292 .with_label_values(&[H::NAME])
293 .start_timer();
294
295 let mut conn = db.connect().await?;
296
297 debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");
298
299 let affected = match handler.prune(from, to_exclusive, &mut conn).await {
300 Ok(affected) => {
301 guard.stop_and_record();
302 affected
303 }
304
305 Err(e) => {
306 guard.stop_and_record();
307 return Err(e);
308 }
309 };
310
311 metrics
312 .total_pruner_chunks_deleted
313 .with_label_values(&[H::NAME])
314 .inc();
315
316 metrics
317 .total_pruner_rows_deleted
318 .with_label_values(&[H::NAME])
319 .inc_by(affected as u64);
320
321 Ok(())
322}
323
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{
        collections::HashMap,
        time::{SystemTime, UNIX_EPOCH},
    };

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{Processor, concurrent::BatchStatus},
    };

    use super::*;

    // Minimal value type; these tests never commit real rows.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    // A no-op pipeline whose `prune` delegates to the mock store, so tests
    // can observe which checkpoints were deleted.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    // Disjoint ranges are scheduled as-is, in key order.
    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        ranges.schedule(10, 15);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    // A range overlapping previously scheduled work is clamped so no
    // checkpoint is ever scheduled (and hence pruned) twice.
    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        // Overlapping start is clamped to the previous range's end.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // A range entirely covered by earlier scheduling is ignored.
        ranges.schedule(2, 4);
        assert_eq!(ranges.len(), 2);
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);
    }

    // Re-scheduling the exact same range is a no-op.
    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    // Adjacent (touching but non-overlapping) ranges stay separate.
    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    // Completing ranges in order advances the watermark to the start of the
    // lowest still-pending range; once empty, to the last scheduled end.
    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10);

        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    // Completing ranges out of order must NOT advance the watermark past a
    // still-pending lower range.
    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Removing a middle range leaves the watermark held by range [1, 5).
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    // End-to-end: data older than the retention horizon is pruned only after
    // the configured delay has elapsed.
    // NOTE(review): relies on wall-clock sleeps, so it can be flaky on a
    // heavily loaded machine.
    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // pruner_timestamp == now, so the 2s delay applies from this moment.
        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Within the delay window: nothing should have been pruned yet.
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Past the delay: checkpoints below reader_lo should be gone.
        tokio::time::sleep(Duration::from_millis(2000)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    // With pruner_timestamp far in the past, even a large delay has already
    // elapsed, so pruning proceeds immediately.
    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // pruner_timestamp == 0 (epoch), so the 20s delay is already over.
        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    // A failing chunk must hold the watermark back until its retry succeeds
    // on a later tick, even while higher chunks prune successfully.
    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 3_000,
            delay_ms: 100,
            retention: 1,
            max_chunk_size: 1,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,
            pruner_timestamp: 0,
            pruner_hi: 1,
        };

        // Inject one failure for range [1, 2) so its first prune attempt
        // errors and it must be retried on the next tick.
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // After the first tick: [1,2) failed, [2,3) and [3,4) succeeded.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // After the second tick: the retry of [1,2) succeeds and the
        // watermark catches up.
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}