sui_indexer_alt_framework/pipeline/concurrent/
pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use futures::stream::FuturesUnordered;
9use sui_futures::service::Service;
10use tokio::sync::Semaphore;
11use tokio::time::MissedTickBehavior;
12use tokio::time::interval;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::concurrent::Handler;
20use crate::pipeline::concurrent::PrunerConfig;
21use crate::pipeline::logging::LoggerWatermark;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::ConcurrentConnection;
24use crate::store::Store;
25
/// Book-keeping for checkpoint ranges that are ready to be pruned but have not yet been pruned
/// successfully.
///
/// Ranges are half-open (`[from, to_exclusive)`) and keyed by their start.
#[derive(Default)]
struct PendingRanges {
    /// Maps from `from` to `to_exclusive` for all the ranges that are ready to be pruned.
    ranges: BTreeMap<u64, u64>,
    /// The last range that has been scheduled for pruning.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule a new range to be pruned.
    ///
    /// The last scheduled range is used to avoid double pruning of the same range: any part of
    /// the new range that lies before the end of the last scheduled range is dropped. This is
    /// important because double pruning will not always work, since pruning may not be
    /// idempotent for some pipelines. For instance, if a handler holds processed data needed for
    /// pruning, the pruning step may remove that data once done.
    ///
    /// Empty or inverted ranges (`from >= to_exclusive`) are ignored: there is nothing to prune
    /// in them, and recording one as the last scheduled range would skew later de-duplication
    /// and the value reported by [`Self::get_pruner_hi`].
    fn schedule(&mut self, from: u64, to_exclusive: u64) {
        if from >= to_exclusive {
            return;
        }

        let last_scheduled_end = self.last_scheduled_range.map_or(0, |(_, end)| end);

        // If the new range ends at or before the end of the last scheduled range, the entire new
        // range was already scheduled before.
        if to_exclusive <= last_scheduled_end {
            return;
        }

        // Otherwise, make sure the new range starts after the last scheduled range. Because
        // `to_exclusive > last_scheduled_end` and `from < to_exclusive`, the clamped range is
        // still non-empty.
        let from = from.max(last_scheduled_end);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    /// Number of ranges that are still waiting to be pruned.
    fn len(&self) -> usize {
        self.ranges.len()
    }

    /// Iterate over the pending `(from, to_exclusive)` ranges in ascending order of `from`.
    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    /// Remove the range starting at `from` from the pending ranges.
    ///
    /// # Panics
    ///
    /// Panics if no pending range starts at `from`. Callers are expected to only remove ranges
    /// they obtained from [`Self::iter`], so a miss indicates a bookkeeping bug.
    fn remove(&mut self, from: &u64) {
        self.ranges
            .remove(from)
            .expect("attempted to remove a range that is not pending");
    }

    /// Returns the current pruner_hi watermark, i.e. the first checkpoint that has not yet been
    /// pruned.
    ///
    /// This is the first key in the pending ranges map. If the map is empty, it is the end of the
    /// last range that was scheduled for pruning.
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().copied().unwrap_or_else(|| {
            // `get_pruner_hi` will generally not be called until something has been scheduled,
            // but return 0 just in case it is called earlier.
            self.last_scheduled_range.map_or(0, |(_, end)| end)
        })
    }
}
82
83/// The pruner task is responsible for deleting old data from the database. It will periodically
84/// check the `watermarks` table to see if there is any data that should be pruned between the
85/// `pruner_hi` (inclusive), and `reader_lo` (exclusive) checkpoints. This task will also provide a
86/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
87/// then use to delete the corresponding data from the database.
88///
89/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
90/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
91/// The task will not prune data until at least `config.delay()` has passed since `pruner_timestamp`
92/// to give in-flight reads time to land.
93///
94/// The task regularly traces its progress, outputting at a higher log level every
95/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
96///
97/// If the `config` is `None`, the task will shutdown immediately.
98pub(super) fn pruner<H: Handler>(
99    handler: Arc<H>,
100    config: Option<PrunerConfig>,
101    store: H::Store,
102    metrics: Arc<IndexerMetrics>,
103) -> Service {
104    Service::new().spawn_aborting(async move {
105        let Some(config) = config else {
106            info!(pipeline = H::NAME, "Skipping pruner task");
107            return Ok(());
108        };
109
110        info!(
111            pipeline = H::NAME,
112            "Starting pruner with config: {:?}", config
113        );
114
115        // The pruner can pause for a while, waiting for the delay imposed by the
116        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
117        // compressed to make up for missed ticks.
118        let mut poll = interval(config.interval());
119        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
120
121        // The pruner task will periodically output a log message at a higher log level to
122        // demonstrate that it is making progress.
123        let mut logger = WatermarkLogger::new("pruner");
124
125        // Maintains the list of chunks that are ready to be pruned but not yet pruned.
126        // This map can contain ranges that were attempted to be pruned in previous iterations,
127        // but failed due to errors.
128        let mut pending_prune_ranges = PendingRanges::default();
129
130        loop {
131            poll.tick().await;
132
133            // (1) Get the latest pruning bounds from the database.
134            let mut watermark = {
135                let guard = metrics
136                    .watermark_pruner_read_latency
137                    .with_label_values(&[H::NAME])
138                    .start_timer();
139
140                let Ok(mut conn) = store.connect().await else {
141                    warn!(
142                        pipeline = H::NAME,
143                        "Pruner failed to connect, while fetching watermark"
144                    );
145                    continue;
146                };
147
148                match conn.pruner_watermark(H::NAME, config.delay()).await {
149                    Ok(Some(current)) => {
150                        guard.stop_and_record();
151                        current
152                    }
153
154                    Ok(None) => {
155                        guard.stop_and_record();
156                        info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
157                        continue;
158                    }
159
160                    Err(e) => {
161                        guard.stop_and_record();
162                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
163                        continue;
164                    }
165                }
166            };
167
168            // (2) Wait until this information can be acted upon.
169            if let Some(wait_for) = watermark.wait_for() {
170                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
171                tokio::time::sleep(wait_for).await;
172            }
173
174            // Tracks the current highest `pruner_hi` not yet written to db. This is updated as
175            // chunks complete.
176            let mut highest_pruned = watermark.pruner_hi;
177            // Tracks the `pruner_hi` that has been written to the db.
178            let mut highest_watermarked = watermark.pruner_hi;
179
180            // (3) Collect all the new chunks that are ready to be pruned.
181            // This will also advance the watermark.
182            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
183                pending_prune_ranges.schedule(from, to_exclusive);
184            }
185
186            debug!(
187                pipeline = H::NAME,
188                "Number of chunks to prune: {}",
189                pending_prune_ranges.len()
190            );
191
192            // (3) Prune chunk by chunk to avoid the task waiting on a long-running database
193            // transaction, between tests for cancellation.
194            // Spawn all tasks in parallel, but limit the number of concurrent tasks.
195            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
196            let mut tasks = FuturesUnordered::new();
197            for (from, to_exclusive) in pending_prune_ranges.iter() {
198                let semaphore = semaphore.clone();
199                let metrics = metrics.clone();
200                let handler = handler.clone();
201
202                let db = store.clone();
203
204                tasks.push(tokio::spawn(async move {
205                    let _permit = semaphore.acquire().await.unwrap();
206                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
207                    ((from, to_exclusive), result)
208                }));
209            }
210
211            // (4) Wait for all tasks to finish. For each task, if it succeeds, remove the range
212            // from the pending_prune_ranges. Otherwise the range will remain in the map and will be
213            // retried in the next iteration. Update the highest_pruned watermark if the task
214            // succeeds in metrics and in db, to minimize redundant pruner work if the pipeline is
215            // restarted.
216            while let Some(r) = tasks.next().await {
217                let ((from, to_exclusive), result) = r.unwrap();
218                match result {
219                    Ok(()) => {
220                        pending_prune_ranges.remove(&from);
221                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
222                        highest_pruned = highest_pruned.max(pruner_hi);
223                    }
224                    Err(e) => {
225                        error!(
226                            pipeline = H::NAME,
227                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
228                        );
229                    }
230                }
231
232                if highest_pruned > highest_watermarked {
233                    metrics
234                        .watermark_pruner_hi
235                        .with_label_values(&[H::NAME])
236                        .set(highest_pruned as i64);
237
238                    let guard = metrics
239                        .watermark_pruner_write_latency
240                        .with_label_values(&[H::NAME])
241                        .start_timer();
242
243                    let Ok(mut conn) = store.connect().await else {
244                        warn!(
245                            pipeline = H::NAME,
246                            "Pruner failed to connect while updating watermark"
247                        );
248                        continue;
249                    };
250
251                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
252                        Err(e) => {
253                            let elapsed = guard.stop_and_record();
254                            error!(
255                                pipeline = H::NAME,
256                                elapsed_ms = elapsed * 1000.0,
257                                "Failed to update pruner watermark: {e}"
258                            )
259                        }
260                        Ok(true) => {
261                            highest_watermarked = highest_pruned;
262                            let elapsed = guard.stop_and_record();
263                            logger.log::<H>(
264                                LoggerWatermark::checkpoint(highest_watermarked),
265                                elapsed,
266                            );
267
268                            metrics
269                                .watermark_pruner_hi_in_db
270                                .with_label_values(&[H::NAME])
271                                .set(highest_watermarked as i64);
272                        }
273                        Ok(false) => {}
274                    }
275                }
276            }
277        }
278    })
279}
280
281async fn prune_task_impl<H: Handler>(
282    metrics: Arc<IndexerMetrics>,
283    db: H::Store,
284    handler: Arc<H>,
285    from: u64,
286    to_exclusive: u64,
287) -> Result<(), anyhow::Error> {
288    metrics
289        .total_pruner_chunks_attempted
290        .with_label_values(&[H::NAME])
291        .inc();
292
293    let guard = metrics
294        .pruner_delete_latency
295        .with_label_values(&[H::NAME])
296        .start_timer();
297
298    let mut conn = db.connect().await?;
299
300    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");
301
302    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
303        Ok(affected) => {
304            guard.stop_and_record();
305            affected
306        }
307
308        Err(e) => {
309            guard.stop_and_record();
310            return Err(e);
311        }
312    };
313
314    metrics
315        .total_pruner_chunks_deleted
316        .with_label_values(&[H::NAME])
317        .inc();
318
319    metrics
320        .total_pruner_rows_deleted
321        .with_label_values(&[H::NAME])
322        .inc_by(affected as u64);
323
324    Ok(())
325}
326
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_indexer_alt_framework_store_traits::testing::mock_store::MockWatermark;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Placeholder row type; the tests never inspect committed values.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose `prune` delegates to the mock store.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    /// Current wall-clock time, in milliseconds since the Unix epoch.
    fn now_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Fresh metrics registered against a throwaway registry.
    fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        IndexerMetrics::new(None, &registry)
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut pending = PendingRanges::default();

        // Two disjoint ranges, scheduled in ascending order.
        pending.schedule(1, 5);
        pending.schedule(10, 15);

        // Both are kept exactly as scheduled.
        assert_eq!(pending.iter().collect::<Vec<_>>(), vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut pending = PendingRanges::default();

        // A range overlapping the previous one only contributes its new tail.
        pending.schedule(1, 5);
        pending.schedule(3, 7);
        assert_eq!(pending.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 7)]);

        // A range that ends before the last scheduled end is dropped entirely.
        pending.schedule(2, 4);
        assert_eq!(pending.len(), 2);
        assert_eq!(pending.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 7)]);
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut pending = PendingRanges::default();

        pending.schedule(1, 5);
        assert_eq!(pending.len(), 1);

        // Scheduling the very same range again is a no-op.
        pending.schedule(1, 5);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending.iter().collect::<Vec<_>>(), vec![(1, 5)]);
    }

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut pending = PendingRanges::default();

        // The second range starts exactly where the first one ends.
        pending.schedule(1, 5);
        pending.schedule(5, 10);

        assert_eq!(pending.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut pending = PendingRanges::default();

        for (from, to_exclusive) in [(1, 5), (10, 15), (20, 25)] {
            pending.schedule(from, to_exclusive);
        }
        assert_eq!(pending.len(), 3);
        assert_eq!(pending.get_pruner_hi(), 1);

        // Removing in order moves the watermark to the start of the next pending range.
        pending.remove(&1);
        assert_eq!(pending.len(), 2);
        assert_eq!(pending.get_pruner_hi(), 10);

        pending.remove(&10);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending.get_pruner_hi(), 20);

        // With nothing pending, the watermark is the end of the last scheduled range.
        pending.remove(&20);
        assert_eq!(pending.len(), 0);
        assert_eq!(pending.get_pruner_hi(), 25);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut pending = PendingRanges::default();

        for (from, to_exclusive) in [(1, 5), (10, 15), (20, 25)] {
            pending.schedule(from, to_exclusive);
        }
        assert_eq!(pending.len(), 3);
        assert_eq!(pending.get_pruner_hi(), 1);

        // Removing the middle range first leaves the watermark at the earliest pending range.
        pending.remove(&10);
        assert_eq!(pending.len(), 2);
        assert_eq!(pending.get_pruner_hi(), 1);

        // Once the first range goes too, the watermark jumps to the only range left.
        pending.remove(&1);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending.get_pruner_hi(), 20);

        // With nothing pending, the watermark is the end of the last scheduled range.
        pending.remove(&20);
        assert_eq!(pending.len(), 0);
        assert_eq!(pending.get_pruner_hi(), 25);
    }

    #[tokio::test]
    async fn test_pruner() {
        let config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let metrics = test_metrics();

        // `pruner_timestamp` is "now", so nothing may be pruned until `delay_ms` has elapsed.
        let now = now_ms();
        let store = FallibleMockStore::new()
            .with_watermark(
                DataPipeline::NAME,
                MockWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: Some(3),
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: now,
                    reader_lo: 3,
                    pruner_timestamp: now,
                    pruner_hi: 0,
                    chain_id: None,
                },
            )
            .with_data(
                DataPipeline::NAME,
                HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]),
            );

        // Keep the service handle bound for the duration of the test.
        let _service = pruner(Arc::new(DataPipeline), Some(config), store.clone(), metrics);

        // Well within `delay_ms`: every checkpoint must still be there.
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let rows = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                rows.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                rows.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                rows.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Let the delay run out.
        tokio::time::sleep(Duration::from_millis(2000)).await;
        {
            let rows = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !rows.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            // Checkpoint 3 is the `reader_lo`, which is an exclusive upper bound for pruning.
            assert!(rows.contains_key(&3), "Checkpoint 3 should be preserved");

            // The stored `pruner_hi` must have moved past checkpoint 1.
            let stored = store.watermark(DataPipeline::NAME).unwrap();
            assert!(stored.pruner_hi > 1, "Pruner watermark should be updated");
        }
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let metrics = test_metrics();

        // `pruner_timestamp` is the epoch, i.e. far in the past.
        let now = now_ms();
        let store = FallibleMockStore::new()
            .with_watermark(
                DataPipeline::NAME,
                MockWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: Some(3),
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: now,
                    reader_lo: 3,
                    pruner_timestamp: 0,
                    pruner_hi: 0,
                    chain_id: None,
                },
            )
            .with_data(
                DataPipeline::NAME,
                HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]),
            );

        // Keep the service handle bound for the duration of the test.
        let _service = pruner(Arc::new(DataPipeline), Some(config), store.clone(), metrics);

        // Because the `pruner_timestamp` is in the past, even with the delay_ms it should be
        // pruned close to immediately. To be safe, sleep for 500ms before checking, which is well
        // under the delay_ms of 20_000 ms.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let rows = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!rows.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!rows.contains_key(&2), "Checkpoint 2 should be pruned");

            // Checkpoint 3 is the `reader_lo`, which is an exclusive upper bound for pruning.
            assert!(rows.contains_key(&3), "Checkpoint 3 should be preserved");

            // The stored `pruner_hi` must have moved past checkpoint 1.
            let stored = store.watermark(DataPipeline::NAME).unwrap();
            assert!(stored.pruner_hi > 1, "Pruner watermark should be updated");
        }
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let config = PrunerConfig {
            interval_ms: 3_000, // Long interval, so the retry of the failed range is observable.
            delay_ms: 100,      // Short delay to speed up each interval.
            retention: 1,
            max_chunk_size: 1, // One checkpoint per chunk.
            prune_concurrency: 1,
        };
        let metrics = test_metrics();

        // Checkpoints 1-4, with range [1,2) configured to fail once before succeeding.
        let store = FallibleMockStore::new()
            .with_watermark(
                DataPipeline::NAME,
                MockWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: Some(4),
                    tx_hi: 8,
                    timestamp_ms_hi_inclusive: now_ms(),
                    reader_lo: 4,        // Allow pruning up to checkpoint 4 (exclusive).
                    pruner_timestamp: 0, // Past timestamp so the delay doesn't block.
                    pruner_hi: 1,
                    chain_id: None,
                },
            )
            .with_data(
                DataPipeline::NAME,
                HashMap::from([
                    (1, vec![1, 2]),
                    (2, vec![3, 4]),
                    (3, vec![5, 6]),
                    (4, vec![7, 8]),
                ]),
            )
            .with_prune_failures(1, 2, 1);

        // Keep the service handle bound for the duration of the test.
        let _service = pruner(Arc::new(DataPipeline), Some(config), store.clone(), metrics);

        // First cycle: ranges [2,3) and [3,4) should succeed, [1,2) should fail.
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let rows = store.data.get(DataPipeline::NAME).unwrap();
            let stored = store.watermark(DataPipeline::NAME).unwrap();

            // The watermark must not move past the failed range [1,2).
            assert_eq!(
                stored.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(rows.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!rows.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!rows.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(rows.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Second cycle: range [1,2) should succeed on retry.
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let rows = store.data.get(DataPipeline::NAME).unwrap();
            let stored = store.watermark(DataPipeline::NAME).unwrap();

            // With every range done, the watermark catches up to `reader_lo`.
            assert_eq!(
                stored.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!rows.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!rows.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!rows.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(rows.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}