// sui_indexer_alt_framework/pipeline/sequential/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use serde::{Deserialize, Serialize};
7use tokio::{sync::mpsc, task::JoinHandle};
8use tokio_util::sync::CancellationToken;
9
10use super::{CommitterConfig, PIPELINE_BUFFER, Processor, processor::processor};
11
12use crate::{
13    metrics::IndexerMetrics,
14    store::{Store, TransactionalStore},
15    types::full_checkpoint_content::Checkpoint,
16};
17
18use self::committer::committer;
19use async_trait::async_trait;
20
21mod committer;
22
23/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
24/// implementing [Processor]) into rows for their table, how to combine multiple rows into a single
25/// DB operation, and then how to write those rows atomically to the database.
26///
27/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
28/// associated values).
29///
30/// Sequential handlers can only be used in sequential pipelines, where checkpoint data is
31/// processed out-of-order, but then gathered and written in order. If multiple checkpoints are
32/// available, the pipeline will attempt to combine their writes taking advantage of batching to
33/// avoid emitting redundant writes.
34///
35/// Back-pressure is handled by setting a high watermark on the ingestion service: The pipeline
36/// notifies the ingestion service of the checkpoint it last successfully wrote to the database
37/// for, and in turn the ingestion service will only run ahead by its buffer size. This guarantees
38/// liveness and limits the amount of memory the pipeline can consume, by bounding the number of
39/// checkpoints that can be received before the next checkpoint.
40#[async_trait]
41pub trait Handler: Processor {
42    type Store: TransactionalStore;
43
44    /// If at least this many rows are pending, the committer will commit them eagerly.
45    const MIN_EAGER_ROWS: usize = 50;
46
47    /// Maximum number of checkpoints to try and write in a single batch. The larger this number
48    /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
49    /// transaction is likely to be.
50    const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;
51
52    /// A type to combine multiple `Self::Value`-s into. This can be used to avoid redundant writes
53    /// by combining multiple rows into one (e.g. if one row supersedes another, the latter can be
54    /// omitted).
55    type Batch: Default + Send + Sync + 'static;
56
57    /// Add `values` from processing a checkpoint to the current `batch`. Checkpoints are
58    /// guaranteed to be presented to the batch in checkpoint order. The handler takes ownership
59    /// of the iterator and consumes all values.
60    ///
61    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
62    /// or `BatchStatus::Pending` if the batch can accept more values.
63    ///
64    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
65    /// may also decide to commit a batch based on the trait parameters above.
66    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
67
68    /// Take a batch of values and commit them to the database, returning the number of rows
69    /// affected.
70    async fn commit<'a>(
71        &self,
72        batch: &Self::Batch,
73        conn: &mut <Self::Store as Store>::Connection<'a>,
74    ) -> anyhow::Result<usize>;
75}
76
77/// Configuration for a sequential pipeline
78#[derive(Serialize, Deserialize, Clone, Default)]
79pub struct SequentialConfig {
80    /// Configuration for the writer, that makes forward progress.
81    pub committer: CommitterConfig,
82
83    /// How many checkpoints to hold back writes for.
84    pub checkpoint_lag: u64,
85}
86
87/// Start a new sequential (in-order) indexing pipeline, served by the handler, `H`. Starting
88/// strictly after the `watermark` (or from the beginning if no watermark was provided).
89///
90/// Each pipeline consists of a processor which takes checkpoint data and breaks it down into rows,
91/// ready for insertion, and a committer which orders the rows and combines them into batches to
92/// write to the database.
93///
94/// Commits are performed in checkpoint order, potentially involving multiple checkpoints at a
95/// time. The call to [Handler::commit] and the associated watermark update are performed in a
96/// transaction to ensure atomicity. Unlike in the case of concurrent pipelines, the data passed to
97/// [Handler::commit] is not chunked up, so the handler must perform this step itself, if
98/// necessary.
99///
100/// The pipeline can optionally be configured to lag behind the ingestion service by a fixed number
101/// of checkpoints (configured by `checkpoint_lag`).
102///
103/// Watermarks are also shared with the ingestion service, which is guaranteed to bound the
104/// checkpoint height it pre-fetches to some constant additive factor above the pipeline's
105/// watermark.
106///
107/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, watermark updates
108/// are communicated to the ingestion service through the `watermark_tx` channel and internal
109/// channels are created to communicate between its various components. The pipeline can be
110/// shutdown using its `cancel` token, and will also shutdown if any of its input or output
111/// channels close, or any of its independent tasks fail.
112pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
113    handler: H,
114    next_checkpoint: u64,
115    config: SequentialConfig,
116    db: H::Store,
117    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
118    watermark_tx: mpsc::UnboundedSender<(&'static str, u64)>,
119    metrics: Arc<IndexerMetrics>,
120    cancel: CancellationToken,
121) -> JoinHandle<()> {
122    let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
123
124    let handler = Arc::new(handler);
125
126    let processor = processor(
127        handler.clone(),
128        checkpoint_rx,
129        processor_tx,
130        metrics.clone(),
131        cancel.clone(),
132    );
133
134    let committer = committer::<H>(
135        handler,
136        config,
137        next_checkpoint,
138        committer_rx,
139        watermark_tx,
140        db,
141        metrics.clone(),
142        cancel.clone(),
143    );
144
145    tokio::spawn(async move {
146        let (_, _) = futures::join!(processor, committer);
147    })
148}