// sui_indexer_alt_framework/pipeline/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use processor::Processor;
7use serde::{Deserialize, Serialize};
8
9use crate::store::CommitterWatermark;
10
11pub mod concurrent;
12mod logging;
13mod processor;
14pub mod sequential;
15
/// Slack added on top of the channels that connect the tasks of a pipeline. It is kept small on
/// purpose: the tasks already hold the rows they are waiting to insert in their own buffers.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks; a warning is issued each time it is exceeded.
/// One way to get here is to start the pipeline with its initial checkpoint overridden to a value
/// strictly greater than its current watermark -- the pipeline can then never advance its
/// watermarks.
const WARN_PENDING_WATERMARKS: usize = 10_000;
25
26#[derive(Serialize, Deserialize, Debug, Clone)]
27pub struct CommitterConfig {
28    /// Number of concurrent writers per pipeline.
29    pub write_concurrency: usize,
30
31    /// The collector will check for pending data at least this often, in milliseconds.
32    pub collect_interval_ms: u64,
33
34    /// Watermark task will check for pending watermarks this often, in milliseconds.
35    pub watermark_interval_ms: u64,
36}
37
38/// Processed values associated with a single checkpoint. This is an internal type used to
39/// communicate between the processor and the collector parts of the pipeline.
40struct IndexedCheckpoint<P: Processor> {
41    /// Values to be inserted into the database from this checkpoint
42    values: Vec<P::Value>,
43    /// The watermark associated with this checkpoint
44    watermark: CommitterWatermark,
45}
46
47/// A representation of the proportion of a watermark.
48#[derive(Debug, Clone)]
49struct WatermarkPart {
50    /// The watermark itself
51    watermark: CommitterWatermark,
52    /// The number of rows from this watermark that are in this part
53    batch_rows: usize,
54    /// The total number of rows from this watermark
55    total_rows: usize,
56}
57
58/// Internal type used by workers to propagate errors or shutdown signals up to their
59/// supervisor.
60#[derive(thiserror::Error, Debug)]
61enum Break {
62    #[error("Shutdown received")]
63    Cancel,
64
65    #[error(transparent)]
66    Err(#[from] anyhow::Error),
67}
68
69impl CommitterConfig {
70    pub fn collect_interval(&self) -> Duration {
71        Duration::from_millis(self.collect_interval_ms)
72    }
73
74    pub fn watermark_interval(&self) -> Duration {
75        Duration::from_millis(self.watermark_interval_ms)
76    }
77}
78
79impl<P: Processor> IndexedCheckpoint<P> {
80    fn new(
81        epoch: u64,
82        cp_sequence_number: u64,
83        tx_hi: u64,
84        timestamp_ms: u64,
85        values: Vec<P::Value>,
86    ) -> Self {
87        Self {
88            watermark: CommitterWatermark {
89                epoch_hi_inclusive: epoch,
90                checkpoint_hi_inclusive: cp_sequence_number,
91                tx_hi,
92                timestamp_ms_hi_inclusive: timestamp_ms,
93            },
94            values,
95        }
96    }
97
98    /// Number of rows from this checkpoint
99    fn len(&self) -> usize {
100        self.values.len()
101    }
102
103    /// The checkpoint sequence number that this data is from
104    fn checkpoint(&self) -> u64 {
105        self.watermark.checkpoint_hi_inclusive
106    }
107}
108
109impl WatermarkPart {
110    fn checkpoint(&self) -> u64 {
111        self.watermark.checkpoint_hi_inclusive
112    }
113
114    fn timestamp_ms(&self) -> u64 {
115        self.watermark.timestamp_ms_hi_inclusive
116    }
117
118    /// Check if all the rows from this watermark are represented in this part.
119    fn is_complete(&self) -> bool {
120        self.batch_rows == self.total_rows
121    }
122
123    /// Add the rows from `other` to this part.
124    fn add(&mut self, other: WatermarkPart) {
125        debug_assert_eq!(self.checkpoint(), other.checkpoint());
126        self.batch_rows += other.batch_rows;
127    }
128
129    /// Record that `rows` have been taken from this part.
130    fn take(&mut self, rows: usize) -> WatermarkPart {
131        debug_assert!(
132            self.batch_rows >= rows,
133            "Can't take more rows than are available"
134        );
135
136        self.batch_rows -= rows;
137        WatermarkPart {
138            watermark: self.watermark,
139            batch_rows: rows,
140            total_rows: self.total_rows,
141        }
142    }
143}
144
145impl Default for CommitterConfig {
146    fn default() -> Self {
147        Self {
148            write_concurrency: 5,
149            collect_interval_ms: 500,
150            watermark_interval_ms: 500,
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // Minimal `Processor`, used below only as the type parameter of `IndexedCheckpoint`.
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // The part the rows were taken from is no longer complete and has lost exactly those rows.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // The extracted part carries exactly the taken rows, for the same watermark.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}