sui_indexer_builder/
sui_datasource.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::Task;
5use crate::indexer_builder::{DataSender, Datasource};
6use crate::metrics::IndexerMetricProvider;
7use anyhow::Error;
8use async_trait::async_trait;
9use mysten_metrics::{metered_channel, spawn_monitored_task};
10use prometheus::IntGauge;
11use std::path::PathBuf;
12use std::sync::Arc;
13use sui_data_ingestion_core::{
14    DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
15};
16use sui_sdk::SuiClient;
17use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
18use sui_types::full_checkpoint_content::CheckpointTransaction;
19use sui_types::messages_checkpoint::CheckpointSequenceNumber;
20use tokio::sync::oneshot;
21use tokio::sync::oneshot::Sender;
22use tokio::task::JoinHandle;
23
/// Ingestion reader batch size used for live tasks.
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;
/// Default ingestion reader batch size used for backfill (non-live) tasks.
///
/// Can be overridden at runtime through the environment variable of the same name.
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;
26
27pub struct SuiCheckpointDatasource {
28    remote_store_url: String,
29    sui_client: Arc<SuiClient>,
30    concurrency: usize,
31    checkpoint_path: PathBuf,
32    genesis_checkpoint: u64,
33    ingestion_metrics: DataIngestionMetrics,
34    metrics: Box<dyn IndexerMetricProvider>,
35}
36impl SuiCheckpointDatasource {
37    pub fn new(
38        remote_store_url: String,
39        sui_client: Arc<SuiClient>,
40        concurrency: usize,
41        checkpoint_path: PathBuf,
42        genesis_checkpoint: u64,
43        ingestion_metrics: DataIngestionMetrics,
44        metrics: Box<dyn IndexerMetricProvider>,
45    ) -> Self {
46        SuiCheckpointDatasource {
47            remote_store_url,
48            sui_client,
49            concurrency,
50            checkpoint_path,
51            genesis_checkpoint,
52            ingestion_metrics,
53            metrics,
54        }
55    }
56}
57
58#[async_trait]
59impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
60    async fn start_data_retrieval(
61        &self,
62        task: Task,
63        data_sender: DataSender<CheckpointTxnData>,
64    ) -> Result<JoinHandle<Result<(), Error>>, Error> {
65        let (exit_sender, exit_receiver) = oneshot::channel();
66        let progress_store = PerTaskInMemProgressStore {
67            current_checkpoint: task.start_checkpoint,
68            exit_checkpoint: task.target_checkpoint,
69            exit_sender: Some(exit_sender),
70        };
71        // The max concurrnecy of checkpoint to fetch at the same time for ingestion framework
72        let ingestion_reader_batch_size = if task.is_live_task {
73            // Live task uses smaller number to be cost effective
74            LIVE_TASK_INGESTION_READER_BATCH_SIZE
75        } else {
76            std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
77                .unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string())
78                .parse::<usize>()
79                .unwrap()
80        };
81        tracing::info!(
82            "Starting Sui checkpoint data retrieval with batch size {}",
83            ingestion_reader_batch_size
84        );
85        let mut executor = IndexerExecutor::new(progress_store, 1, self.ingestion_metrics.clone());
86        let progress_metric = self
87            .metrics
88            .get_tasks_latest_retrieved_checkpoints()
89            .with_label_values(&[task.name_prefix(), task.type_str()]);
90        let worker = IndexerWorker::new(data_sender, progress_metric);
91        let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency);
92        executor.register(worker_pool).await?;
93        let checkpoint_path = self.checkpoint_path.clone();
94        let remote_store_url = self.remote_store_url.clone();
95        Ok(spawn_monitored_task!(async {
96            executor
97                .run(
98                    checkpoint_path,
99                    Some(remote_store_url),
100                    vec![], // optional remote store access options
101                    ReaderOptions {
102                        batch_size: ingestion_reader_batch_size,
103                        ..Default::default()
104                    },
105                    exit_receiver,
106                )
107                .await?;
108            Ok(())
109        }))
110    }
111
112    async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error> {
113        self.sui_client
114            .read_api()
115            .get_latest_checkpoint_sequence_number()
116            .await
117            .map_err(|e| anyhow::anyhow!("Failed to get last finalized block id: {:?}", e))
118    }
119
120    fn get_genesis_height(&self) -> u64 {
121        self.genesis_checkpoint
122    }
123
124    fn metric_provider(&self) -> &dyn IndexerMetricProvider {
125        self.metrics.as_ref()
126    }
127}
128
129struct PerTaskInMemProgressStore {
130    pub current_checkpoint: u64,
131    pub exit_checkpoint: u64,
132    pub exit_sender: Option<Sender<()>>,
133}
134
135#[async_trait]
136impl ProgressStore for PerTaskInMemProgressStore {
137    async fn load(
138        &mut self,
139        _task_name: String,
140    ) -> Result<CheckpointSequenceNumber, anyhow::Error> {
141        Ok(self.current_checkpoint)
142    }
143
144    async fn save(
145        &mut self,
146        task_name: String,
147        checkpoint_number: CheckpointSequenceNumber,
148    ) -> anyhow::Result<()> {
149        if checkpoint_number >= self.exit_checkpoint {
150            tracing::info!(
151                task_name,
152                checkpoint_number,
153                exit_checkpoint = self.exit_checkpoint,
154                "Task completed, sending exit signal"
155            );
156            // `exit_sender` may be `None` if we have already sent the exit signal.
157            if let Some(sender) = self.exit_sender.take() {
158                // Ignore the error if the receiver has already been dropped.
159                let _ = sender.send(());
160            }
161        }
162        self.current_checkpoint = checkpoint_number;
163        Ok(())
164    }
165}
166
167pub struct IndexerWorker<T> {
168    data_sender: metered_channel::Sender<(u64, Vec<T>)>,
169    progress_metric: IntGauge,
170}
171
172impl<T> IndexerWorker<T> {
173    pub fn new(
174        data_sender: metered_channel::Sender<(u64, Vec<T>)>,
175        progress_metric: IntGauge,
176    ) -> Self {
177        Self {
178            data_sender,
179            progress_metric,
180        }
181    }
182}
183
184pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
185
186#[async_trait]
187impl Worker for IndexerWorker<CheckpointTxnData> {
188    type Result = ();
189
190    async fn process_checkpoint(&self, checkpoint: &SuiCheckpointData) -> anyhow::Result<()> {
191        tracing::trace!(
192            "Received checkpoint [{}] {}: {}",
193            checkpoint.checkpoint_summary.epoch,
194            checkpoint.checkpoint_summary.sequence_number,
195            checkpoint.transactions.len(),
196        );
197        let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
198        let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
199
200        let transactions = checkpoint
201            .transactions
202            .clone()
203            .into_iter()
204            .map(|tx| (tx, checkpoint_num, timestamp_ms))
205            .collect();
206        self.data_sender
207            .send((checkpoint_num, transactions))
208            .await?;
209        self.progress_metric.set(checkpoint_num as i64);
210        Ok(())
211    }
212}