sui_indexer_builder/
sui_datasource.rs1use crate::Task;
5use crate::indexer_builder::{DataSender, Datasource};
6use crate::metrics::IndexerMetricProvider;
7use anyhow::Error;
8use async_trait::async_trait;
9use mysten_metrics::{metered_channel, spawn_monitored_task};
10use prometheus::IntGauge;
11use std::path::PathBuf;
12use std::sync::Arc;
13use sui_data_ingestion_core::{
14 DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
15};
16use sui_sdk::SuiClient;
17use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
18use sui_types::full_checkpoint_content::CheckpointTransaction;
19use sui_types::messages_checkpoint::CheckpointSequenceNumber;
20use tokio::sync::oneshot;
21use tokio::sync::oneshot::Sender;
22use tokio::task::JoinHandle;
23
/// Default ingestion reader batch size for backfill (non-live) tasks.
///
/// Can be overridden at runtime through the
/// `BACKFILL_TASK_INGESTION_READER_BATCH_SIZE` environment variable.
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;

/// Ingestion reader batch size used for live tasks.
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;
26
27pub struct SuiCheckpointDatasource {
28 remote_store_url: String,
29 sui_client: Arc<SuiClient>,
30 concurrency: usize,
31 checkpoint_path: PathBuf,
32 genesis_checkpoint: u64,
33 ingestion_metrics: DataIngestionMetrics,
34 metrics: Box<dyn IndexerMetricProvider>,
35}
36impl SuiCheckpointDatasource {
37 pub fn new(
38 remote_store_url: String,
39 sui_client: Arc<SuiClient>,
40 concurrency: usize,
41 checkpoint_path: PathBuf,
42 genesis_checkpoint: u64,
43 ingestion_metrics: DataIngestionMetrics,
44 metrics: Box<dyn IndexerMetricProvider>,
45 ) -> Self {
46 SuiCheckpointDatasource {
47 remote_store_url,
48 sui_client,
49 concurrency,
50 checkpoint_path,
51 genesis_checkpoint,
52 ingestion_metrics,
53 metrics,
54 }
55 }
56}
57
58#[async_trait]
59impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
60 async fn start_data_retrieval(
61 &self,
62 task: Task,
63 data_sender: DataSender<CheckpointTxnData>,
64 ) -> Result<JoinHandle<Result<(), Error>>, Error> {
65 let (exit_sender, exit_receiver) = oneshot::channel();
66 let progress_store = PerTaskInMemProgressStore {
67 current_checkpoint: task.start_checkpoint,
68 exit_checkpoint: task.target_checkpoint,
69 exit_sender: Some(exit_sender),
70 };
71 let ingestion_reader_batch_size = if task.is_live_task {
73 LIVE_TASK_INGESTION_READER_BATCH_SIZE
75 } else {
76 std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
77 .unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string())
78 .parse::<usize>()
79 .unwrap()
80 };
81 tracing::info!(
82 "Starting Sui checkpoint data retrieval with batch size {}",
83 ingestion_reader_batch_size
84 );
85 let mut executor = IndexerExecutor::new(progress_store, 1, self.ingestion_metrics.clone());
86 let progress_metric = self
87 .metrics
88 .get_tasks_latest_retrieved_checkpoints()
89 .with_label_values(&[task.name_prefix(), task.type_str()]);
90 let worker = IndexerWorker::new(data_sender, progress_metric);
91 let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency);
92 executor.register(worker_pool).await?;
93 let checkpoint_path = self.checkpoint_path.clone();
94 let remote_store_url = self.remote_store_url.clone();
95 Ok(spawn_monitored_task!(async {
96 executor
97 .run(
98 checkpoint_path,
99 Some(remote_store_url),
100 vec![], ReaderOptions {
102 batch_size: ingestion_reader_batch_size,
103 ..Default::default()
104 },
105 exit_receiver,
106 )
107 .await?;
108 Ok(())
109 }))
110 }
111
112 async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error> {
113 self.sui_client
114 .read_api()
115 .get_latest_checkpoint_sequence_number()
116 .await
117 .map_err(|e| anyhow::anyhow!("Failed to get last finalized block id: {:?}", e))
118 }
119
120 fn get_genesis_height(&self) -> u64 {
121 self.genesis_checkpoint
122 }
123
124 fn metric_provider(&self) -> &dyn IndexerMetricProvider {
125 self.metrics.as_ref()
126 }
127}
128
129struct PerTaskInMemProgressStore {
130 pub current_checkpoint: u64,
131 pub exit_checkpoint: u64,
132 pub exit_sender: Option<Sender<()>>,
133}
134
135#[async_trait]
136impl ProgressStore for PerTaskInMemProgressStore {
137 async fn load(
138 &mut self,
139 _task_name: String,
140 ) -> Result<CheckpointSequenceNumber, anyhow::Error> {
141 Ok(self.current_checkpoint)
142 }
143
144 async fn save(
145 &mut self,
146 task_name: String,
147 checkpoint_number: CheckpointSequenceNumber,
148 ) -> anyhow::Result<()> {
149 if checkpoint_number >= self.exit_checkpoint {
150 tracing::info!(
151 task_name,
152 checkpoint_number,
153 exit_checkpoint = self.exit_checkpoint,
154 "Task completed, sending exit signal"
155 );
156 if let Some(sender) = self.exit_sender.take() {
158 let _ = sender.send(());
160 }
161 }
162 self.current_checkpoint = checkpoint_number;
163 Ok(())
164 }
165}
166
167pub struct IndexerWorker<T> {
168 data_sender: metered_channel::Sender<(u64, Vec<T>)>,
169 progress_metric: IntGauge,
170}
171
172impl<T> IndexerWorker<T> {
173 pub fn new(
174 data_sender: metered_channel::Sender<(u64, Vec<T>)>,
175 progress_metric: IntGauge,
176 ) -> Self {
177 Self {
178 data_sender,
179 progress_metric,
180 }
181 }
182}
183
184pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
185
186#[async_trait]
187impl Worker for IndexerWorker<CheckpointTxnData> {
188 type Result = ();
189
190 async fn process_checkpoint(&self, checkpoint: &SuiCheckpointData) -> anyhow::Result<()> {
191 tracing::trace!(
192 "Received checkpoint [{}] {}: {}",
193 checkpoint.checkpoint_summary.epoch,
194 checkpoint.checkpoint_summary.sequence_number,
195 checkpoint.transactions.len(),
196 );
197 let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
198 let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
199
200 let transactions = checkpoint
201 .transactions
202 .clone()
203 .into_iter()
204 .map(|tx| (tx, checkpoint_num, timestamp_ms))
205 .collect();
206 self.data_sender
207 .send((checkpoint_num, transactions))
208 .await?;
209 self.progress_metric.set(checkpoint_num as i64);
210 Ok(())
211 }
212}