sui_indexer_alt_framework/pipeline/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use processor::Processor;
7use serde::Deserialize;
8use serde::Serialize;
9
10use crate::store::CommitterWatermark;
11
12pub mod concurrent;
13mod logging;
14mod processor;
15pub mod sequential;
16
/// Additional slack given to the channels that connect the tasks of a pipeline. It is kept small
/// on purpose: the tasks already accumulate the rows they are going to insert internally.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks; a warning is issued every time it is exceeded.
/// This can happen when a pipeline is started with its initial checkpoint overridden to a value
/// strictly greater than its current watermark -- in that situation the pipeline can never update
/// its watermarks.
const WARN_PENDING_WATERMARKS: usize = 10_000;
26
27#[derive(Serialize, Deserialize, Debug, Clone)]
28pub struct CommitterConfig {
29    /// Number of concurrent writers per pipeline.
30    pub write_concurrency: usize,
31
32    /// The collector will check for pending data at least this often, in milliseconds.
33    pub collect_interval_ms: u64,
34
35    /// Watermark task will check for pending watermarks this often, in milliseconds.
36    pub watermark_interval_ms: u64,
37}
38
39/// Processed values associated with a single checkpoint. This is an internal type used to
40/// communicate between the processor and the collector parts of the pipeline.
41struct IndexedCheckpoint<P: Processor> {
42    /// Values to be inserted into the database from this checkpoint
43    values: Vec<P::Value>,
44    /// The watermark associated with this checkpoint
45    watermark: CommitterWatermark,
46}
47
48/// A representation of the proportion of a watermark.
49#[derive(Debug, Clone)]
50struct WatermarkPart {
51    /// The watermark itself
52    watermark: CommitterWatermark,
53    /// The number of rows from this watermark that are in this part
54    batch_rows: usize,
55    /// The total number of rows from this watermark
56    total_rows: usize,
57}
58
59impl CommitterConfig {
60    pub fn collect_interval(&self) -> Duration {
61        Duration::from_millis(self.collect_interval_ms)
62    }
63
64    pub fn watermark_interval(&self) -> Duration {
65        Duration::from_millis(self.watermark_interval_ms)
66    }
67}
68
69impl<P: Processor> IndexedCheckpoint<P> {
70    fn new(
71        epoch: u64,
72        cp_sequence_number: u64,
73        tx_hi: u64,
74        timestamp_ms: u64,
75        values: Vec<P::Value>,
76    ) -> Self {
77        Self {
78            watermark: CommitterWatermark {
79                epoch_hi_inclusive: epoch,
80                checkpoint_hi_inclusive: cp_sequence_number,
81                tx_hi,
82                timestamp_ms_hi_inclusive: timestamp_ms,
83            },
84            values,
85        }
86    }
87
88    /// Number of rows from this checkpoint
89    fn len(&self) -> usize {
90        self.values.len()
91    }
92
93    /// The checkpoint sequence number that this data is from
94    fn checkpoint(&self) -> u64 {
95        self.watermark.checkpoint_hi_inclusive
96    }
97}
98
99impl WatermarkPart {
100    fn checkpoint(&self) -> u64 {
101        self.watermark.checkpoint_hi_inclusive
102    }
103
104    fn timestamp_ms(&self) -> u64 {
105        self.watermark.timestamp_ms_hi_inclusive
106    }
107
108    /// Check if all the rows from this watermark are represented in this part.
109    fn is_complete(&self) -> bool {
110        self.batch_rows == self.total_rows
111    }
112
113    /// Add the rows from `other` to this part.
114    fn add(&mut self, other: WatermarkPart) {
115        debug_assert_eq!(self.checkpoint(), other.checkpoint());
116        self.batch_rows += other.batch_rows;
117    }
118
119    /// Record that `rows` have been taken from this part.
120    fn take(&mut self, rows: usize) -> WatermarkPart {
121        debug_assert!(
122            self.batch_rows >= rows,
123            "Can't take more rows than are available"
124        );
125
126        self.batch_rows -= rows;
127        WatermarkPart {
128            watermark: self.watermark,
129            batch_rows: rows,
130            total_rows: self.total_rows,
131        }
132    }
133}
134
135impl Default for CommitterConfig {
136    fn default() -> Self {
137        Self {
138            write_concurrency: 5,
139            collect_interval_ms: 500,
140            watermark_interval_ms: 500,
141        }
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // Test implementation of Processor
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // The part the rows were taken from is what this test is named after: it must no longer
        // be complete, and must only account for the rows left behind.
        assert!(
            !part.is_complete(),
            "Part should be incomplete after taking rows"
        );
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // Verify state of extracted part
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    fn test_watermark_part_take_then_add_restores_completeness() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        // Split all of the rows out into two parts, leaving the original empty.
        let first = part.take(50);
        let second = part.take(150);
        assert_eq!(part.batch_rows, 0);
        assert!(!part.is_complete());

        // Re-assembling the two parts accounts for every row of the checkpoint again.
        let mut rebuilt = first;
        assert!(!rebuilt.is_complete());
        rebuilt.add(second);
        assert!(rebuilt.is_complete());
        assert_eq!(rebuilt.batch_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}