sui_indexer_alt_framework/pipeline/concurrent/
pruner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::sync::Arc;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use sui_futures::service::Service;
use tokio::sync::Semaphore;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::IndexerMetrics;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::concurrent::PrunerConfig;
use crate::pipeline::logging::LoggerWatermark;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::Connection;
use crate::store::Store;

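/// Tracks the checkpoint ranges that have been scheduled for pruning but have not yet been
/// pruned successfully, so that failed chunks can be retried without double-pruning.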
#[derive(Default)]
struct PendingRanges {
    /// Maps from `from` to `to_exclusive` for all the ranges that are ready to be pruned.
    ranges: BTreeMap<u64, u64>,
    /// The last range that has been scheduled for pruning.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule a new range to be pruned.
    /// The last scheduled range is used to avoid double pruning of the same range.
    /// This matters because double pruning will not always work: pruning may not be
    /// idempotent for some pipelines. For instance, if the handler holds processed data
    /// needed for pruning, the pruning step may remove that data once it is done.
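    ///
    /// As an illustrative sketch (mirroring the unit tests below), scheduling a range that
    /// overlaps what was already scheduled only adds the not-yet-scheduled suffix:
    ///
    /// ```ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5);
    /// ranges.schedule(3, 7); // overlaps (1, 5), so only (5, 7) is added
    /// assert_eq!(ranges.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 7)]);
    /// ```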
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let last_scheduled_range = self.last_scheduled_range.unwrap_or((0, 0));
        // If the end of the last scheduled range is at least the end of the new range,
        // the entire new range was already scheduled before.
        if to_exclusive <= last_scheduled_range.1 {
            return;
        }
        // Otherwise, make sure the new range starts at or after the end of the last
        // scheduled range.
        from = from.max(last_scheduled_range.1);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    fn len(&self) -> usize {
        self.ranges.len()
    }

    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    /// Remove the pending range that starts at `from`.
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// Returns the current `pruner_hi` watermark, i.e. the first checkpoint that has not yet
    /// been pruned. This is the first key in the map of pending ranges. If the map is empty,
    /// it is the exclusive end of the last range that was scheduled for pruning.
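    ///
    /// As an illustrative sketch (mirroring the unit tests below):
    ///
    /// ```ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5);
    /// ranges.schedule(10, 15);
    /// assert_eq!(ranges.get_pruner_hi(), 1);  // first pending range still needs pruning
    /// ranges.remove(&1);
    /// assert_eq!(ranges.get_pruner_hi(), 10); // next pending range
    /// ranges.remove(&10);
    /// assert_eq!(ranges.get_pruner_hi(), 15); // nothing pending: end of last scheduled range
    /// ```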
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().cloned().unwrap_or(
            self.last_scheduled_range
                .map(|(_, t)| t)
                // `get_pruner_hi` will generally not be called until we have scheduled
                // something, but return 0 just in case it is called earlier.
                .unwrap_or_default(),
        )
    }
}

/// The pruner task is responsible for deleting old data from the database. It will periodically
/// check the `watermarks` table to see if there is any data that should be pruned between the
/// `pruner_hi` (inclusive) and `reader_lo` (exclusive) checkpoints. This task will also provide a
/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
/// then use to delete the corresponding data from the database.
///
/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
/// The task will not prune data until at least `config.delay()` has passed since
/// `pruner_timestamp`, to give in-flight reads time to land.
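///
/// As a rough illustration (mirroring the unit tests below): with `pruner_hi = 1`,
/// `reader_lo = 4`, and `max_chunk_size = 1`, the task schedules the chunks `[1, 2)`,
/// `[2, 3)`, and `[3, 4)`, never touches checkpoint 4 itself, and advances `pruner_hi` to 4
/// once all three chunks have been pruned successfully.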
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// If the `config` is `None`, the task will shut down immediately.
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // The pruner can pause for a while, waiting for the delay imposed by the
        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
        // compressed to make up for missed ticks.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // The pruner task will periodically output a log message at a higher log level to
        // demonstrate that it is making progress.
        let mut logger = WatermarkLogger::new("pruner");

        // Maintains the list of chunks that are ready to be pruned but not yet pruned.
        // This map can contain ranges that were attempted to be pruned in previous iterations,
        // but failed due to errors.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // (1) Get the latest pruning bounds from the database.
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // (2) Wait until this information can be acted upon.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // Tracks the current highest `pruner_hi` not yet written to db. This is updated as
            // chunks complete.
            let mut highest_pruned = watermark.pruner_hi;
            // Tracks the `pruner_hi` that has been written to the db.
            let mut highest_watermarked = watermark.pruner_hi;

            // (3) Collect all the new chunks that are ready to be pruned.
            // This will also advance the watermark.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // (4) Prune chunk by chunk to avoid the task waiting on a single long-running
            // database transaction. Spawn all the prune tasks in parallel, but limit the
            // number of concurrent tasks.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
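                    // The permit is held for the duration of the prune call, so at most
                    // `prune_concurrency` chunks are pruned against the store at once.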
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // (5) Wait for all tasks to finish. For each task that succeeds, remove its range
            // from `pending_prune_ranges`; otherwise the range remains in the map and will be
            // retried in the next iteration. As tasks succeed, update the `highest_pruned`
            // watermark in metrics and in the db, to minimize redundant pruner work if the
            // pipeline is restarted.
            while let Some(r) = tasks.next().await {
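                // `r` is the join result of the spawned task: unwrapping it propagates panics,
                // while pruning failures are reported through `result` below.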
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}

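/// Prune the checkpoint range `[from, to_exclusive)` for this handler on a fresh store
/// connection, recording the delete latency, chunk counts, and affected row counts in
/// `metrics`.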
async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
    metrics: Arc<IndexerMetrics>,
    db: H::Store,
    handler: Arc<H>,
    from: u64,
    to_exclusive: u64,
) -> Result<(), anyhow::Error> {
    metrics
        .total_pruner_chunks_attempted
        .with_label_values(&[H::NAME])
        .inc();

    let guard = metrics
        .pruner_delete_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    let mut conn = db.connect().await?;

    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
        Ok(affected) => {
            guard.stop_and_record();
            affected
        }

        Err(e) => {
            guard.stop_and_record();
            return Err(e);
        }
    };

    metrics
        .total_pruner_chunks_deleted
        .with_label_values(&[H::NAME])
        .inc();

    metrics
        .total_pruner_rows_deleted
        .with_label_values(&[H::NAME])
        .inc_by(affected as u64);

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule non-overlapping range
        ranges.schedule(10, 15);

        // Verify ranges are stored correctly
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Try to schedule overlapping range.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // Try to schedule range that's entirely covered by previous range
        ranges.schedule(2, 4); // Entirely within (1,5), should be ignored
        assert_eq!(ranges.len(), 2); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]); // No change
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        // Schedule exact same range.
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule adjacent range
        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range - watermark should advance
        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10); // Next range starts at 10

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range
        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait a short time within delay_ms
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Wait for the delay to expire
        tokio::time::sleep(Duration::from_millis(2000)).await;

        // Now checkpoint 1 should be pruned
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Because the `pruner_timestamp` is in the past, even with the delay_ms the data should
        // be pruned close to immediately. To be safe, sleep for 500ms before checking, which is
        // still well under the delay_ms of 20_000 ms.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 3_000, // Long interval to test retried attempts of the failed range.
            delay_ms: 100,      // Short delay to speed up each interval
            retention: 1,
            max_chunk_size: 1, // Process one checkpoint at a time
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Set up test data for checkpoints 1-4
        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,        // Allow pruning up to checkpoint 4 (exclusive)
            pruner_timestamp: 0, // Past timestamp so delay doesn't block
            pruner_hi: 1,
        };

        // Configure failing behavior: range [1,2) should fail once before succeeding
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait for first pruning cycle - ranges [2,3) and [3,4) should succeed, [1,2) should fail
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark doesn't advance past the failed range [1,2)
            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Wait for second pruning cycle - range [1,2) should succeed on retry
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark advances after all ranges complete successfully
            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}