1use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use futures::stream::FuturesUnordered;
9use sui_futures::service::Service;
10use tokio::sync::Semaphore;
11use tokio::time::MissedTickBehavior;
12use tokio::time::interval;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::concurrent::Handler;
20use crate::pipeline::concurrent::PrunerConfig;
21use crate::pipeline::logging::LoggerWatermark;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::ConcurrentConnection;
24use crate::store::Store;
25
/// Bookkeeping for checkpoint ranges that have been handed to the pruner but have not yet been
/// removed (i.e. confirmed as pruned).
#[derive(Default)]
struct PendingRanges {
    /// Outstanding ranges, keyed by their inclusive start and mapping to their exclusive end.
    ranges: BTreeMap<u64, u64>,
    /// The most recent range accepted by `schedule`. Its end acts as a high-water mark so that
    /// checkpoints are never scheduled twice, even after their range has been removed.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Queues `[from, to_exclusive)` for pruning.
    ///
    /// A range that ends at or before the end of the last scheduled range is dropped, because
    /// everything in it was already scheduled. A range that only partially overlaps is trimmed so
    /// that it begins where the last scheduled range ended.
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let scheduled_up_to = self.last_scheduled_range.map_or(0, |(_, end)| end);

        if to_exclusive > scheduled_up_to {
            if from < scheduled_up_to {
                from = scheduled_up_to;
            }

            self.ranges.insert(from, to_exclusive);
            self.last_scheduled_range = Some((from, to_exclusive));
        }
    }

    /// Number of ranges still waiting to be pruned.
    fn len(&self) -> usize {
        self.ranges.len()
    }

    /// Pending `(from, to_exclusive)` pairs, in ascending order of `from`.
    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges.iter().map(|(&start, &end)| (start, end))
    }

    /// Drops the pending range that starts at `from`.
    ///
    /// # Panics
    ///
    /// Panics if no pending range starts at `from`.
    fn remove(&mut self, from: &u64) {
        let removed = self.ranges.remove(from);
        removed.unwrap();
    }

    /// The checkpoint below which everything scheduled so far has been removed: the start of the
    /// lowest pending range, or the end of the last scheduled range when nothing is pending (`0`
    /// if nothing was ever scheduled).
    fn get_pruner_hi(&self) -> u64 {
        match self.ranges.keys().next() {
            Some(&lowest_pending) => lowest_pending,
            None => self.last_scheduled_range.map_or(0, |(_, end)| end),
        }
    }
}
82
/// Builds the service that runs the pruner for pipeline `H`.
///
/// If `config` is `None` the task logs that pruning is skipped and returns immediately.
/// Otherwise it loops forever, once per `config.interval()`:
///
/// 1. Reads the pipeline's pruner watermark from `store`.
/// 2. Sleeps for however long the watermark says pruning must still wait.
/// 3. Splits the prunable range into chunks of at most `config.max_chunk_size` and records them
///    as pending.
/// 4. Spawns one task per pending chunk, with at most `config.prune_concurrency` holding a
///    permit at a time, each calling `handler.prune` through `prune_task_impl`.
/// 5. As chunks finish, removes the successful ones from the pending set and writes the advanced
///    `pruner_hi` back to the store. Failed chunks stay pending and are retried on a later tick.
///
/// Failures to connect or to read/write the watermark are logged and do not stop the task.
///
/// # Panics
///
/// The spawned task panics if one of the per-chunk tasks fails to join (the `unwrap` on the
/// join result).
pub(super) fn pruner<H: Handler>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // NOTE(review): `spawn_aborting` presumably aborts this task when the service shuts down --
    // confirm against `sui_futures::service::Service`.
    Service::new().spawn_aborting(async move {
        // No pruner configuration means pruning is disabled for this pipeline.
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        let mut poll = interval(config.interval());
        // After a late tick, wait a full interval before the next one rather than firing a burst
        // of ticks to catch up.
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut logger = WatermarkLogger::new("pruner");

        // Lives across iterations so chunks that failed to prune are retried on the next tick.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // (1) Fetch the current pruning bounds. Any failure skips this tick (the `continue`s
            // below target the outer `loop`).
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect, while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // (2) Hold off until the watermark reports that pruning may proceed.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // Highest `pruner_hi` reached so far in this iteration; advances as chunks complete.
            let mut highest_pruned = watermark.pruner_hi;
            // The `pruner_hi` last known to be persisted in the store.
            let mut highest_watermarked = watermark.pruner_hi;

            // (3) Queue every new chunk. `schedule` drops or trims anything already scheduled in
            // an earlier iteration.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // (4) Spawn one task per pending chunk; the semaphore bounds how many hold a permit
            // (and so prune) at the same time.
            // NOTE(review): a `prune_concurrency` of 0 would create a semaphore with no permits,
            // so no task could ever acquire one -- confirm config validation rules this out.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    // The semaphore is never closed here, so `acquire` is not expected to fail.
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // (5) Handle chunks in completion order. A successful chunk leaves the pending set;
            // a failed one stays there to be retried on a later tick.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        // The new bound is the start of the lowest range still pending, so it
                        // never moves past a chunk that has not been pruned yet.
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                // Persist progress as soon as the bound advances, instead of waiting for every
                // chunk in this iteration to finish.
                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // This `continue` targets the inner `while`: move on to the next finished
                    // chunk and try the write again once it completes.
                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        // NOTE(review): presumably the store reported that it did not apply the
                        // update; `highest_watermarked` is left as-is so the write is attempted
                        // again after the next chunk -- confirm against the store trait.
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}
280
281async fn prune_task_impl<H: Handler>(
282 metrics: Arc<IndexerMetrics>,
283 db: H::Store,
284 handler: Arc<H>,
285 from: u64,
286 to_exclusive: u64,
287) -> Result<(), anyhow::Error> {
288 metrics
289 .total_pruner_chunks_attempted
290 .with_label_values(&[H::NAME])
291 .inc();
292
293 let guard = metrics
294 .pruner_delete_latency
295 .with_label_values(&[H::NAME])
296 .start_timer();
297
298 let mut conn = db.connect().await?;
299
300 debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");
301
302 let affected = match handler.prune(from, to_exclusive, &mut conn).await {
303 Ok(affected) => {
304 guard.stop_and_record();
305 affected
306 }
307
308 Err(e) => {
309 guard.stop_and_record();
310 return Err(e);
311 }
312 };
313
314 metrics
315 .total_pruner_chunks_deleted
316 .with_label_values(&[H::NAME])
317 .inc();
318
319 metrics
320 .total_pruner_rows_deleted
321 .with_label_values(&[H::NAME])
322 .inc_by(affected as u64);
323
324 Ok(())
325}
326
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_indexer_alt_framework_store_traits::testing::mock_store::MockWatermark;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Placeholder row type; these tests never produce any values.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose `prune` forwards to the mock store, so the tests can observe which
    /// checkpoints were deleted.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    /// Disjoint ranges are both kept, in ascending order.
    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        ranges.schedule(10, 15);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    /// Overlapping ranges are trimmed, and fully covered ranges are ignored, so no checkpoint is
    /// scheduled twice.
    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        // Overlaps [1, 5): only the new part, [5, 7), is scheduled.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // Entirely covered by what was already scheduled: ignored.
        ranges.schedule(2, 4);
        assert_eq!(ranges.len(), 2);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);
    }

    /// Scheduling the same range twice leaves a single entry.
    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    /// A range that starts exactly where the previous one ended is kept as-is.
    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    /// Removing ranges in order moves the watermark to the start of the next pending range, and
    /// to the end of the last scheduled range once nothing is pending.
    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10);

        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    /// The watermark is held back by the lowest pending range, even when later ranges are removed
    /// first.
    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // [1, 5) is still pending, so the watermark cannot move yet.
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Both lower ranges are gone now: the watermark jumps to the remaining range.
        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    /// With a fresh pruner timestamp, nothing is pruned until the configured delay has elapsed.
    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // The watermark is stamped "now", so the 2s delay is measured from the start of the test.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(3),
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
            chain_id: None,
        };
        let store = FallibleMockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        // Keep the service handle alive for the duration of the test.
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Well inside the 2s delay: every checkpoint must still be present.
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Now past the delay (2.2s after start): pruning should have happened.
        tokio::time::sleep(Duration::from_millis(2000)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    /// With a pruner timestamp far in the past, pruning starts without waiting out the (long)
    /// configured delay.
    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(3),
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            // Epoch zero: the 20s delay has long since elapsed relative to this timestamp.
            pruner_timestamp: 0,
            pruner_hi: 0,
            chain_id: None,
        };
        let store = FallibleMockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        // Keep the service handle alive for the duration of the test.
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Far shorter than the 20s delay, yet pruning must already have happened.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    /// A chunk that fails to prune holds the watermark back until it is retried successfully on
    /// the next tick.
    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            // Long interval, so the two assertion phases below fall on either side of the retry.
            interval_ms: 3_000,
            delay_ms: 100,
            retention: 1,
            // One checkpoint per chunk: [1, 2), [2, 3), [3, 4).
            max_chunk_size: 1,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(4),
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,
            pruner_timestamp: 0,
            pruner_hi: 1,
            chain_id: None,
        };

        // NOTE(review): `with_prune_failures(1, 2, 1)` presumably makes pruning of range [1, 2)
        // fail once -- confirm the argument order against the mock store.
        let store = FallibleMockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data)
            .with_prune_failures(1, 2, 1);

        let store_clone = store.clone();
        // Keep the service handle alive for the duration of the test.
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // After the first tick: [1, 2) failed, so the watermark cannot move past 1 even though
        // the later chunks were pruned.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // After the second tick (3s interval): the failed chunk has been retried successfully.
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}