sui_indexer_alt_framework/pipeline/sequential/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use serde::{Deserialize, Serialize};
7use tokio::{sync::mpsc, task::JoinHandle};
8use tokio_util::sync::CancellationToken;
9
10use super::{CommitterConfig, PIPELINE_BUFFER, Processor, processor::processor};
11
12use crate::{
13 metrics::IndexerMetrics,
14 store::{Store, TransactionalStore},
15 types::full_checkpoint_content::Checkpoint,
16};
17
18use self::committer::committer;
19use async_trait::async_trait;
20
21mod committer;
22
23/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
24/// implementing [Processor]) into rows for their table, how to combine multiple rows into a single
25/// DB operation, and then how to write those rows atomically to the database.
26///
27/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
28/// associated values).
29///
30/// Sequential handlers can only be used in sequential pipelines, where checkpoint data is
31/// processed out-of-order, but then gathered and written in order. If multiple checkpoints are
32/// available, the pipeline will attempt to combine their writes taking advantage of batching to
33/// avoid emitting redundant writes.
34///
35/// Back-pressure is handled by setting a high watermark on the ingestion service: The pipeline
36/// notifies the ingestion service of the checkpoint it last successfully wrote to the database
37/// for, and in turn the ingestion service will only run ahead by its buffer size. This guarantees
38/// liveness and limits the amount of memory the pipeline can consume, by bounding the number of
39/// checkpoints that can be received before the next checkpoint.
40#[async_trait]
41pub trait Handler: Processor {
42 type Store: TransactionalStore;
43
44 /// If at least this many rows are pending, the committer will commit them eagerly.
45 const MIN_EAGER_ROWS: usize = 50;
46
47 /// Maximum number of checkpoints to try and write in a single batch. The larger this number
48 /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
49 /// transaction is likely to be.
50 const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;
51
52 /// A type to combine multiple `Self::Value`-s into. This can be used to avoid redundant writes
53 /// by combining multiple rows into one (e.g. if one row supersedes another, the latter can be
54 /// omitted).
55 type Batch: Default + Send + Sync + 'static;
56
57 /// Add `values` from processing a checkpoint to the current `batch`. Checkpoints are
58 /// guaranteed to be presented to the batch in checkpoint order. The handler takes ownership
59 /// of the iterator and consumes all values.
60 ///
61 /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
62 /// or `BatchStatus::Pending` if the batch can accept more values.
63 ///
64 /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
65 /// may also decide to commit a batch based on the trait parameters above.
66 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
67
68 /// Take a batch of values and commit them to the database, returning the number of rows
69 /// affected.
70 async fn commit<'a>(
71 &self,
72 batch: &Self::Batch,
73 conn: &mut <Self::Store as Store>::Connection<'a>,
74 ) -> anyhow::Result<usize>;
75}
76
77/// Configuration for a sequential pipeline
78#[derive(Serialize, Deserialize, Clone, Default)]
79pub struct SequentialConfig {
80 /// Configuration for the writer, that makes forward progress.
81 pub committer: CommitterConfig,
82
83 /// How many checkpoints to hold back writes for.
84 pub checkpoint_lag: u64,
85}
86
87/// Start a new sequential (in-order) indexing pipeline, served by the handler, `H`. Starting
88/// strictly after the `watermark` (or from the beginning if no watermark was provided).
89///
90/// Each pipeline consists of a processor which takes checkpoint data and breaks it down into rows,
91/// ready for insertion, and a committer which orders the rows and combines them into batches to
92/// write to the database.
93///
94/// Commits are performed in checkpoint order, potentially involving multiple checkpoints at a
95/// time. The call to [Handler::commit] and the associated watermark update are performed in a
96/// transaction to ensure atomicity. Unlike in the case of concurrent pipelines, the data passed to
97/// [Handler::commit] is not chunked up, so the handler must perform this step itself, if
98/// necessary.
99///
100/// The pipeline can optionally be configured to lag behind the ingestion service by a fixed number
101/// of checkpoints (configured by `checkpoint_lag`).
102///
103/// Watermarks are also shared with the ingestion service, which is guaranteed to bound the
104/// checkpoint height it pre-fetches to some constant additive factor above the pipeline's
105/// watermark.
106///
107/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, watermark updates
108/// are communicated to the ingestion service through the `watermark_tx` channel and internal
109/// channels are created to communicate between its various components. The pipeline can be
110/// shutdown using its `cancel` token, and will also shutdown if any of its input or output
111/// channels close, or any of its independent tasks fail.
112pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
113 handler: H,
114 next_checkpoint: u64,
115 config: SequentialConfig,
116 db: H::Store,
117 checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
118 watermark_tx: mpsc::UnboundedSender<(&'static str, u64)>,
119 metrics: Arc<IndexerMetrics>,
120 cancel: CancellationToken,
121) -> JoinHandle<()> {
122 let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
123
124 let handler = Arc::new(handler);
125
126 let processor = processor(
127 handler.clone(),
128 checkpoint_rx,
129 processor_tx,
130 metrics.clone(),
131 cancel.clone(),
132 );
133
134 let committer = committer::<H>(
135 handler,
136 config,
137 next_checkpoint,
138 committer_rx,
139 watermark_tx,
140 db,
141 metrics.clone(),
142 cancel.clone(),
143 );
144
145 tokio::spawn(async move {
146 let (_, _) = futures::join!(processor, committer);
147 })
148}