sui_indexer_alt_framework/pipeline/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Pending-watermark count above which a warning is issued, every time it is exceeded.
///
/// Hitting this usually means the pipeline was started with its initial checkpoint overridden to
/// a value strictly greater than its current watermark: the gap can never be filled, so the
/// pipeline is unable to advance its watermarks and the pending set keeps growing.
const WARN_PENDING_WATERMARKS: usize = 10_000;
23
24#[derive(Serialize, Deserialize, Debug, Clone)]
25pub struct CommitterConfig {
26    /// Number of concurrent writers per pipeline.
27    pub write_concurrency: usize,
28
29    /// The collector will check for pending data at least this often, in milliseconds.
30    pub collect_interval_ms: u64,
31
32    /// Watermark task will check for pending watermarks this often, in milliseconds.
33    pub watermark_interval_ms: u64,
34
35    /// Maximum random jitter to add to the watermark interval, in milliseconds.
36    pub watermark_interval_jitter_ms: u64,
37}
38
39/// Processed values associated with a single checkpoint. This is an internal type used to
40/// communicate between the processor and the collector parts of the pipeline.
41struct IndexedCheckpoint<P: Processor> {
42    /// Values to be inserted into the database from this checkpoint
43    values: Vec<P::Value>,
44    /// The watermark associated with this checkpoint
45    watermark: CommitterWatermark,
46}
47
48/// A representation of the proportion of a watermark.
49#[derive(Debug, Clone)]
50struct WatermarkPart {
51    /// The watermark itself
52    watermark: CommitterWatermark,
53    /// The number of rows from this watermark that are in this part
54    batch_rows: usize,
55    /// The total number of rows from this watermark
56    total_rows: usize,
57}
58
59impl CommitterConfig {
60    pub fn collect_interval(&self) -> Duration {
61        Duration::from_millis(self.collect_interval_ms)
62    }
63
64    pub fn watermark_interval(&self) -> Duration {
65        Duration::from_millis(self.watermark_interval_ms)
66    }
67
68    /// Returns the next watermark update instant with a random jitter added. The jitter is a
69    /// random value between 0 and `watermark_interval_jitter_ms`.
70    pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
71        let jitter = if self.watermark_interval_jitter_ms == 0 {
72            0
73        } else {
74            rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
75        };
76        tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
77    }
78}
79
80impl<P: Processor> IndexedCheckpoint<P> {
81    fn new(
82        epoch: u64,
83        cp_sequence_number: u64,
84        tx_hi: u64,
85        timestamp_ms: u64,
86        values: Vec<P::Value>,
87    ) -> Self {
88        Self {
89            watermark: CommitterWatermark {
90                epoch_hi_inclusive: epoch,
91                checkpoint_hi_inclusive: cp_sequence_number,
92                tx_hi,
93                timestamp_ms_hi_inclusive: timestamp_ms,
94            },
95            values,
96        }
97    }
98
99    /// Number of rows from this checkpoint
100    fn len(&self) -> usize {
101        self.values.len()
102    }
103
104    /// The checkpoint sequence number that this data is from
105    fn checkpoint(&self) -> u64 {
106        self.watermark.checkpoint_hi_inclusive
107    }
108}
109
110impl WatermarkPart {
111    fn checkpoint(&self) -> u64 {
112        self.watermark.checkpoint_hi_inclusive
113    }
114
115    fn timestamp_ms(&self) -> u64 {
116        self.watermark.timestamp_ms_hi_inclusive
117    }
118
119    /// Check if all the rows from this watermark are represented in this part.
120    fn is_complete(&self) -> bool {
121        self.batch_rows == self.total_rows
122    }
123
124    /// Add the rows from `other` to this part.
125    fn add(&mut self, other: WatermarkPart) {
126        assert_eq!(self.checkpoint(), other.checkpoint());
127        self.batch_rows += other.batch_rows;
128        assert!(
129            self.batch_rows <= self.total_rows,
130            "batch_rows ({}) exceeded total_rows ({})",
131            self.batch_rows,
132            self.total_rows,
133        );
134    }
135
136    /// Record that `rows` have been taken from this part.
137    fn take(&mut self, rows: usize) -> WatermarkPart {
138        assert!(
139            self.batch_rows >= rows,
140            "Can't take more rows than are available"
141        );
142
143        self.batch_rows -= rows;
144        WatermarkPart {
145            watermark: self.watermark,
146            batch_rows: rows,
147            total_rows: self.total_rows,
148        }
149    }
150}
151
152impl Default for CommitterConfig {
153    fn default() -> Self {
154        Self {
155            write_concurrency: 5,
156            collect_interval_ms: 500,
157            watermark_interval_ms: 500,
158            watermark_interval_jitter_ms: 0,
159        }
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // Minimal Processor implementation, used only to instantiate `IndexedCheckpoint<P>`.
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_committer_config_default_intervals() {
        let config = CommitterConfig::default();
        assert_eq!(config.collect_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval(), Duration::from_millis(500));
    }

    #[test]
    fn test_watermark_interval_with_jitter_is_within_bounds() {
        let config = CommitterConfig {
            watermark_interval_ms: 100,
            watermark_interval_jitter_ms: 50,
            ..Default::default()
        };

        for _ in 0..100 {
            let before = tokio::time::Instant::now();
            let deadline = config.watermark_interval_with_jitter();
            let after = tokio::time::Instant::now();

            assert!(deadline >= before + Duration::from_millis(100));
            assert!(deadline <= after + Duration::from_millis(150));
        }
    }

    #[test]
    fn test_watermark_interval_without_jitter() {
        let config = CommitterConfig {
            watermark_interval_ms: 100,
            watermark_interval_jitter_ms: 0,
            ..Default::default()
        };

        let before = tokio::time::Instant::now();
        let deadline = config.watermark_interval_with_jitter();
        let after = tokio::time::Instant::now();

        assert!(deadline >= before + Duration::from_millis(100));
        assert!(deadline <= after + Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    #[should_panic(expected = "exceeded total_rows")]
    fn test_watermark_part_add_beyond_total_panics() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });
    }

    #[test]
    #[should_panic]
    fn test_watermark_part_add_from_different_checkpoint_panics() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 200,
        });
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // The part that rows were taken from keeps the remainder and is no longer complete.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // Verify state of extracted part
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    #[should_panic(expected = "Can't take more rows than are available")]
    fn test_watermark_part_take_too_many_panics() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 5,
            total_rows: 200,
        };

        part.take(6);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, 1);
        assert_eq!(checkpoint.watermark.tx_hi, 1000);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, 1234567890);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}
299}