sui_indexer_alt_framework/pipeline/concurrent/
pruner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use sui_futures::service::Service;
use tokio::{
    sync::Semaphore,
    time::{MissedTickBehavior, interval},
};
use tracing::{debug, error, info, warn};

use crate::{
    metrics::IndexerMetrics,
    pipeline::logging::{LoggerWatermark, WatermarkLogger},
    store::{Connection, Store},
};

use super::{Handler, PrunerConfig};

#[derive(Default)]
struct PendingRanges {
    /// Maps from `from` to `to_exclusive` for all the ranges that are ready to be pruned.
    ranges: BTreeMap<u64, u64>,
    /// The last range that has been scheduled for pruning.
    last_scheduled_range: Option<(u64, u64)>,
}

impl PendingRanges {
    /// Schedule a new range to be pruned.
    /// The last scheduled range is used to avoid double-pruning the same range.
    /// This matters because pruning may not be idempotent for some pipelines:
    /// for instance, if the handler holds processed data that pruning depends on,
    /// the pruning step may remove that data once it is done.
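    ///
    /// Illustrative sketch of the clamping behaviour (mirrors the unit tests at the bottom of
    /// this file; marked `ignore` because `PendingRanges` is private):
    ///
    /// ```ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5);
    /// ranges.schedule(3, 7); // Clamped to (5, 7): [3, 5) was already scheduled.
    /// ranges.schedule(2, 4); // No-op: entirely covered by the last scheduled range.
    /// assert_eq!(ranges.iter().collect::<Vec<_>>(), vec![(1, 5), (5, 7)]);
    /// ```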
    fn schedule(&mut self, mut from: u64, to_exclusive: u64) {
        let last_scheduled_range = self.last_scheduled_range.unwrap_or((0, 0));
        // If the new range ends at or before the end of the last scheduled range,
        // the entire new range was already scheduled before.
        if to_exclusive <= last_scheduled_range.1 {
            return;
        }
        // Otherwise, make sure the new range starts no earlier than the end of the last
        // scheduled range.
        from = from.max(last_scheduled_range.1);
        self.ranges.insert(from, to_exclusive);
        self.last_scheduled_range = Some((from, to_exclusive));
    }

    fn len(&self) -> usize {
        self.ranges.len()
    }

    fn iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.ranges
            .iter()
            .map(|(from, to_exclusive)| (*from, *to_exclusive))
    }

    /// Remove the range starting at `from` from the pending set. Panics if no such range is
    /// pending.
    fn remove(&mut self, from: &u64) {
        self.ranges.remove(from).unwrap();
    }

    /// Returns the current pruner_hi watermark, i.e. the first checkpoint that has not yet been pruned.
    /// This is the first key in the `ranges` map. If the map is empty, it is the exclusive end
    /// of the last scheduled range.
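    ///
    /// Illustrative sketch (matches `test_pending_ranges_remove_and_watermark` below; marked
    /// `ignore` because `PendingRanges` is private):
    ///
    /// ```ignore
    /// let mut ranges = PendingRanges::default();
    /// ranges.schedule(1, 5);
    /// ranges.schedule(10, 15);
    /// assert_eq!(ranges.get_pruner_hi(), 1);  // First pending range starts at 1.
    /// ranges.remove(&1);
    /// assert_eq!(ranges.get_pruner_hi(), 10); // Next pending range starts at 10.
    /// ranges.remove(&10);
    /// assert_eq!(ranges.get_pruner_hi(), 15); // Nothing pending: end of the last scheduled range.
    /// ```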
    fn get_pruner_hi(&self) -> u64 {
        self.ranges.keys().next().cloned().unwrap_or(
            self.last_scheduled_range
                .map(|(_, t)| t)
                // `get_pruner_hi` is generally not called until something has been scheduled,
                // but return 0 just in case it is called earlier.
                .unwrap_or_default(),
        )
    }
}

/// The pruner task is responsible for deleting old data from the database. It will periodically
/// check the `watermarks` table to see if there is any data that should be pruned between the
/// `pruner_hi` (inclusive) and `reader_lo` (exclusive) checkpoints. This task will also provide a
/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
/// then use to delete the corresponding data from the database.
///
/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
/// The task will not prune data until at least `config.delay()` has passed since `pruner_timestamp`
/// to give in-flight reads time to land.
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// If the `config` is `None`, the task will shut down immediately.
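///
/// A minimal wiring sketch, reusing names from the unit tests at the bottom of this file
/// (`DataPipeline`, `MockStore`); the config values below are arbitrary examples, and a real
/// pipeline would substitute its own handler, store, and configuration:
///
/// ```ignore
/// let config = PrunerConfig {
///     interval_ms: 300,
///     delay_ms: 100,
///     retention: 1_000,
///     max_chunk_size: 100,
///     prune_concurrency: 1,
/// };
/// let metrics = IndexerMetrics::new(None, &prometheus::Registry::new());
/// let service = pruner(Arc::new(DataPipeline), Some(config), MockStore::new(), metrics);
/// ```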
pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
    handler: Arc<H>,
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping pruner task");
            return Ok(());
        };

        info!(
            pipeline = H::NAME,
            "Starting pruner with config: {:?}", config
        );

        // The pruner can pause for a while, waiting for the delay imposed by the
        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
        // compressed to make up for missed ticks.
        let mut poll = interval(config.interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // The pruner task will periodically output a log message at a higher log level to
        // demonstrate that it is making progress.
        let mut logger = WatermarkLogger::new("pruner");

        // Maintains the set of chunks that are ready to be pruned but have not been pruned yet.
        // It can include ranges that were attempted in previous iterations but failed due to
        // errors.
        let mut pending_prune_ranges = PendingRanges::default();

        loop {
            poll.tick().await;

            // (1) Get the latest pruning bounds from the database.
            let mut watermark = {
                let guard = metrics
                    .watermark_pruner_read_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let Ok(mut conn) = store.connect().await else {
                    warn!(
                        pipeline = H::NAME,
                        "Pruner failed to connect while fetching watermark"
                    );
                    continue;
                };

                match conn.pruner_watermark(H::NAME, config.delay()).await {
                    Ok(Some(current)) => {
                        guard.stop_and_record();
                        current
                    }

                    Ok(None) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                        continue;
                    }

                    Err(e) => {
                        guard.stop_and_record();
                        warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
                        continue;
                    }
                }
            };

            // (2) Wait until this information can be acted upon.
            if let Some(wait_for) = watermark.wait_for() {
                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
                tokio::time::sleep(wait_for).await;
            }

            // Tracks the current highest `pruner_hi` not yet written to the db. This is updated
            // as chunks complete.
            let mut highest_pruned = watermark.pruner_hi;
            // Tracks the `pruner_hi` that has been written to the db.
            let mut highest_watermarked = watermark.pruner_hi;

            // (3) Collect all the new chunks that are ready to be pruned.
            // This will also advance the watermark.
            while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
                pending_prune_ranges.schedule(from, to_exclusive);
            }

            debug!(
                pipeline = H::NAME,
                "Number of chunks to prune: {}",
                pending_prune_ranges.len()
            );

            // (4) Prune chunk by chunk to avoid the task waiting on a long-running database
            // transaction between checks for cancellation. Spawn all chunks in parallel, but
            // limit the number of concurrent tasks.
            let semaphore = Arc::new(Semaphore::new(config.prune_concurrency as usize));
            let mut tasks = FuturesUnordered::new();
            for (from, to_exclusive) in pending_prune_ranges.iter() {
                let semaphore = semaphore.clone();
                let metrics = metrics.clone();
                let handler = handler.clone();

                let db = store.clone();

                tasks.push(tokio::spawn(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
                    ((from, to_exclusive), result)
                }));
            }

            // (5) Wait for all tasks to finish. If a task succeeds, remove its range from
            // `pending_prune_ranges`; otherwise the range remains in the map and will be retried
            // in the next iteration. When a task succeeds, update the `highest_pruned` watermark
            // in metrics and in the db, to minimize redundant pruner work if the pipeline is
            // restarted.
            while let Some(r) = tasks.next().await {
                let ((from, to_exclusive), result) = r.unwrap();
                match result {
                    Ok(()) => {
                        pending_prune_ranges.remove(&from);
                        let pruner_hi = pending_prune_ranges.get_pruner_hi();
                        highest_pruned = highest_pruned.max(pruner_hi);
                    }
                    Err(e) => {
                        error!(
                            pipeline = H::NAME,
                            "Failed to prune data for range: {from} to {to_exclusive}: {e}"
                        );
                    }
                }

                if highest_pruned > highest_watermarked {
                    metrics
                        .watermark_pruner_hi
                        .with_label_values(&[H::NAME])
                        .set(highest_pruned as i64);

                    let guard = metrics
                        .watermark_pruner_write_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let Ok(mut conn) = store.connect().await else {
                        warn!(
                            pipeline = H::NAME,
                            "Pruner failed to connect while updating watermark"
                        );
                        continue;
                    };

                    match conn.set_pruner_watermark(H::NAME, highest_pruned).await {
                        Err(e) => {
                            let elapsed = guard.stop_and_record();
                            error!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                "Failed to update pruner watermark: {e}"
                            )
                        }
                        Ok(true) => {
                            highest_watermarked = highest_pruned;
                            let elapsed = guard.stop_and_record();
                            logger.log::<H>(
                                LoggerWatermark::checkpoint(highest_watermarked),
                                elapsed,
                            );

                            metrics
                                .watermark_pruner_hi_in_db
                                .with_label_values(&[H::NAME])
                                .set(highest_watermarked as i64);
                        }
                        Ok(false) => {}
                    }
                }
            }
        }
    })
}

async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
    metrics: Arc<IndexerMetrics>,
    db: H::Store,
    handler: Arc<H>,
    from: u64,
    to_exclusive: u64,
) -> Result<(), anyhow::Error> {
    metrics
        .total_pruner_chunks_attempted
        .with_label_values(&[H::NAME])
        .inc();

    let guard = metrics
        .pruner_delete_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    let mut conn = db.connect().await?;

    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
        Ok(affected) => {
            guard.stop_and_record();
            affected
        }

        Err(e) => {
            guard.stop_and_record();
            return Err(e);
        }
    };

    metrics
        .total_pruner_chunks_deleted
        .with_label_values(&[H::NAME])
        .inc();

    metrics
        .total_pruner_rows_deleted
        .with_label_values(&[H::NAME])
        .inc_by(affected as u64);

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{
        collections::HashMap,
        time::{SystemTime, UNIX_EPOCH},
    };

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{Processor, concurrent::BatchStatus},
    };

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(batch.len())
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    #[test]
    fn test_pending_ranges_basic_scheduling() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule non-overlapping range
        ranges.schedule(10, 15);

        // Verify ranges are stored correctly
        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (10, 15)]);
    }

    #[test]
    fn test_pending_ranges_double_pruning_prevention() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Try to schedule overlapping range.
        ranges.schedule(3, 7);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]);

        // Try to schedule range that's entirely covered by previous range
        ranges.schedule(2, 4); // Entirely within (1,5), should be ignored
        assert_eq!(ranges.len(), 2); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 7)]); // No change
    }

    #[test]
    fn test_pending_ranges_exact_duplicate() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1);

        // Schedule exact same range.
        ranges.schedule(1, 5);
        assert_eq!(ranges.len(), 1); // No change

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5)]);
    }

    #[test]
    fn test_pending_ranges_adjacent_ranges() {
        let mut ranges = PendingRanges::default();

        // Schedule initial range
        ranges.schedule(1, 5);

        // Schedule adjacent range
        ranges.schedule(5, 10);

        let scheduled: Vec<_> = ranges.iter().collect();
        assert_eq!(scheduled, vec![(1, 5), (5, 10)]);
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range - watermark should advance
        ranges.remove(&1);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 10); // Next range starts at 10

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[test]
    fn test_pending_ranges_remove_and_watermark_out_of_order() {
        let mut ranges = PendingRanges::default();

        // Schedule multiple ranges
        ranges.schedule(1, 5);
        ranges.schedule(10, 15);
        ranges.schedule(20, 25);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove middle range
        ranges.remove(&10);
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges.get_pruner_hi(), 1);

        // Remove first range
        ranges.remove(&1);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.get_pruner_hi(), 20);

        // Remove last range - watermark should use last_scheduled_range
        ranges.remove(&20);
        assert_eq!(ranges.len(), 0);
        assert_eq!(ranges.get_pruner_hi(), 25); // End of last scheduled range
    }

    #[tokio::test]
    async fn test_pruner() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 2000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Set up test data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Set up the committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: timestamp,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait a short time within delay_ms
        tokio::time::sleep(Duration::from_millis(200)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                data.contains_key(&1),
                "Checkpoint 1 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&2),
                "Checkpoint 2 shouldn't be pruned before delay"
            );
            assert!(
                data.contains_key(&3),
                "Checkpoint 3 shouldn't be pruned before delay"
            );
        }

        // Wait for the delay to expire
        tokio::time::sleep(Duration::from_millis(2000)).await;

        // Now checkpoint 1 should be pruned
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(
                !data.contains_key(&1),
                "Checkpoint 1 should be pruned after delay"
            );

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_timestamp_in_the_past() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 10,
            delay_ms: 20_000,
            retention: 1,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Set up test data
        let test_data = HashMap::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])]);
        // Set up the committer watermark
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 3,
            tx_hi: 9,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 3,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Because the `pruner_timestamp` is in the past, the data should be pruned close to
        // immediately even with delay_ms set. To be safe, sleep for 500ms before checking, which
        // is well under the delay_ms of 20_000 ms.
        tokio::time::sleep(Duration::from_millis(500)).await;

        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");

            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");

            // Checkpoint 3 should never be pruned (it's the reader_lo)
            assert!(data.contains_key(&3), "Checkpoint 3 should be preserved");

            // Check that the pruner_hi was updated past 1
            let watermark = store.watermark(DataPipeline::NAME).unwrap();
            assert!(
                watermark.pruner_hi > 1,
                "Pruner watermark should be updated"
            );
        }
    }

    #[tokio::test]
    async fn test_pruner_watermark_update_with_retries() {
        let handler = Arc::new(DataPipeline);
        let pruner_config = PrunerConfig {
            interval_ms: 3_000, // Long interval to test retries of the failed range.
            delay_ms: 100,      // Short delay to speed up each interval
            retention: 1,
            max_chunk_size: 1, // Process one checkpoint at a time
            prune_concurrency: 1,
        };
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IndexerMetrics::new(None, &registry);

        // Set up test data for checkpoints 1-4
        let test_data = HashMap::from([
            (1, vec![1, 2]),
            (2, vec![3, 4]),
            (3, vec![5, 6]),
            (4, vec![7, 8]),
        ]);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        let watermark = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 4,
            tx_hi: 8,
            timestamp_ms_hi_inclusive: timestamp,
            reader_lo: 4,        // Allow pruning up to checkpoint 4 (exclusive)
            pruner_timestamp: 0, // Past timestamp so delay doesn't block
            pruner_hi: 1,
        };

        // Configure failing behavior: range [1,2) should fail once before succeeding
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_data(DataPipeline::NAME, test_data.clone())
            .with_prune_failures(1, 2, 1);

        // Start the pruner
        let store_clone = store.clone();
        let _pruner = pruner(handler, Some(pruner_config), store_clone, metrics);

        // Wait for first pruning cycle - ranges [2,3) and [3,4) should succeed, [1,2) should fail
        tokio::time::sleep(Duration::from_millis(500)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark doesn't advance past the failed range [1,2)
            assert_eq!(
                watermarks.pruner_hi, 1,
                "Pruner watermark should remain at 1 because range [1,2) failed"
            );
            assert!(data.contains_key(&1), "Checkpoint 1 should be preserved");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }

        // Wait for second pruning cycle - range [1,2) should succeed on retry
        tokio::time::sleep(Duration::from_millis(3000)).await;
        {
            let data = store.data.get(DataPipeline::NAME).unwrap();
            let watermarks = store.watermark(DataPipeline::NAME).unwrap();

            // Verify watermark advances after all ranges complete successfully
            assert_eq!(
                watermarks.pruner_hi, 4,
                "Pruner watermark should advance to 4 after all ranges complete"
            );
            assert!(!data.contains_key(&1), "Checkpoint 1 should be pruned");
            assert!(!data.contains_key(&2), "Checkpoint 2 should be pruned");
            assert!(!data.contains_key(&3), "Checkpoint 3 should be pruned");
            assert!(data.contains_key(&4), "Checkpoint 4 should be preserved");
        }
    }
}