sui_indexer/backfill/
backfill_runner.rs1use crate::backfill::BackfillTaskKind;
5use crate::backfill::backfill_instances::get_backfill_task;
6use crate::backfill::backfill_task::BackfillTask;
7use crate::config::BackFillConfig;
8use crate::database::ConnectionPool;
9use futures::StreamExt;
10use std::collections::BTreeSet;
11use std::ops::RangeInclusive;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::sync::{Mutex, mpsc};
15use tokio_stream::wrappers::ReceiverStream;
16
17pub struct BackfillRunner {}
18
19impl BackfillRunner {
20 pub async fn run(
21 runner_kind: BackfillTaskKind,
22 pool: ConnectionPool,
23 backfill_config: BackFillConfig,
24 total_range: RangeInclusive<usize>,
25 ) {
26 let task = get_backfill_task(runner_kind, *total_range.start()).await;
27 Self::run_impl(pool, backfill_config, total_range, task).await;
28 }
29
30 async fn run_impl(
32 pool: ConnectionPool,
33 config: BackFillConfig,
34 total_range: RangeInclusive<usize>,
35 task: Arc<dyn BackfillTask>,
36 ) {
37 let cur_time = Instant::now();
38 let in_progress = Arc::new(Mutex::new(BTreeSet::new()));
41
42 let concurrency = config.max_concurrency;
43 let (tx, rx) = mpsc::channel(concurrency * 10);
44 tokio::spawn(async move {
46 for chunk in create_chunk_iter(total_range, config.chunk_size) {
47 if tx.send(chunk).await.is_err() {
48 break;
50 }
51 }
52 });
53 let stream = ReceiverStream::new(rx);
55
56 stream
58 .for_each_concurrent(concurrency, move |range| {
59 let pool_clone = pool.clone();
60 let in_progress_clone = in_progress.clone();
61 let task = task.clone();
62
63 async move {
64 in_progress_clone.lock().await.insert(*range.start());
65 task.backfill_range(pool_clone, &range).await;
66 println!("Finished range: {:?}.", range);
67 in_progress_clone.lock().await.remove(range.start());
68 let cur_min_in_progress = in_progress_clone.lock().await.iter().next().cloned();
69 if let Some(cur_min_in_progress) = cur_min_in_progress {
70 println!(
71 "Average backfill speed: {} checkpoints/s. Minimum range start number still in progress: {:?}.",
72 cur_min_in_progress as f64 / cur_time.elapsed().as_secs_f64(),
73 cur_min_in_progress
74 );
75 }
76 }
77 })
78 .await;
79
80 println!("Finished backfilling in {:?}", cur_time.elapsed());
81 }
82}
83
/// Splits `total_range` into consecutive inclusive sub-ranges of at most
/// `chunk_size` elements each.
///
/// Chunks are yielded in ascending order. Every chunk holds exactly
/// `chunk_size` elements except possibly the last one, which is clamped to the
/// end of `total_range`. An empty `total_range` yields no chunks.
///
/// # Panics
///
/// Panics if `chunk_size` is 0, because `step_by` rejects a zero step.
fn create_chunk_iter(
    total_range: RangeInclusive<usize>,
    chunk_size: usize,
) -> impl Iterator<Item = RangeInclusive<usize>> {
    let end = *total_range.end();
    total_range.step_by(chunk_size).map(move |chunk_start| {
        // `chunk_size >= 1` here (a zero step has already panicked above), so
        // the subtraction cannot underflow. The addition saturates instead of
        // overflowing: a wrapped value would fall below `chunk_start` and
        // silently produce an empty chunk in release builds.
        let chunk_end = chunk_start.saturating_add(chunk_size - 1).min(end);
        chunk_start..=chunk_end
    })
}