sui_indexer_alt_framework/pipeline/sequential/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use serde::{Deserialize, Serialize};
7use sui_futures::service::Service;
8use tokio::sync::mpsc;
9use tracing::info;
10
11use super::{CommitterConfig, PIPELINE_BUFFER, Processor, processor::processor};
12
13use crate::{
14 metrics::IndexerMetrics,
15 store::{Store, TransactionalStore},
16 types::full_checkpoint_content::Checkpoint,
17};
18
19use self::committer::committer;
20use async_trait::async_trait;
21
22mod committer;
23
24/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
25/// implementing [Processor]) into rows for their table, how to combine multiple rows into a single
26/// DB operation, and then how to write those rows atomically to the database.
27///
28/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
29/// associated values).
30///
31/// Sequential handlers can only be used in sequential pipelines, where checkpoint data is
32/// processed out-of-order, but then gathered and written in order. If multiple checkpoints are
33/// available, the pipeline will attempt to combine their writes taking advantage of batching to
34/// avoid emitting redundant writes.
35///
36/// Back-pressure is handled by setting a high watermark on the ingestion service: The pipeline
37/// notifies the ingestion service of the checkpoint it last successfully wrote to the database
38/// for, and in turn the ingestion service will only run ahead by its buffer size. This guarantees
39/// liveness and limits the amount of memory the pipeline can consume, by bounding the number of
40/// checkpoints that can be received before the next checkpoint.
41#[async_trait]
42pub trait Handler: Processor {
43 type Store: TransactionalStore;
44
45 /// If at least this many rows are pending, the committer will commit them eagerly.
46 const MIN_EAGER_ROWS: usize = 50;
47
48 /// Maximum number of checkpoints to try and write in a single batch. The larger this number
49 /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
50 /// transaction is likely to be.
51 const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;
52
53 /// A type to combine multiple `Self::Value`-s into. This can be used to avoid redundant writes
54 /// by combining multiple rows into one (e.g. if one row supersedes another, the latter can be
55 /// omitted).
56 type Batch: Default + Send + Sync + 'static;
57
58 /// Add `values` from processing a checkpoint to the current `batch`. Checkpoints are
59 /// guaranteed to be presented to the batch in checkpoint order. The handler takes ownership
60 /// of the iterator and consumes all values.
61 ///
62 /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
63 /// or `BatchStatus::Pending` if the batch can accept more values.
64 ///
65 /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
66 /// may also decide to commit a batch based on the trait parameters above.
67 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
68
69 /// Take a batch of values and commit them to the database, returning the number of rows
70 /// affected.
71 async fn commit<'a>(
72 &self,
73 batch: &Self::Batch,
74 conn: &mut <Self::Store as Store>::Connection<'a>,
75 ) -> anyhow::Result<usize>;
76}
77
78/// Configuration for a sequential pipeline
79#[derive(Serialize, Deserialize, Debug, Clone, Default)]
80pub struct SequentialConfig {
81 /// Configuration for the writer, that makes forward progress.
82 pub committer: CommitterConfig,
83
84 /// How many checkpoints to hold back writes for.
85 pub checkpoint_lag: u64,
86}
87
88/// Start a new sequential (in-order) indexing pipeline, served by the handler, `H`. Starting
89/// strictly after the `watermark` (or from the beginning if no watermark was provided).
90///
91/// Each pipeline consists of a processor which takes checkpoint data and breaks it down into rows,
92/// ready for insertion, and a committer which orders the rows and combines them into batches to
93/// write to the database.
94///
95/// Commits are performed in checkpoint order, potentially involving multiple checkpoints at a
96/// time. The call to [Handler::commit] and the associated watermark update are performed in a
97/// transaction to ensure atomicity. Unlike in the case of concurrent pipelines, the data passed to
98/// [Handler::commit] is not chunked up, so the handler must perform this step itself, if
99/// necessary.
100///
101/// The pipeline can optionally be configured to lag behind the ingestion service by a fixed number
102/// of checkpoints (configured by `checkpoint_lag`).
103///
104/// Watermarks are also shared with the ingestion service, which is guaranteed to bound the
105/// checkpoint height it pre-fetches to some constant additive factor above the pipeline's
106/// watermark.
107///
108/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, watermark updates
109/// are communicated to the ingestion service through the `watermark_tx` channel and internal
110/// channels are created to communicate between its various components. The pipeline will shutdown
111/// if any of its input or output channels close, any of its independent tasks fail, or if it is
112/// signalled to shutdown through the returned service handle.
113pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
114 handler: H,
115 next_checkpoint: u64,
116 config: SequentialConfig,
117 db: H::Store,
118 checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
119 watermark_tx: mpsc::UnboundedSender<(&'static str, u64)>,
120 metrics: Arc<IndexerMetrics>,
121) -> Service {
122 info!(
123 pipeline = H::NAME,
124 "Starting pipeline with config: {config:#?}",
125 );
126
127 let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
128
129 let handler = Arc::new(handler);
130
131 let s_processor = processor(
132 handler.clone(),
133 checkpoint_rx,
134 processor_tx,
135 metrics.clone(),
136 );
137
138 let s_committer = committer::<H>(
139 handler,
140 config,
141 next_checkpoint,
142 committer_rx,
143 watermark_tx,
144 db,
145 metrics.clone(),
146 );
147
148 s_processor.merge(s_committer)
149}