sui_indexer_alt_framework/pipeline/concurrent/pruner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use tokio::{
    sync::Semaphore,
    task::JoinHandle,
    time::{MissedTickBehavior, interval},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
    metrics::IndexerMetrics,
    pipeline::logging::{LoggerWatermark, WatermarkLogger},
    store::{Connection, Store},
};

use super::{Handler, PrunerConfig};

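/// Tracks checkpoint ranges that are ready to be pruned but have not yet been pruned
/// successfully, along with the last range that was scheduled, so that overlapping requests can
/// be trimmed before they are scheduled again.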
#[derive(Default)]
struct PendingRanges {
    /// Maps from `from` to `to_exclusive` for all the ranges that are ready to be pruned.
    ranges: BTreeMap<u64, u64>,
    /// The last range that has been scheduled for pruning.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule a new range to be pruned.
    ///
    /// The last scheduled range is used to trim the new range, so that the same checkpoints are
    /// never scheduled twice. This matters because pruning may not be idempotent for some
    /// pipelines: for instance, if the handler holds processed data needed for pruning, the
    /// pruning step may remove that data once it completes.
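    ///
    /// # Example
    ///
    /// A minimal sketch of the trimming behaviour (illustrative only):
    ///
    /// ```rust,ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5); // schedules [1, 5)
    /// ranges.schedule(3, 7); // overlaps [1, 5), so only [5, 7) is scheduled
    /// assert_eq!(ranges.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 7)]);
    /// ```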
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let last_scheduled_range = self.last_scheduled_range.unwrap_or((0, 0));
        // If the end of the last scheduled range is at least the end of the new range, the entire
        // new range was already scheduled before.
        if to_exclusive <= last_scheduled_range.1 {
            return;
        }
        // Otherwise, make sure the new range starts no earlier than the end of the last scheduled
        // range.
        from = from.max(last_scheduled_range.1);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    fn len(&self) -> usize {
        self.ranges.len()
    }

    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    /// Remove the range starting at `from` from the pending ranges. Panics if no such range is
    /// pending.
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// Returns the current `pruner_hi` watermark, i.e. the first checkpoint that has not yet been
    /// pruned. This is the first key in the pending ranges map. If the map is empty, it is the
    /// exclusive end of the last range that was scheduled for pruning.
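    ///
    /// # Example
    ///
    /// A minimal sketch of how the watermark advances (illustrative only):
    ///
    /// ```rust,ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5);
    /// ranges.schedule(10, 15);
    /// assert_eq!(ranges.get_pruner_hi(), 1);  // [1, 5) is still pending
    /// ranges.remove(&1);
    /// assert_eq!(ranges.get_pruner_hi(), 10); // next pending range starts at 10
    /// ranges.remove(&10);
    /// assert_eq!(ranges.get_pruner_hi(), 15); // nothing pending: end of last scheduled range
    /// ```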
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().cloned().unwrap_or(
            self.last_scheduled_range
                .map(|(_, t)| t)
                // `get_pruner_hi` is generally not called before anything has been scheduled, but
                // return 0 just in case it is called earlier.
                .unwrap_or_default(),
        )
    }
}

/// The pruner task is responsible for deleting old data from the database. It will periodically
/// check the `watermarks` table to see if there is any data that should be pruned between the
/// `pruner_hi` (inclusive) and `reader_lo` (exclusive) checkpoints. This task will also provide a
/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
/// then use to delete the corresponding data from the database.
///
/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
/// The task will not prune data until at least `config.delay()` has passed since
/// `pruner_timestamp`, to give in-flight reads time to land.
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shut down if the `cancel` token is signalled. If the `config` is `None`, the
/// task will shut down immediately.
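///
/// # Example
///
/// A minimal sketch of how this task might be wired up, assuming a hypothetical `MyHandler` type
/// that implements [`Handler`], and a store and metrics handle already in scope; the config
/// values are illustrative only:
///
/// ```rust,ignore
/// let handler = Arc::new(MyHandler);
/// let cancel = CancellationToken::new();
/// let config = PrunerConfig {
///     interval_ms: 300_000,
///     delay_ms: 120_000,
///     retention: 4_000_000,
///     max_chunk_size: 2_000,
///     prune_concurrency: 1,
/// };
///
/// let task = pruner(handler, Some(config), store.clone(), metrics.clone(), cancel.clone());
///
/// // ...later, signal shutdown and wait for the task to finish.
/// cancel.cancel();
/// task.await.unwrap();
/// ```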
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return;
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // The pruner can pause for a while, waiting for the delay imposed by the
        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
        // compressed to make up for missed ticks.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // The pruner task will periodically output a log message at a higher log level to
        // demonstrate that it is making progress.
        let mut logger = WatermarkLogger::new("pruner");

        // Maintains the list of chunks that are ready to be pruned but not yet pruned.
        // This map can contain ranges that were attempted to be pruned in previous iterations,
        // but failed due to errors.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            // (1) Get the latest pruning bounds from the database.
            let mut watermark = tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    let guard = metrics
                        .watermark_pruner_read_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(pipeline = H::NAME, "Pruner failed to connect while fetching watermark");
                        continue;
                    };

                    match conn.pruner_watermark(H::NAME, config.delay()).await {
                        Ok(Some(current)) => {
                            guard.stop_and_record();
                            current
                        }

                        Ok(None) => {
                            guard.stop_and_record();
                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                            continue;
                        }

                        Err(e) => {
                            guard.stop_and_record();
                            warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                            continue;
                        }
                    }
                }
            };

            // (2) Wait until this information can be acted upon.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::select! {
                    _ = tokio::time::sleep(wait_for) => {}
                    _ = cancel.cancelled() => {
                        info!(pipeline = H::NAME, "Shutdown received");
                        break;
                    }
                }
            }

            // Tracks the current highest `pruner_hi` not yet written to db. This is updated as
            // chunks complete.
            let mut highest_pruned = watermark.pruner_hi;
            // Tracks the `pruner_hi` that has been written to the db.
            let mut highest_watermarked = watermark.pruner_hi;

            // (3) Collect all the new chunks that are ready to be pruned. This will also advance
            // the watermark.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // (4) Prune chunk by chunk to avoid the task waiting on a long-running database
            // transaction between tests for cancellation. Spawn all the prune tasks in parallel,
            // but limit the number of concurrent tasks.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let cancel = cancel.child_token();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    let _permit = tokio::select! {
                        permit = semaphore.acquire() => {
                            permit.unwrap()
                        }
                        _ = cancel.cancelled() => {
                            return ((from, to_exclusive), Err(anyhow::anyhow!("Cancelled")));
                        }
                    };
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // (5) Wait for all tasks to finish. For each task that succeeds, remove its range
            // from the pending_prune_ranges; otherwise the range remains in the map and will be
            // retried in the next iteration. On success, also update the `pruner_hi` watermark in
            // metrics and in the db, to minimize redundant pruner work if the pipeline is
            // restarted.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
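                        // `Ok(false)`: the store reports that the watermark was not updated
                        // (presumably because a higher watermark has already been recorded), so
                        // there is nothing new to log or record in metrics.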
                        Ok(false) => {}
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping pruner");
    })
}

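/// Prune a single `[from, to_exclusive)` range of checkpoints using the handler's `prune`
/// implementation, recording chunk- and row-level metrics along the way. Errors are returned to
/// the caller, which keeps the failed range pending so that it can be retried on a later
/// iteration.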
async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
    metrics: Arc<IndexerMetrics>,
    db: H::Store,
    handler: Arc<H>,
    from: u64,
    to_exclusive: u64,
) -> Result<(), anyhow::Error> {
    metrics
        .total_pruner_chunks_attempted
        .with_label_values(&[H::NAME])
        .inc();

    let guard = metrics
        .pruner_delete_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    let mut conn = db.connect().await?;

    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
        Ok(affected) => {
            guard.stop_and_record();
            affected
        }

        Err(e) => {
            guard.stop_and_record();
            return Err(e);
        }
    };

    metrics
        .total_pruner_chunks_deleted
        .with_label_values(&[H::NAME])
        .inc();

    metrics
        .total_pruner_rows_deleted
        .with_label_values(&[H::NAME])
        .inc_by(affected as u64);

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{
        collections::HashMap,
        time::{SystemTime, UNIX_EPOCH},
    };

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    use crate::{FieldCount, metrics::IndexerMetrics, mocks::store::*, pipeline::Processor};

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        async fn commit<'a>(
            values: &[Self::Value],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(values.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule non-overlapping range
        ranges.schedule(10, 15);

        // Verify ranges are stored correctly
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Try to schedule overlapping range.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // Try to schedule range that's entirely covered by previous range
        ranges.schedule(2, 4); // Entirely within (1,5), should be ignored
        assert_eq!(ranges.len(), 2); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]); // No change
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        // Schedule exact same range.
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule adjacent range
        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range - watermark should advance
        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10); // Next range starts at 10

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range
        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // Wait a short time within delay_ms
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Wait for the delay to expire
        tokio::time::sleep(Duration::from_millis(2000)).await;

        // Now checkpoint 1 should be pruned
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }

        // Clean up
        cancel.cancel();
        let _ = pruner_handle.await;
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // Because the `pruner_timestamp` is in the past, even with the delay_ms it should be
        // pruned close to immediately. To be safe, sleep for 500ms before checking, which is well
        // under the delay_ms of 20_000 ms.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }

        // Clean up
        cancel.cancel();
        let _ = pruner_handle.await;
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 3_000, // Long interval, so the failed range is retried on the next tick
            delay_ms: 100,      // Short delay to speed up each interval
            retention: 1,
            max_chunk_size: 1, // Process one checkpoint at a time
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);
        let cancel = CancellationToken::new();

        // Set up test data for checkpoints 1-4
        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,        // Allow pruning up to checkpoint 4 (exclusive)
            pruner_timestamp: 0, // Past timestamp so delay doesn't block
            pruner_hi: 1,
        };

        // Configure failing behavior: range [1,2) should fail once before succeeding
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        // Start the pruner
        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let pruner_handle = tokio::spawn(async move {
            pruner(
                handler,
                Some(pruner_config),
                store_clone,
                metrics,
                cancel_clone,
            )
            .await
        });

        // Wait for first pruning cycle - ranges [2,3) and [3,4) should succeed, [1,2) should fail
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark doesn't advance past the failed range [1,2)
            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Wait for second pruning cycle - range [1,2) should succeed on retry
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark advances after all ranges complete successfully
            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Clean up
        cancel.cancel();
        let _ = pruner_handle.await;
    }
}