sui_indexer/backfill/
backfill_runner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::backfill::backfill_instances::get_backfill_task;
use crate::backfill::backfill_task::BackfillTask;
use crate::config::BackFillConfig;
use crate::database::ConnectionPool;
use futures::StreamExt;
use std::collections::BTreeSet;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;

pub struct BackfillRunner {}

impl BackfillRunner {
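    /// Resolves the task implementation for `runner_kind`, then backfills the
    /// inclusive checkpoint range. A minimal usage sketch; the config values
    /// and the `pool`/`kind` setup are illustrative assumptions, not
    /// prescribed defaults:
    ///
    /// ```ignore
    /// // `pool: ConnectionPool` and `kind: BackfillTaskKind` built elsewhere.
    /// let config = BackFillConfig { max_concurrency: 8, chunk_size: 1000 };
    /// BackfillRunner::run(kind, pool, config, 0..=1_000_000).await;
    /// ```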
    pub async fn run(
        runner_kind: BackfillTaskKind,
        pool: ConnectionPool,
        backfill_config: BackFillConfig,
        total_range: RangeInclusive<usize>,
    ) {
        let task = get_backfill_task(runner_kind, *total_range.start()).await;
        Self::run_impl(pool, backfill_config, total_range, task).await;
    }

    /// Main loop: streams chunked checkpoint ranges and processes them in parallel.
    async fn run_impl(
        pool: ConnectionPool,
        config: BackFillConfig,
        total_range: RangeInclusive<usize>,
        task: Arc<dyn BackfillTask>,
    ) {
        let cur_time = Instant::now();
        // Remember where the overall range starts so that the average speed can
        // be computed from the checkpoints actually processed in this run.
        let total_range_start = *total_range.start();
        // Keeps track of the chunk ranges (keyed by starting checkpoint number)
        // that are still in progress.
        let in_progress = Arc::new(Mutex::new(BTreeSet::new()));

        let concurrency = config.max_concurrency;
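        // Bounded channel: with capacity `concurrency * 10`, at most ~10 chunks
        // per worker are materialized ahead of processing; the producer task
        // below blocks on `send` once the buffer is full.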
        let (tx, rx) = mpsc::channel(concurrency * 10);
        // Spawn a task to send chunks lazily over the channel.
        tokio::spawn(async move {
            for chunk in create_chunk_iter(total_range, config.chunk_size) {
                if tx.send(chunk).await.is_err() {
                    // Channel closed, stop producing chunks.
                    break;
                }
            }
        });
        // Convert the receiver into a stream.
        let stream = ReceiverStream::new(rx);

        // Process chunks in parallel, limiting the number of concurrent query tasks.
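        // `for_each_concurrent` polls at most `concurrency` of these futures at
        // once; together with the bounded channel this back-pressures the chunk
        // producer instead of buffering the whole range up front.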
        stream
            .for_each_concurrent(concurrency, move |range| {
                let pool_clone = pool.clone();
                let in_progress_clone = in_progress.clone();
                let task = task.clone();

                async move {
                    in_progress_clone.lock().await.insert(*range.start());
                    task.backfill_range(pool_clone, &range).await;
                    println!("Finished range: {:?}.", range);
                    // Remove this chunk and read the new minimum under a single
                    // lock acquisition so the two views stay consistent.
                    let cur_min_in_progress = {
                        let mut guard = in_progress_clone.lock().await;
                        guard.remove(range.start());
                        guard.iter().next().cloned()
                    };
                    if let Some(cur_min_in_progress) = cur_min_in_progress {
                        println!(
                            "Average backfill speed: {} checkpoints/s. Minimum range start number still in progress: {:?}.",
                            (cur_min_in_progress - total_range_start) as f64
                                / cur_time.elapsed().as_secs_f64(),
                            cur_min_in_progress
                        );
                    }
                }
            })
            .await;

        println!("Finished backfilling in {:?}", cur_time.elapsed());
    }
}

/// Creates chunks based on the total range and chunk size.
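/// For example, with `total_range = 0..=10` and `chunk_size = 4` the iterator
/// yields `0..=3`, `4..=7`, and `8..=10` (the last chunk is truncated at the
/// range end). Illustrative only, since the function is private:
///
/// ```ignore
/// let chunks: Vec<_> = create_chunk_iter(0..=10, 4).collect();
/// assert_eq!(chunks, vec![0..=3, 4..=7, 8..=10]);
/// ```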
fn create_chunk_iter(
    total_range: RangeInclusive<usize>,
    chunk_size: usize,
) -> impl Iterator<Item = RangeInclusive<usize>> {
    let end = *total_range.end();
    total_range.step_by(chunk_size).map(move |chunk_start| {
        let chunk_end = std::cmp::min(chunk_start + chunk_size - 1, end);
        chunk_start..=chunk_end
    })
}