sui_indexer_alt_framework/pipeline/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Threshold on the number of pending watermarks: a warning is issued every time the pending
/// count exceeds it. This can happen if the pipeline was started with its initial checkpoint
/// overridden to be strictly greater than its current watermark -- in that case, the pipeline
/// will never be able to update its watermarks.
const WARN_PENDING_WATERMARKS: usize = 10_000;
23
24#[derive(Serialize, Deserialize, Debug, Clone)]
25pub struct CommitterConfig {
26    /// Number of concurrent writers per pipeline.
27    pub write_concurrency: usize,
28
29    /// The collector will check for pending data at least this often, in milliseconds.
30    pub collect_interval_ms: u64,
31
32    /// Watermark task will check for pending watermarks this often, in milliseconds.
33    pub watermark_interval_ms: u64,
34
35    /// Maximum random jitter to add to the watermark interval, in milliseconds.
36    pub watermark_interval_jitter_ms: u64,
37}
38
39/// Processed values associated with a single checkpoint. This is an internal type used to
40/// communicate between the processor and the collector parts of the pipeline.
41struct IndexedCheckpoint<P: Processor> {
42    /// Values to be inserted into the database from this checkpoint
43    values: Vec<P::Value>,
44    /// The watermark associated with this checkpoint
45    watermark: CommitterWatermark,
46}
47
48/// A representation of the proportion of a watermark.
49#[derive(Debug, Clone)]
50struct WatermarkPart {
51    /// The watermark itself
52    watermark: CommitterWatermark,
53    /// The number of rows from this watermark that are in this part
54    batch_rows: usize,
55    /// The total number of rows from this watermark
56    total_rows: usize,
57}
58
59impl CommitterConfig {
60    pub fn collect_interval(&self) -> Duration {
61        Duration::from_millis(self.collect_interval_ms)
62    }
63
64    pub fn watermark_interval(&self) -> Duration {
65        Duration::from_millis(self.watermark_interval_ms)
66    }
67
68    /// Returns the next watermark update instant with a random jitter added. The jitter is a
69    /// random value between 0 and `watermark_interval_jitter_ms`.
70    pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
71        let jitter = if self.watermark_interval_jitter_ms == 0 {
72            0
73        } else {
74            rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
75        };
76        tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
77    }
78}
79
80impl<P: Processor> IndexedCheckpoint<P> {
81    fn new(
82        epoch: u64,
83        cp_sequence_number: u64,
84        tx_hi: u64,
85        timestamp_ms: u64,
86        values: Vec<P::Value>,
87    ) -> Self {
88        Self {
89            watermark: CommitterWatermark {
90                epoch_hi_inclusive: epoch,
91                checkpoint_hi_inclusive: cp_sequence_number,
92                tx_hi,
93                timestamp_ms_hi_inclusive: timestamp_ms,
94            },
95            values,
96        }
97    }
98
99    /// Number of rows from this checkpoint
100    fn len(&self) -> usize {
101        self.values.len()
102    }
103
104    /// The checkpoint sequence number that this data is from
105    fn checkpoint(&self) -> u64 {
106        self.watermark.checkpoint_hi_inclusive
107    }
108}
109
110impl WatermarkPart {
111    fn checkpoint(&self) -> u64 {
112        self.watermark.checkpoint_hi_inclusive
113    }
114
115    fn timestamp_ms(&self) -> u64 {
116        self.watermark.timestamp_ms_hi_inclusive
117    }
118
119    /// Check if all the rows from this watermark are represented in this part.
120    fn is_complete(&self) -> bool {
121        self.batch_rows == self.total_rows
122    }
123
124    /// Add the rows from `other` to this part.
125    fn add(&mut self, other: WatermarkPart) {
126        debug_assert_eq!(self.checkpoint(), other.checkpoint());
127        self.batch_rows += other.batch_rows;
128    }
129
130    /// Record that `rows` have been taken from this part.
131    fn take(&mut self, rows: usize) -> WatermarkPart {
132        debug_assert!(
133            self.batch_rows >= rows,
134            "Can't take more rows than are available"
135        );
136
137        self.batch_rows -= rows;
138        WatermarkPart {
139            watermark: self.watermark,
140            batch_rows: rows,
141            total_rows: self.total_rows,
142        }
143    }
144}
145
146impl Default for CommitterConfig {
147    fn default() -> Self {
148        Self {
149            write_concurrency: 5,
150            collect_interval_ms: 500,
151            watermark_interval_ms: 500,
152            watermark_interval_jitter_ms: 0,
153        }
154    }
155}
156
/// Unit tests for the pipeline's shared helper types: `CommitterConfig`, `WatermarkPart` and
/// `IndexedCheckpoint`.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // Minimal `Processor` implementation, needed only to instantiate `IndexedCheckpoint<P>`.
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_committer_config_default_intervals() {
        let config = CommitterConfig::default();

        assert_eq!(config.write_concurrency, 5);
        assert_eq!(config.collect_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval_jitter_ms, 0);
    }

    #[test]
    fn test_watermark_interval_with_jitter_stays_within_bounds() {
        let config = CommitterConfig {
            watermark_interval_ms: 100,
            watermark_interval_jitter_ms: 50,
            ..Default::default()
        };

        // Bracket the call with two clock readings so the bounds hold regardless of how long the
        // call itself takes.
        let before = tokio::time::Instant::now();
        let next = config.watermark_interval_with_jitter();
        let after = tokio::time::Instant::now();

        assert!(next >= before + Duration::from_millis(100));
        assert!(next <= after + Duration::from_millis(150));
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // Verify state of extracted part
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);

        // Verify state of the part that was taken from: it lost the extracted rows, so it is no
        // longer complete
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}