sui_indexer_alt_framework/pipeline/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Threshold on the number of pending watermarks: a warning is issued every time the backlog
/// grows beyond this many entries. Such a backlog is expected when the pipeline was started with
/// its initial checkpoint overridden to be strictly greater than its current watermark -- in that
/// situation the pipeline can never update its watermarks.
const WARN_PENDING_WATERMARKS: usize = 10_000;
23
24#[derive(Serialize, Deserialize, Debug, Clone)]
25pub struct CommitterConfig {
26    /// Number of concurrent writers per pipeline.
27    pub write_concurrency: usize,
28
29    /// The collector will check for pending data at least this often, in milliseconds.
30    pub collect_interval_ms: u64,
31
32    /// Watermark task will check for pending watermarks this often, in milliseconds.
33    pub watermark_interval_ms: u64,
34
35    /// Maximum random jitter to add to the watermark interval, in milliseconds.
36    pub watermark_interval_jitter_ms: u64,
37}
38
39/// Per-pipeline ingestion settings.
40#[derive(Serialize, Deserialize, Debug, Clone, Default)]
41pub struct IngestionConfig {
42    /// Capacity of this pipeline's bounded subscriber channel. If `None`, the built-in default
43    /// is used (see [`IngestionConfig::subscriber_channel_size`]).
44    pub subscriber_channel_size: Option<usize>,
45}
46
47impl IngestionConfig {
48    /// Resolves `subscriber_channel_size` to its final value, substituting the built-in default
49    /// if unset.
50    ///
51    /// The default is small on purpose: the adaptive controller does the real backpressure work,
52    /// and larger values just pin more decoded checkpoints in memory without throughput benefit.
53    /// Scales with CPU count for fetch parallelism headroom, with a floor of 4 so the
54    /// controller's dead band (0.6..0.85) has integer room to maneuver on small machines.
55    pub fn subscriber_channel_size(&self) -> usize {
56        self.subscriber_channel_size
57            .unwrap_or_else(|| (num_cpus::get() / 2).max(4))
58    }
59}
60
61/// Processed values associated with a single checkpoint. This is an internal type used to
62/// communicate between the processor and the collector parts of the pipeline.
63struct IndexedCheckpoint<P: Processor> {
64    /// Values to be inserted into the database from this checkpoint
65    values: Vec<P::Value>,
66    /// The watermark associated with this checkpoint
67    watermark: CommitterWatermark,
68}
69
70/// A representation of the proportion of a watermark.
71#[derive(Debug, Clone)]
72struct WatermarkPart {
73    /// The watermark itself
74    watermark: CommitterWatermark,
75    /// The number of rows from this watermark that are in this part
76    batch_rows: usize,
77    /// The total number of rows from this watermark
78    total_rows: usize,
79}
80
81impl CommitterConfig {
82    pub fn collect_interval(&self) -> Duration {
83        Duration::from_millis(self.collect_interval_ms)
84    }
85
86    pub fn watermark_interval(&self) -> Duration {
87        Duration::from_millis(self.watermark_interval_ms)
88    }
89
90    /// Returns the next watermark update instant with a random jitter added. The jitter is a
91    /// random value between 0 and `watermark_interval_jitter_ms`.
92    pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
93        let jitter = if self.watermark_interval_jitter_ms == 0 {
94            0
95        } else {
96            rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
97        };
98        tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
99    }
100}
101
102impl<P: Processor> IndexedCheckpoint<P> {
103    fn new(
104        epoch: u64,
105        cp_sequence_number: u64,
106        tx_hi: u64,
107        timestamp_ms: u64,
108        values: Vec<P::Value>,
109    ) -> Self {
110        Self {
111            watermark: CommitterWatermark {
112                epoch_hi_inclusive: epoch,
113                checkpoint_hi_inclusive: cp_sequence_number,
114                tx_hi,
115                timestamp_ms_hi_inclusive: timestamp_ms,
116            },
117            values,
118        }
119    }
120
121    /// Number of rows from this checkpoint
122    fn len(&self) -> usize {
123        self.values.len()
124    }
125
126    /// The checkpoint sequence number that this data is from
127    fn checkpoint(&self) -> u64 {
128        self.watermark.checkpoint_hi_inclusive
129    }
130}
131
132impl WatermarkPart {
133    fn checkpoint(&self) -> u64 {
134        self.watermark.checkpoint_hi_inclusive
135    }
136
137    fn timestamp_ms(&self) -> u64 {
138        self.watermark.timestamp_ms_hi_inclusive
139    }
140
141    /// Check if all the rows from this watermark are represented in this part.
142    fn is_complete(&self) -> bool {
143        self.batch_rows == self.total_rows
144    }
145
146    /// Add the rows from `other` to this part.
147    fn add(&mut self, other: WatermarkPart) {
148        assert_eq!(self.checkpoint(), other.checkpoint());
149        self.batch_rows += other.batch_rows;
150        assert!(
151            self.batch_rows <= self.total_rows,
152            "batch_rows ({}) exceeded total_rows ({})",
153            self.batch_rows,
154            self.total_rows,
155        );
156    }
157
158    /// Record that `rows` have been taken from this part.
159    fn take(&mut self, rows: usize) -> WatermarkPart {
160        assert!(
161            self.batch_rows >= rows,
162            "Can't take more rows than are available"
163        );
164
165        self.batch_rows -= rows;
166        WatermarkPart {
167            watermark: self.watermark,
168            batch_rows: rows,
169            total_rows: self.total_rows,
170        }
171    }
172}
173
174impl Default for CommitterConfig {
175    fn default() -> Self {
176        Self {
177            write_concurrency: 5,
178            collect_interval_ms: 500,
179            watermark_interval_ms: 500,
180            watermark_interval_jitter_ms: 0,
181        }
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // Minimal `Processor` used only to instantiate `IndexedCheckpoint` in these tests; the output
    // of `process` is never inspected.
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    /// Builds a watermark for checkpoint `checkpoint`, leaving every other field at its default.
    fn watermark_at(checkpoint: u64) -> CommitterWatermark {
        CommitterWatermark {
            checkpoint_hi_inclusive: checkpoint,
            ..Default::default()
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // The part the rows were taken from must lose them and stop being complete -- this is the
        // behavior the test is named for.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // The extracted part carries exactly the taken rows, against the same total and watermark.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    fn test_watermark_part_take_then_add_restores_completeness() {
        let mut part = WatermarkPart {
            watermark: watermark_at(7),
            batch_rows: 5,
            total_rows: 5,
        };

        let extracted_part = part.take(5);
        assert_eq!(part.batch_rows, 0);
        assert!(extracted_part.is_complete());

        part.add(extracted_part);
        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 5);
    }

    #[test]
    #[should_panic]
    fn test_watermark_part_add_rejects_different_checkpoint() {
        let mut part = WatermarkPart {
            watermark: watermark_at(1),
            batch_rows: 1,
            total_rows: 10,
        };
        part.add(WatermarkPart {
            watermark: watermark_at(2),
            batch_rows: 1,
            total_rows: 10,
        });
    }

    #[test]
    #[should_panic(expected = "batch_rows (11) exceeded total_rows (10)")]
    fn test_watermark_part_add_rejects_exceeding_total() {
        let mut part = WatermarkPart {
            watermark: watermark_at(1),
            batch_rows: 10,
            total_rows: 10,
        };
        part.add(WatermarkPart {
            watermark: watermark_at(1),
            batch_rows: 1,
            total_rows: 10,
        });
    }

    #[test]
    #[should_panic(expected = "Can't take more rows than are available")]
    fn test_watermark_part_take_rejects_too_many_rows() {
        let mut part = WatermarkPart {
            watermark: watermark_at(1),
            batch_rows: 3,
            total_rows: 10,
        };
        let _ = part.take(4);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_committer_config_intervals() {
        let config = CommitterConfig::default();
        assert_eq!(config.collect_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval(), Duration::from_millis(500));
    }

    #[test]
    fn test_watermark_interval_without_jitter_is_exact() {
        let config = CommitterConfig::default();

        let before = tokio::time::Instant::now();
        let deadline = config.watermark_interval_with_jitter();
        let after = tokio::time::Instant::now();

        // With zero jitter the deadline is exactly `now + interval` for some `now` in
        // [before, after].
        assert!(deadline >= before + Duration::from_millis(500));
        assert!(deadline <= after + Duration::from_millis(500));
    }

    #[test]
    fn test_watermark_interval_with_jitter_stays_in_bounds() {
        let config = CommitterConfig {
            watermark_interval_ms: 100,
            watermark_interval_jitter_ms: 50,
            ..Default::default()
        };

        for _ in 0..32 {
            let before = tokio::time::Instant::now();
            let deadline = config.watermark_interval_with_jitter();
            let after = tokio::time::Instant::now();

            assert!(deadline >= before + Duration::from_millis(100));
            assert!(deadline <= after + Duration::from_millis(150));
        }
    }

    #[test]
    fn test_ingestion_config_uses_explicit_channel_size() {
        let config = IngestionConfig {
            subscriber_channel_size: Some(7),
        };
        assert_eq!(config.subscriber_channel_size(), 7);
    }

    #[test]
    fn test_ingestion_config_default_channel_size_has_floor() {
        let size = IngestionConfig::default().subscriber_channel_size();
        assert!(size >= 4);
        assert_eq!(size, (num_cpus::get() / 2).max(4));
    }
}