sui_indexer_alt_framework/pipeline/concurrent/
pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use futures::stream::FuturesUnordered;
9use sui_futures::service::Service;
10use tokio::sync::Semaphore;
11use tokio::time::MissedTickBehavior;
12use tokio::time::interval;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::concurrent::Handler;
20use crate::pipeline::concurrent::PrunerConfig;
21use crate::pipeline::logging::LoggerWatermark;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::ConcurrentConnection;
24use crate::store::Store;
25
/// Tracks checkpoint ranges that are ready to be pruned but have not yet been
/// pruned successfully. Each range is keyed by its inclusive start and maps to
/// its exclusive end.
#[derive(Default)]
struct PendingRanges {
    /// Maps from `from` to `to_exclusive` for all the ranges that are ready to be pruned.
    ranges: BTreeMap<u64, u64>,
    /// The last range that has been scheduled for pruning.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule a new range to be pruned.
    ///
    /// The last scheduled range is consulted to avoid double pruning of the
    /// same region. This is important because double pruning will not always
    /// work: pruning may not be idempotent for some pipelines. For instance,
    /// if the handler holds processed data needed for pruning, the pruning
    /// step may remove that data once done.
    fn schedule(&mut self, from: u64, to_exclusive: u64) {
        let (_, last_end) = self.last_scheduled_range.unwrap_or((0, 0));

        // The entire new range was already covered by a previous schedule call.
        if to_exclusive <= last_end {
            return;
        }

        // Clip the start so the new range begins where prior scheduling ended.
        let start = from.max(last_end);
        self.ranges.insert(start, to_exclusive);
        self.last_scheduled_range = Some((start, to_exclusive));
    }

    /// Number of ranges still waiting to be pruned.
    fn len(&self) -> usize {
        self.ranges.len()
    }

    /// Iterate over pending `(from, to_exclusive)` pairs in ascending order of start.
    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges.iter().map(|(&from, &to_exclusive)| (from, to_exclusive))
    }

    /// Remove the pending range that starts at `from`. Panics if no such range
    /// is pending (an internal invariant violation).
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// Returns the current pruner_hi watermark, i.e. the first checkpoint that has
    /// not yet been pruned. This is the smallest pending start; when nothing is
    /// pending it is the end of the last range scheduled for pruning, or 0 if
    /// nothing has ever been scheduled (this is generally not called that early,
    /// but 0 is a safe fallback).
    fn get_pruner_hi(&self) -> u64 {
        if let Some(first) = self.ranges.keys().next() {
            *first
        } else if let Some((_, end)) = self.last_scheduled_range {
            end
        } else {
            0
        }
    }
}
82
83/// The pruner task is responsible for deleting old data from the database. It will periodically
84/// check the `watermarks` table to see if there is any data that should be pruned between the
85/// `pruner_hi` (inclusive), and `reader_lo` (exclusive) checkpoints. This task will also provide a
86/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
87/// then use to delete the corresponding data from the database.
88///
89/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
90/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
91/// The task will not prune data until at least `config.delay()` has passed since `pruner_timestamp`
92/// to give in-flight reads time to land.
93///
94/// The task regularly traces its progress, outputting at a higher log level every
95/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
96///
97/// If the `config` is `None`, the task will shutdown immediately.
pub(super) fn pruner<H: Handler>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        // No pruner config means this pipeline does not prune: exit cleanly.
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // The pruner can pause for a while, waiting for the delay imposed by the
        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
        // compressed to make up for missed ticks.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // The pruner task will periodically output a log message at a higher log level to
        // demonstrate that it is making progress.
        let mut logger = WatermarkLogger::new("pruner");

        // Maintains the list of chunks that are ready to be pruned but not yet pruned.
        // This map can contain ranges that were attempted to be pruned in previous iterations,
        // but failed due to errors. They will be retried on later iterations.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // (1) Get the latest pruning bounds from the database.
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                // Connection failures are transient: skip this tick and retry on the next.
                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect, while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // (2) Wait until this information can be acted upon, giving in-flight
            // reads time to land (see module docs on `pruner_timestamp`).
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // Tracks the current highest `pruner_hi` not yet written to db. This is updated as
            // chunks complete.
            let mut highest_pruned = watermark.pruner_hi;
            // Tracks the `pruner_hi` that has been written to the db.
            let mut highest_watermarked = watermark.pruner_hi;

            // (3) Collect all the new chunks that are ready to be pruned.
            // This will also advance the watermark.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // (4) Prune chunk by chunk to avoid the task waiting on a long-running database
            // transaction, between tests for cancellation.
            // Spawn all tasks in parallel, but limit the number of concurrent tasks.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    // Held for the duration of the prune, so at most
                    // `prune_concurrency` chunks run at once.
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // (5) Wait for all tasks to finish. For each task, if it succeeds, remove the range
            // from the pending_prune_ranges. Otherwise the range will remain in the map and will be
            // retried in the next iteration. Update the highest_pruned watermark if the task
            // succeeds in metrics and in db, to minimize redundant pruner work if the pipeline is
            // restarted.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // On connection failure, skip the write for now; it will be
                    // retried when the next task result arrives (or next tick).
                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        // The store reported that no watermark update was needed.
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}
280
281async fn prune_task_impl<H: Handler>(
282    metrics: Arc<IndexerMetrics>,
283    db: H::Store,
284    handler: Arc<H>,
285    from: u64,
286    to_exclusive: u64,
287) -> Result<(), anyhow::Error> {
288    metrics
289        .total_pruner_chunks_attempted
290        .with_label_values(&[H::NAME])
291        .inc();
292
293    let guard = metrics
294        .pruner_delete_latency
295        .with_label_values(&[H::NAME])
296        .start_timer();
297
298    let mut conn = db.connect().await?;
299
300    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");
301
302    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
303        Ok(affected) => {
304            guard.stop_and_record();
305            affected
306        }
307
308        Err(e) => {
309            guard.stop_and_record();
310            return Err(e);
311        }
312    };
313
314    metrics
315        .total_pruner_chunks_deleted
316        .with_label_values(&[H::NAME])
317        .inc();
318
319    metrics
320        .total_pruner_rows_deleted
321        .with_label_values(&[H::NAME])
322        .inc_by(affected as u64);
323
324    Ok(())
325}
326
327#[cfg(test)]
328mod tests {
329    use std::collections::HashMap;
330    use std::sync::Arc;
331    use std::time::SystemTime;
332    use std::time::UNIX_EPOCH;
333
334    use async_trait::async_trait;
335    use prometheus::Registry;
336    use sui_types::full_checkpoint_content::Checkpoint;
337    use tokio::time::Duration;
338
339    use crate::FieldCount;
340    use crate::metrics::IndexerMetrics;
341    use crate::mocks::store::*;
342    use crate::pipeline::Processor;
343    use crate::pipeline::concurrent::BatchStatus;
344
345    use super::*;
346
    /// Zero-sized stand-in value type for the test pipeline; carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal test pipeline used to exercise the pruner against `MockStore`.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        // The pruner tests never exercise processing, so produce no values.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }
362
    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        // Accumulate every value into the batch and keep the batch open.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        // Commit is a no-op for the mock; report the batch size as "written".
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        // Delegate to the mock store so tests can inject prune failures and
        // observe which checkpoints were deleted.
        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }
394
395    #[test]
396    fn test_pending_ranges_basic_scheduling() {
397        let mut ranges = PendingRanges::default();
398
399        // Schedule initial range
400        ranges.schedule(1, 5);
401
402        // Schedule non-overlapping range
403        ranges.schedule(10, 15);
404
405        // Verify ranges are stored correctly
406        let scheduled: Vec<_> = ranges.iter().collect();
407        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
408    }
409
410    #[test]
411    fn test_pending_ranges_double_pruning_prevention() {
412        let mut ranges = PendingRanges::default();
413
414        // Schedule initial range
415        ranges.schedule(1, 5);
416
417        // Try to schedule overlapping range.
418        ranges.schedule(3, 7);
419
420        let scheduled: Vec<_> = ranges.iter().collect();
421        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);
422
423        // Try to schedule range that's entirely covered by previous range
424        ranges.schedule(2, 4); // Entirely within (1,5), should be ignored
425        assert_eq!(ranges.len(), 2); // No change
426
427        let scheduled: Vec<_> = ranges.iter().collect();
428        assert_eq!(scheduled, vec![(1, 5), (5, 7)]); // No change
429    }
430
431    #[test]
432    fn test_pending_ranges_exact_duplicate() {
433        let mut ranges = PendingRanges::default();
434
435        // Schedule initial range
436        ranges.schedule(1, 5);
437        assert_eq!(ranges.len(), 1);
438
439        // Schedule exact same range.
440        ranges.schedule(1, 5);
441        assert_eq!(ranges.len(), 1); // No change
442
443        let scheduled: Vec<_> = ranges.iter().collect();
444        assert_eq!(scheduled, vec![(1, 5)]);
445    }
446
447    #[test]
448    fn test_pending_ranges_adjacent_ranges() {
449        let mut ranges = PendingRanges::default();
450
451        // Schedule initial range
452        ranges.schedule(1, 5);
453
454        // Schedule adjacent range
455        ranges.schedule(5, 10);
456
457        let scheduled: Vec<_> = ranges.iter().collect();
458        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
459    }
460
461    #[test]
462    fn test_pending_ranges_remove_and_watermark() {
463        let mut ranges = PendingRanges::default();
464
465        // Schedule multiple ranges
466        ranges.schedule(1, 5);
467        ranges.schedule(10, 15);
468        ranges.schedule(20, 25);
469        assert_eq!(ranges.len(), 3);
470        assert_eq!(ranges.get_pruner_hi(), 1);
471
472        // Remove first range - watermark should advance
473        ranges.remove(&1);
474        assert_eq!(ranges.len(), 2);
475        assert_eq!(ranges.get_pruner_hi(), 10); // Next range starts at 10
476
477        // Remove middle range
478        ranges.remove(&10);
479        assert_eq!(ranges.len(), 1);
480        assert_eq!(ranges.get_pruner_hi(), 20);
481
482        // Remove last range - watermark should use last_scheduled_range
483        ranges.remove(&20);
484        assert_eq!(ranges.len(), 0);
485        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
486    }
487
488    #[test]
489    fn test_pending_ranges_remove_and_watermark_out_of_order() {
490        let mut ranges = PendingRanges::default();
491
492        // Schedule multiple ranges
493        ranges.schedule(1, 5);
494        ranges.schedule(10, 15);
495        ranges.schedule(20, 25);
496        assert_eq!(ranges.len(), 3);
497        assert_eq!(ranges.get_pruner_hi(), 1);
498
499        // Remove middle range
500        ranges.remove(&10);
501        assert_eq!(ranges.len(), 2);
502        assert_eq!(ranges.get_pruner_hi(), 1);
503
504        // Remove first range
505        ranges.remove(&1);
506        assert_eq!(ranges.len(), 1);
507        assert_eq!(ranges.get_pruner_hi(), 20);
508
509        // Remove last range - watermark should use last_scheduled_range
510        ranges.remove(&20);
511        assert_eq!(ranges.len(), 0);
512        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
513    }
514
    /// End-to-end pruner test: with `pruner_timestamp` set to "now", pruning
    /// must wait out `delay_ms` before deleting anything, and must never prune
    /// at or beyond `reader_lo`.
    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // `pruner_timestamp` is "now", so the pruner must respect the 2s delay.
        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(3),
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
            chain_id: None,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait a short time within delay_ms
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Wait for the delay to expire
        tokio::time::sleep(Duration::from_millis(2000)).await;

        // Now checkpoint 1 should be pruned
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }
594
    /// With `pruner_timestamp` far in the past, the configured `delay_ms` has
    /// already elapsed, so pruning should start effectively immediately.
    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Update data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Update committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // `pruner_timestamp: 0` places the last reader_lo update in the distant
        // past, so the 20s delay is already satisfied.
        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(3),
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
            chain_id: None,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Because the `pruner_timestamp` is in the past, even with the delay_ms it should be pruned
        // close to immediately. To be safe, sleep for 500ms before checking, which is well under
        // the delay_ms of 20_000 ms.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }
656
    /// Failed chunks stay in the pending set and hold back `pruner_hi`: the
    /// first cycle fails range [1,2) once, so the watermark must not advance
    /// past 1 until the retry succeeds on a later cycle.
    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 3_000, // Long interval to test retried attempts of failed range.
            delay_ms: 100,      // Short delay to speed up each interval
            retention: 1,
            max_chunk_size: 1, // Process one checkpoint at a time
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Set up test data for checkpoints 1-4
        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(4),
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,        // Allow pruning up to checkpoint 4 (exclusive)
            pruner_timestamp: 0, // Past timestamp so delay doesn't block
            pruner_hi: 1,
            chain_id: None,
        };

        // Configure failing behavior: range [1,2) should fail once before succeeding
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait for first pruning cycle - ranges [2,3) and [3,4) should succeed, [1,2) should fail
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark doesn't advance past the failed range [1,2)
            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Wait for second pruning cycle - range [1,2) should succeed on retry
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark advances after all ranges complete successfully
            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
738}