use std::collections::BTreeMap;
use std::sync::Arc;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use sui_futures::service::Service;
use tokio::sync::Semaphore;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::IndexerMetrics;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::concurrent::PrunerConfig;
use crate::pipeline::logging::LoggerWatermark;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::Connection;
use crate::store::Store;

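/// The set of checkpoint ranges that have been scheduled for pruning but not yet successfully
/// pruned. It is used to make sure no range is pruned twice, and to work out how far the pruner
/// watermark can safely advance.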
#[derive(Default)]
struct PendingRanges {
    /// Scheduled but not-yet-pruned ranges, keyed by their inclusive start checkpoint, mapping to
    /// their exclusive end checkpoint.
    ranges: BTreeMap<u64, u64>,
    /// The range most recently passed to `schedule`, used to trim overlap from new ranges.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
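    /// Schedules the half-open range `[from, to_exclusive)` for pruning. Overlap with previously
    /// scheduled work is trimmed away so that no checkpoint is pruned twice. Ranges are expected
    /// to arrive with non-decreasing upper bounds; a range that ends at or below the end of the
    /// last scheduled range is ignored entirely.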
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let last_scheduled_range = self.last_scheduled_range.unwrap_or((0, 0));

        // The new range is entirely covered by ranges that have already been scheduled.
        if to_exclusive <= last_scheduled_range.1 {
            return;
        }

        // Trim the front of the new range so it starts where the last scheduled range ended.
        from = from.max(last_scheduled_range.1);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    fn len(&self) -> usize {
        self.ranges.len()
    }

    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    fn remove(&mut self, from: &u64) {
        // The range must have been scheduled before it can be removed.
        self.ranges.remove(from).unwrap();
    }

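    /// The checkpoint the pruner watermark can safely advance to: the start of the earliest range
    /// that is still pending, or the end of the last scheduled range if everything scheduled so
    /// far has been pruned.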
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().cloned().unwrap_or(
            self.last_scheduled_range
                .map(|(_, t)| t)
                .unwrap_or_default(),
        )
    }
}

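/// The pruner task is responsible for deleting old data from the store. On each tick it fetches
/// the pruner watermark for this pipeline, splits the prunable region into chunks of at most
/// `config.max_chunk_size` checkpoints, and prunes them concurrently (at most
/// `config.prune_concurrency` chunks at a time) through the handler's `prune` implementation. The
/// watermark in the store only advances past ranges that pruned successfully; failed ranges stay
/// scheduled and are retried on a later tick.
///
/// The task exits immediately if no pruner config is provided.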
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // If a tick is missed because pruning took longer than the interval, wait for the next
        // full interval rather than trying to catch up.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut logger = WatermarkLogger::new("pruner");

        // Ranges that have been scheduled for pruning but have not been pruned yet. Failed
        // ranges remain here and are retried on subsequent ticks.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // Fetch the latest pruner watermark for this pipeline.
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // Hold back pruning until the configured delay since the reader watermark last moved
            // has elapsed, so in-flight reads are not disturbed.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // The highest checkpoint that has been pruned so far, and the highest value that has
            // been written back to the store's watermark, respectively.
            let mut highest_pruned = watermark.pruner_hi;
            let mut highest_watermarked = watermark.pruner_hi;

            // Schedule chunks of at most `max_chunk_size` checkpoints; overlap with ranges still
            // pending from previous iterations is trimmed away.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // Prune the pending ranges concurrently, limiting parallelism with a semaphore.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // Wait for the prune tasks to finish. Successful ranges are removed from the pending
            // set and the in-memory watermark advances to the start of the earliest range that is
            // still pending; failed ranges stay scheduled and are retried on the next tick. Each
            // time the watermark advances, write it back to the store.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}

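/// Prunes the half-open range `[from, to_exclusive)` through the handler's `prune`
/// implementation, recording metrics for attempted chunks, deleted chunks, deleted rows, and
/// deletion latency.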
async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
    metrics: Arc<IndexerMetrics>,
    db: H::Store,
    handler: Arc<H>,
    from: u64,
    to_exclusive: u64,
) -> Result<(), anyhow::Error> {
    metrics
        .total_pruner_chunks_attempted
        .with_label_values(&[H::NAME])
        .inc();

    let guard = metrics
        .pruner_delete_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    let mut conn = db.connect().await?;

    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
        Ok(affected) => {
            guard.stop_and_record();
            affected
        }

        Err(e) => {
            guard.stop_and_record();
            return Err(e);
        }
    };

    metrics
        .total_pruner_chunks_deleted
        .with_label_values(&[H::NAME])
        .inc();

    metrics
        .total_pruner_rows_deleted
        .with_label_values(&[H::NAME])
        .inc_by(affected as u64);

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);

        // Overlaps the tail of the previous range, so it is trimmed to start at 5.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // A range entirely covered by already-scheduled work is ignored.
        ranges.schedule(2, 4);
        assert_eq!(ranges.len(), 2);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        // Scheduling the exact same range again is a no-op.
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        // Adjacent ranges share a boundary but do not overlap, so both are kept whole.
        ranges.schedule(1, 5);
        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10);

        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Removing a middle range does not advance the watermark past the earliest pending range.
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25);
    }

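    // A minimal extra check (added as an illustration, not part of the original suite):
    // `schedule` assumes ranges arrive with non-decreasing upper bounds, so a range that ends
    // below the end of the last scheduled range is dropped entirely.
    #[test]
    fn test_pending_ranges_out_of_order_schedule_ignored() {
        let mut ranges = PendingRanges::default();

        ranges.schedule(10, 15);
        ranges.schedule(1, 5);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(10, 15)]);
    }
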
    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Checkpoints 1, 2, and 3, each with some rows.
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Before the configured delay has elapsed, nothing should be pruned.
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // After the delay, checkpoints below `reader_lo` should be pruned.
        tokio::time::sleep(Duration::from_millis(2000)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // `pruner_timestamp` is far enough in the past that the configured delay has already
        // elapsed, so pruning should proceed immediately despite the large `delay_ms`.
        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            // Long interval so the retry happens on a clearly separate tick.
            interval_ms: 3_000,
            delay_ms: 100,
            retention: 1,
            // One checkpoint per chunk, so each checkpoint is pruned by its own task.
            max_chunk_size: 1,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,
            pruner_timestamp: 0,
            pruner_hi: 1,
        };

        // Fail pruning of the range [1, 2) once, so it only succeeds on the second tick.
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // First tick: ranges [2, 3) and [3, 4) succeed, but [1, 2) fails, pinning the watermark.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Second tick: the failed range is retried and succeeds, so the watermark advances.
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}