sui_indexer_alt_framework/pipeline/sequential/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use serde::Deserialize;
8use serde::Serialize;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::info;
12
13use crate::config::ConcurrencyConfig;
14use crate::metrics::IndexerMetrics;
15use crate::pipeline::CommitterConfig;
16use crate::pipeline::Processor;
17use crate::pipeline::processor::processor;
18use crate::pipeline::sequential::committer::committer;
19use crate::store::Store;
20use crate::store::TransactionalStore;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23mod committer;
24
25/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
26/// implementing [Processor]) into rows for their table, how to combine multiple rows into a single
27/// DB operation, and then how to write those rows atomically to the database.
28///
29/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
30/// associated values).
31///
32/// Sequential handlers can only be used in sequential pipelines, where checkpoint data is
33/// processed out-of-order, but then gathered and written in order. If multiple checkpoints are
34/// available, the pipeline will attempt to combine their writes taking advantage of batching to
35/// avoid emitting redundant writes.
36///
37/// Back-pressure is handled by setting a high watermark on the ingestion service: The pipeline
38/// notifies the ingestion service of the checkpoint it last successfully wrote to the database
39/// for, and in turn the ingestion service will only run ahead by its buffer size. This guarantees
40/// liveness and limits the amount of memory the pipeline can consume, by bounding the number of
41/// checkpoints that can be received before the next checkpoint.
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler writes to. It must support transactions so that the batch write and
    /// the pipeline's watermark update can be committed atomically.
    type Store: TransactionalStore;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// Maximum number of checkpoints to try and write in a single batch. The larger this number
    /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
    /// transaction is likely to be.
    const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;

    /// A type to combine multiple `Self::Value`-s into. This can be used to avoid redundant writes
    /// by combining multiple rows into one (e.g. if one row supersedes another, the latter can be
    /// omitted).
    type Batch: Default + Send + Sync + 'static;

    /// Add `values` from processing a checkpoint to the current `batch`. Checkpoints are
    /// guaranteed to be presented to the batch in checkpoint order. The handler takes ownership
    /// of the iterator and consumes all values.
    ///
    /// NOTE(review): a previous version of this doc referred to a `BatchStatus` return value,
    /// but this method returns nothing. The framework alone decides when a batch is committed,
    /// based on `MIN_EAGER_ROWS` and `MAX_BATCH_CHECKPOINTS` (or their config overrides).
    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);

    /// Take a batch of values and commit them to the database, returning the number of rows
    /// affected.
    ///
    /// This call and the associated watermark update are performed in the same transaction on
    /// `conn`, so a batch is never observed as written without its watermark (and vice versa).
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;
}
78
/// Configuration for a sequential pipeline.
///
/// All optional fields fall back to framework- or handler-provided defaults when `None`; the
/// derived `Default` leaves every override unset and `checkpoint_lag` at 0 (no lag).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct SequentialConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// How many checkpoints to hold back writes for. 0 (the default) means writes are not held
    /// back at all.
    pub checkpoint_lag: u64,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_BATCH_CHECKPOINTS` (checkpoints per write batch).
    pub max_batch_checkpoints: Option<usize>,

    /// Size of the channel between the processor and committer. If `None`, the pipeline picks a
    /// default based on the number of available CPUs.
    pub processor_channel_size: Option<usize>,
}
100
101/// Start a new sequential (in-order) indexing pipeline, served by the handler, `H`. Starting
102/// strictly after the `watermark` (or from the beginning if no watermark was provided).
103///
104/// Each pipeline consists of a processor which takes checkpoint data and breaks it down into rows,
105/// ready for insertion, and a committer which orders the rows and combines them into batches to
106/// write to the database.
107///
108/// Commits are performed in checkpoint order, potentially involving multiple checkpoints at a
109/// time. The call to [Handler::commit] and the associated watermark update are performed in a
110/// transaction to ensure atomicity. Unlike in the case of concurrent pipelines, the data passed to
111/// [Handler::commit] is not chunked up, so the handler must perform this step itself, if
112/// necessary.
113///
114/// The pipeline can optionally be configured to lag behind the ingestion service by a fixed number
115/// of checkpoints (configured by `checkpoint_lag`).
116///
117/// Watermarks are also shared with the ingestion service, which is guaranteed to bound the
118/// checkpoint height it pre-fetches to some constant additive factor above the pipeline's
119/// watermark.
120///
121/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, watermark updates
122/// are communicated to the ingestion service through the `watermark_tx` channel and internal
123/// channels are created to communicate between its various components. The pipeline will shutdown
124/// if any of its input or output channels close, any of its independent tasks fail, or if it is
125/// signalled to shutdown through the returned service handle.
126pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
127    handler: H,
128    next_checkpoint: u64,
129    config: SequentialConfig,
130    db: H::Store,
131    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
132    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
133    metrics: Arc<IndexerMetrics>,
134) -> Service {
135    info!(
136        pipeline = H::NAME,
137        "Starting pipeline with config: {config:#?}",
138    );
139
140    let concurrency = config
141        .fanout
142        .clone()
143        .unwrap_or(ConcurrencyConfig::Adaptive {
144            initial: 1,
145            min: 1,
146            max: num_cpus::get(),
147            dead_band: None,
148        });
149    let min_eager_rows = config.min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
150    let max_batch_checkpoints = config
151        .max_batch_checkpoints
152        .unwrap_or(H::MAX_BATCH_CHECKPOINTS);
153
154    let processor_channel_size = config.processor_channel_size.unwrap_or(num_cpus::get() / 2);
155    let (processor_tx, committer_rx) = mpsc::channel(processor_channel_size);
156
157    let handler = Arc::new(handler);
158
159    let s_processor = processor(
160        handler.clone(),
161        checkpoint_rx,
162        processor_tx,
163        metrics.clone(),
164        concurrency,
165    );
166
167    let s_committer = committer::<H>(
168        handler,
169        config,
170        next_checkpoint,
171        committer_rx,
172        commit_hi_tx,
173        db,
174        metrics.clone(),
175        min_eager_rows,
176        max_batch_checkpoints,
177    );
178
179    s_processor.merge(s_committer)
180}