sui_indexer_builder/
indexer_builder.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::min;
use std::sync::Arc;

use anyhow::Error;
use async_trait::async_trait;
use futures::StreamExt;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::task::JoinHandle;

use crate::metrics::IndexerMetricProvider;
use crate::{Task, Tasks};
use mysten_metrics::{metered_channel, spawn_monitored_task};
use tap::tap::TapFallible;

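/// A `(checkpoint/block height, items fetched at that height)` pair, produced
/// by a [`Datasource`] and consumed by the ingestion loop below.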
type CheckpointData<T> = (u64, Vec<T>);
pub type DataSender<T> = metered_channel::Sender<CheckpointData<T>>;

const INGESTION_BATCH_SIZE: usize = 100;
const RETRIEVED_CHECKPOINT_CHANNEL_SIZE: usize = 10000;

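/// Builder for an [`Indexer`] that wires together a [`Datasource`], a
/// [`DataMapper`], and a [`Persistent`] store.
///
/// A minimal usage sketch (`MyDatasource`, `MyMapper`, and `MyStore` are
/// hypothetical implementations of the traits below, not part of this crate):
///
/// ```ignore
/// let indexer = IndexerBuilder::new("bridge", MyDatasource::new(), MyMapper, MyStore::new())
///     .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
///     .build();
/// indexer.start().await?;
/// ```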
pub struct IndexerBuilder<D, M, P> {
    name: String,
    datasource: D,
    data_mapper: M,
    persistent: P,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
}

impl<D, M, P> IndexerBuilder<D, M, P> {
    pub fn new<R>(
        name: &str,
        datasource: D,
        data_mapper: M,
        persistent: P,
    ) -> IndexerBuilder<D, M, P>
    where
        P: Persistent<R>,
    {
        IndexerBuilder {
            name: name.into(),
            datasource,
            data_mapper,
            backfill_strategy: BackfillStrategy::Simple,
            disable_live_task: false,
            persistent,
        }
    }

    pub fn build(self) -> Indexer<P, D, M> {
        Indexer {
            name: self.name,
            storage: self.persistent,
            datasource: self.datasource.into(),
            backfill_strategy: self.backfill_strategy,
            disable_live_task: self.disable_live_task,
            data_mapper: self.data_mapper,
        }
    }

    pub fn with_backfill_strategy(mut self, backfill: BackfillStrategy) -> Self {
        self.backfill_strategy = backfill;
        self
    }

    pub fn disable_live_task(mut self) -> Self {
        self.disable_live_task = true;
        self
    }
}

pub struct Indexer<P, D, M> {
    name: String,
    storage: P,
    datasource: Arc<D>,
    data_mapper: M,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
}

impl<P, D, M> Indexer<P, D, M> {
    pub async fn start<T, R>(mut self) -> Result<(), Error>
    where
        D: Datasource<T> + 'static,
        M: DataMapper<T, R> + 'static,
        P: Persistent<R> + 'static,
        T: Send,
    {
        let task_name = self.name.clone();
        // Update tasks first
        self.update_tasks()
            .await
            .tap_err(|e| {
                tracing::error!(task_name, "Failed to update tasks: {:?}", e);
            })
            .tap_ok(|_| {
                tracing::info!(task_name, "Tasks updated.");
            })?;

        // Get ongoing tasks from storage
        let ongoing_tasks = self
            .storage
            .get_ongoing_tasks(&self.name)
            .await
            .tap_err(|e| {
                tracing::error!(task_name, "Failed to get updated tasks: {:?}", e);
            })
            .tap_ok(|tasks| {
                tracing::info!(task_name, "Got updated tasks: {:?}", tasks);
            })?;

        // Start the live (latest checkpoint) worker, if enabled.
        // Tasks are ordered by checkpoint in descending order, so the realtime
        // update task, if present, always comes first.
        let live_task_future = match ongoing_tasks.live_task() {
            Some(live_task) if !self.disable_live_task => {
                let live_task_future = self.datasource.start_ingestion_task(
                    live_task,
                    self.storage.clone(),
                    self.data_mapper.clone(),
                );
                Some(live_task_future)
            }
            _ => None,
        };

        let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc();
        let storage_clone = self.storage.clone();
        let data_mapper_clone = self.data_mapper.clone();
        let datasource_clone = self.datasource.clone();

        let handle = spawn_monitored_task!(async {
            // Execute tasks one by one
            for backfill_task in backfill_tasks {
                if backfill_task.start_checkpoint < backfill_task.target_checkpoint {
                    datasource_clone
                        .start_ingestion_task(
                            backfill_task,
                            storage_clone.clone(),
                            data_mapper_clone.clone(),
                        )
                        .await
                        .expect("Backfill task failed");
                }
            }
        });

        if let Some(live_task_future) = live_task_future {
            live_task_future.await?;
        }

        tokio::try_join!(handle)?;

        Ok(())
    }

    async fn update_tasks<T, R>(&mut self) -> Result<(), Error>
    where
        P: Persistent<R>,
        D: Datasource<T>,
        T: Send,
    {
        let ongoing_tasks = self.storage.get_ongoing_tasks(&self.name).await?;
        let largest_checkpoint = self
            .storage
            .get_largest_indexed_checkpoint(&self.name)
            .await?;
        let live_task_from_checkpoint = self.datasource.get_live_task_starting_checkpoint().await?;

        // Create or update the live task as needed.
        // For the live task, we always start from `live_task_from_checkpoint`.
        // What if there are older tasks with a larger height? It's very
        // unlikely, and even if it happens, we just reprocess the range.
        // This simplifies the logic of determining task boundaries.
        if !self.disable_live_task {
            match ongoing_tasks.live_task() {
                None => {
                    self.storage
                        .register_live_task(
                            format!("{} - Live", self.name),
                            live_task_from_checkpoint,
                        )
                        .await
                        .tap_ok(|_| {
                            tracing::info!(
                                task_name = self.name.as_str(),
                                "Created live task from {}",
                                live_task_from_checkpoint,
                            );
                        })
                        .tap_err(|e| {
                            tracing::error!(
                                "Failed to register live task ({}-MAX): {:?}",
                                live_task_from_checkpoint,
                                e
                            );
                        })?;
                }
                Some(mut live_task) => {
                    // We still check this because with slow block generation
                    // (e.g. on Ethereum) it's possible we will stay on the
                    // same block for a bit.
                    if live_task_from_checkpoint != live_task.start_checkpoint {
                        let old_checkpoint = live_task.start_checkpoint;
                        live_task.start_checkpoint = live_task_from_checkpoint;
                        self.storage
                            .update_task(live_task)
                            .await
                            .tap_ok(|_| {
                                tracing::info!(
                                    task_name = self.name.as_str(),
                                    "Updated live task starting point from {} to {}",
                                    old_checkpoint,
                                    live_task_from_checkpoint,
                                );
                            })
                            .tap_err(|e| {
                                tracing::error!(
                                    "Failed to update live task to ({}-MAX): {:?}",
                                    live_task_from_checkpoint,
                                    e
                                );
                            })?;
                    }
                }
            }
        }

        // If there is a gap between `largest_checkpoint` and `live_task_from_checkpoint`,
        // create backfill tasks covering [largest_checkpoint + 1, live_task_from_checkpoint - 1].

        // TODO: when there is a hole, we create one task for the hole, but ideally we should
        // honor the partition size and create as many tasks as needed.
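        // For example (illustrative numbers): with the largest indexed
        // checkpoint at 99 and the live task starting at 200, this creates
        // backfill coverage for checkpoints 100..=199.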
        let from_checkpoint = largest_checkpoint
            .map(|cp| cp + 1)
            .unwrap_or(self.datasource.get_genesis_height());
        if from_checkpoint < live_task_from_checkpoint {
            self.create_backfill_tasks(from_checkpoint, live_task_from_checkpoint - 1)
                .await
                .tap_ok(|_| {
                    tracing::info!(
                        task_name = self.name.as_str(),
                        "Created backfill tasks ({}-{})",
                        from_checkpoint,
                        live_task_from_checkpoint - 1
                    );
                })
                .tap_err(|e| {
                    tracing::error!(
                        task_name = self.name.as_str(),
                        "Failed to create backfill tasks ({}-{}): {:?}",
                        from_checkpoint,
                        live_task_from_checkpoint - 1,
                        e
                    );
                })?;
        }
        Ok(())
    }

    // Create backfill tasks according to the backfill strategy.
    async fn create_backfill_tasks<R>(&mut self, mut from_cp: u64, to_cp: u64) -> Result<(), Error>
    where
        P: Persistent<R>,
    {
        match self.backfill_strategy {
            BackfillStrategy::Simple => {
                self.storage
                    .register_task(
                        format!("{} - backfill - {from_cp}:{to_cp}", self.name),
                        from_cp,
                        to_cp,
                    )
                    .await
            }
            BackfillStrategy::Partitioned { task_size } => {
                // TODO: register all tasks in one DB write
                // Bounds are inclusive, so loop with `<=`: with `<`, a range whose
                // final partition holds a single checkpoint (e.g. [0, 10] with
                // task_size 5) would silently drop that last checkpoint.
                while from_cp <= to_cp {
                    let target_cp = min(from_cp + task_size - 1, to_cp);
                    self.storage
                        .register_task(
                            format!("{} - backfill - {from_cp}:{target_cp}", self.name),
                            from_cp,
                            target_cp,
                        )
                        .await?;
                    from_cp = target_cp + 1;
                }
                Ok(())
            }
            BackfillStrategy::Disabled => Ok(()),
        }
    }

    #[cfg(any(feature = "test-utils", test))]
    pub async fn test_only_update_tasks<R, T>(&mut self) -> Result<(), Error>
    where
        P: Persistent<R>,
        D: Datasource<T>,
        T: Send,
    {
        self.update_tasks().await
    }

    #[cfg(any(feature = "test-utils", test))]
    pub fn test_only_storage<R>(&self) -> &P
    where
        P: Persistent<R>,
    {
        &self.storage
    }

    #[cfg(any(feature = "test-utils", test))]
    pub fn test_only_name(&self) -> String {
        self.name.clone()
    }
}

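/// Storage backend for mapped rows plus indexing progress. Implementations
/// must be cheaply cloneable (e.g. wrapping a connection pool), since each
/// ingestion task receives its own clone.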
#[async_trait]
pub trait Persistent<T>: IndexerProgressStore + Sync + Send + Clone {
    async fn write(&self, data: Vec<T>) -> Result<(), Error>;
}

#[async_trait]
pub trait IndexerProgressStore: Send {
    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64>;
    /// Attempt to save progress. Depending on the `ProgressSavingPolicy`,
    /// the progress may be cached somewhere instead of being flushed to persistent storage.
    /// Returns the saved checkpoint number, if any. The caller can use this value as a
    /// signal to see if we have reached the target checkpoint.
    async fn save_progress(
        &mut self,
        task: &Task,
        checkpoint_numbers: &[u64],
    ) -> anyhow::Result<Option<u64>>;

    async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Tasks, Error>;

    async fn get_largest_indexed_checkpoint(&self, prefix: &str) -> Result<Option<u64>, Error>;

    async fn register_task(
        &mut self,
        task_name: String,
        start_checkpoint: u64,
        target_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    async fn register_live_task(
        &mut self,
        task_name: String,
        start_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}

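/// A source of checkpoint-ordered data (e.g. a chain RPC or a checkpoint
/// stream). Implementors only provide data retrieval and a few accessors; the
/// default `start_ingestion_task` drives the full pipeline: it batches
/// incoming `(height, data)` pairs, maps them with the [`DataMapper`], writes
/// rows via [`Persistent`], and records progress.
///
/// A minimal sketch of an implementation (`MyDatasource`, `MyBlock`, and the
/// client and metrics fields used inside are hypothetical, not part of this
/// crate):
///
/// ```ignore
/// #[async_trait]
/// impl Datasource<MyBlock> for MyDatasource {
///     async fn start_data_retrieval(
///         &self,
///         task: Task,
///         data_sender: DataSender<MyBlock>,
///     ) -> Result<JoinHandle<Result<(), Error>>, Error> {
///         let client = self.client.clone();
///         Ok(tokio::spawn(async move {
///             for height in task.start_checkpoint..=task.target_checkpoint {
///                 let blocks = client.fetch_blocks(height).await?;
///                 data_sender
///                     .send((height, blocks))
///                     .await
///                     .map_err(|_| anyhow::anyhow!("receiver dropped"))?;
///             }
///             Ok::<(), Error>(())
///         }))
///     }
///
///     async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error> {
///         self.client.latest_height().await
///     }
///
///     fn get_genesis_height(&self) -> u64 {
///         0
///     }
///
///     fn metric_provider(&self) -> &dyn IndexerMetricProvider {
///         &self.metrics
///     }
/// }
/// ```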
#[async_trait]
pub trait Datasource<T: Send>: Sync + Send {
    async fn start_ingestion_task<M, P, R>(
        &self,
        task: Task,
        mut storage: P,
        data_mapper: M,
    ) -> Result<(), Error>
    where
        M: DataMapper<T, R>,
        P: Persistent<R>,
    {
        let task_name = task.task_name.clone();
        let task_name_prefix = task.name_prefix();
        let task_type_label = task.type_str();
        let starting_checkpoint = task.start_checkpoint;
        let target_checkpoint = task.target_checkpoint;
        let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE")
            .unwrap_or(INGESTION_BATCH_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let checkpoint_channel_size = std::env::var("RETRIEVED_CHECKPOINT_CHANNEL_SIZE")
            .unwrap_or(RETRIEVED_CHECKPOINT_CHANNEL_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        tracing::info!(
            task_name,
            ingestion_batch_size,
            checkpoint_channel_size,
            "Starting ingestion task ({}-{})",
            starting_checkpoint,
            target_checkpoint,
        );
        let (data_sender, data_rx) = metered_channel::channel(
            checkpoint_channel_size,
            &mysten_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                // This metric is accurate for now because at most one backfill task
                // runs per task name; it will become unusable if backfill tasks run
                // in parallel under the same task name.
                .with_label_values(&[&format!("{}-{}", task_name_prefix, task_type_label)]),
        );
        let is_live_task = task.is_live_task;
        let _live_tasks_tracker = if is_live_task {
            Some(LiveTasksTracker::new(
                self.metric_provider()
                    .get_inflight_live_tasks_metrics()
                    .clone(),
                &task_name,
            ))
        } else {
            None
        };
        let join_handle = self.start_data_retrieval(task.clone(), data_sender).await?;
        let processed_checkpoints_metrics = self
            .metric_provider()
            .get_tasks_processed_checkpoints_metric()
            .with_label_values(&[task_name_prefix, task_type_label]);
        // Track remaining checkpoints per task, except for the live task
        let remaining_checkpoints_metric = if !is_live_task {
            let remaining = self
                .metric_provider()
                .get_tasks_remaining_checkpoints_metric()
                .with_label_values(&[task_name_prefix]);
            remaining.set((target_checkpoint - starting_checkpoint + 1) as i64);
            Some(remaining)
        } else {
            None
        };

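        // `ready_chunks(n)` yields whatever items are already buffered, up to n,
        // without waiting for a full chunk to accumulate, so a batch may contain
        // fewer than `ingestion_batch_size` checkpoints when the producer is slow.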
        let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx)
            .ready_chunks(ingestion_batch_size);
        let mut last_saved_checkpoint = None;
        loop {
            let Some(batch) = stream.next().await else {
                tracing::error!(task_name, "Data stream ended unexpectedly");
                break;
            };
            let mut max_height = 0;
            let mut heights = vec![];
            let mut data = vec![];
            for (height, d) in batch {
                // Filter out data with height > target_checkpoint, in case data source returns any
                if height > target_checkpoint {
                    tracing::warn!(
                        task_name,
                        height,
                        "Received data with height > target_checkpoint, skipping."
                    );
                    continue;
                }
                max_height = std::cmp::max(max_height, height);
                heights.push(height);
                data.extend(d);
            }
            tracing::debug!(
                task_name,
                max_height,
                "Ingestion task received {} blocks.",
                heights.len(),
            );
            let timer = tokio::time::Instant::now();

            if !data.is_empty() {
                let timer = tokio::time::Instant::now();
                let processed_data = data.into_iter().try_fold(vec![], |mut result, d| {
                    result.append(&mut data_mapper.map(d)?);
                    Ok::<Vec<_>, Error>(result)
                })?;
                tracing::debug!(
                    task_name,
                    max_height,
                    "Data mapper processed {} blocks in {}ms.",
                    heights.len(),
                    timer.elapsed().as_millis(),
                );
                let timer = tokio::time::Instant::now();
                // TODO: batch write data
                // TODO: we might be able to write data and progress in a single transaction.
                storage.write(processed_data).await?;
                tracing::debug!(
                    task_name,
                    max_height,
                    "Processed data ({} blocks) written to storage in {}ms.",
                    heights.len(),
                    timer.elapsed().as_millis(),
                );
            }
            last_saved_checkpoint = storage.save_progress(&task, &heights).await?;
            tracing::debug!(
                task_name,
                max_height,
                last_saved_checkpoint,
                "Ingestion task processed {} blocks in {}ms",
                heights.len(),
                timer.elapsed().as_millis(),
            );
            processed_checkpoints_metrics.inc_by(heights.len() as u64);
            if let Some(m) = &remaining_checkpoints_metric {
                // Note this is only approximate as the data may come in out of order
                m.set(std::cmp::max(
                    target_checkpoint as i64 - max_height as i64,
                    0,
                ));
            }
            // If we have reached the target checkpoint, exit proactively
            if let Some(cp) = last_saved_checkpoint
                && cp >= target_checkpoint
            {
                // Task is done
                break;
            }
        }
        if is_live_task {
            // Live task should never exit, except in unit tests
            tracing::error!(task_name, "Live task exiting unexpectedly");
        } else if let Some(last_saved_checkpoint) = last_saved_checkpoint {
            if last_saved_checkpoint < target_checkpoint {
                tracing::error!(
                    task_name,
                    last_saved_checkpoint,
                    "Task exiting before reaching target checkpoint",
                );
            } else {
                tracing::info!(task_name, "Backfill task is done, exiting");
            }
        } else {
            tracing::error!(
                task_name,
                "Task exiting unexpectedly with no progress saved"
            );
        }
        join_handle.abort();
        if let Some(m) = &remaining_checkpoints_metric {
            m.set(0);
        }
        match join_handle.await {
            Ok(result) => result.tap_err(|err| {
                tracing::error!(task_name, "Data retrieval task failed: {:?}", err);
            }),
            // The abort above cancels a still-running retrieval task; that is
            // the expected shutdown path, not a failure.
            Err(e) if e.is_cancelled() => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    async fn start_data_retrieval(
        &self,
        task: Task,
        data_sender: DataSender<T>,
    ) -> Result<JoinHandle<Result<(), Error>>, Error>;

    async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error>;

    fn get_genesis_height(&self) -> u64;

    fn metric_provider(&self) -> &dyn IndexerMetricProvider;
}

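/// Strategy for splitting the historical range into backfill tasks.
///
/// - `Simple`: one task covering the whole range.
/// - `Partitioned { task_size }`: split the range into tasks of at most
///   `task_size` checkpoints each. For example (illustrative numbers), with
///   `task_size = 1000` the range 0..=2499 becomes three tasks:
///   0..=999, 1000..=1999, and 2000..=2499.
/// - `Disabled`: no backfill tasks are created.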
pub enum BackfillStrategy {
    Simple,
    Partitioned { task_size: u64 },
    Disabled,
}

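/// Maps one raw datum from the [`Datasource`] into zero or more rows for the
/// [`Persistent`] store.
///
/// A minimal sketch (`MyBlock` and `TransferRow` are hypothetical types, not
/// part of this crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct TransferMapper;
///
/// impl DataMapper<MyBlock, TransferRow> for TransferMapper {
///     fn map(&self, block: MyBlock) -> Result<Vec<TransferRow>, anyhow::Error> {
///         Ok(block.transfers.into_iter().map(TransferRow::from).collect())
///     }
/// }
/// ```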
pub trait DataMapper<T, R>: Sync + Send + Clone {
    fn map(&self, data: T) -> Result<Vec<R>, anyhow::Error>;
}

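/// RAII guard for the inflight-live-tasks gauge: increments the per-task gauge
/// on construction and decrements it on drop, so the metric stays correct on
/// any exit path.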
struct LiveTasksTracker {
    gauge: IntGauge,
}

impl LiveTasksTracker {
    pub fn new(metrics: IntGaugeVec, task_name: &str) -> Self {
        let gauge = metrics.with_label_values(&[task_name]);
        gauge.inc();
        Self { gauge }
    }
}

impl Drop for LiveTasksTracker {
    fn drop(&mut self) {
        self.gauge.dec();
    }
}