sui_indexer_alt_framework/pipeline/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6pub use processor::Processor;
7use serde::{Deserialize, Serialize};
8
9use crate::store::CommitterWatermark;
10
11pub mod concurrent;
12mod logging;
13mod processor;
14pub mod sequential;
15
/// Slack capacity given to every channel that connects two tasks of a pipeline. It is kept small
/// on purpose: the tasks themselves already buffer the rows they are waiting to insert.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks: a warning is issued every time the count goes
/// above it. This situation can arise when a pipeline is started with its initial checkpoint
/// overridden to be strictly greater than its current watermark -- such a pipeline will never be
/// able to update its watermarks.
const WARN_PENDING_WATERMARKS: usize = 10_000;
25
26#[derive(Serialize, Deserialize, Debug, Clone)]
27pub struct CommitterConfig {
28    /// Number of concurrent writers per pipeline.
29    pub write_concurrency: usize,
30
31    /// The collector will check for pending data at least this often, in milliseconds.
32    pub collect_interval_ms: u64,
33
34    /// Watermark task will check for pending watermarks this often, in milliseconds.
35    pub watermark_interval_ms: u64,
36}
37
38/// Processed values associated with a single checkpoint. This is an internal type used to
39/// communicate between the processor and the collector parts of the pipeline.
40struct IndexedCheckpoint<P: Processor> {
41    /// Values to be inserted into the database from this checkpoint
42    values: Vec<P::Value>,
43    /// The watermark associated with this checkpoint
44    watermark: CommitterWatermark,
45}
46
47/// A representation of the proportion of a watermark.
48#[derive(Debug, Clone)]
49struct WatermarkPart {
50    /// The watermark itself
51    watermark: CommitterWatermark,
52    /// The number of rows from this watermark that are in this part
53    batch_rows: usize,
54    /// The total number of rows from this watermark
55    total_rows: usize,
56}
57
58impl CommitterConfig {
59    pub fn collect_interval(&self) -> Duration {
60        Duration::from_millis(self.collect_interval_ms)
61    }
62
63    pub fn watermark_interval(&self) -> Duration {
64        Duration::from_millis(self.watermark_interval_ms)
65    }
66}
67
68impl<P: Processor> IndexedCheckpoint<P> {
69    fn new(
70        epoch: u64,
71        cp_sequence_number: u64,
72        tx_hi: u64,
73        timestamp_ms: u64,
74        values: Vec<P::Value>,
75    ) -> Self {
76        Self {
77            watermark: CommitterWatermark {
78                epoch_hi_inclusive: epoch,
79                checkpoint_hi_inclusive: cp_sequence_number,
80                tx_hi,
81                timestamp_ms_hi_inclusive: timestamp_ms,
82            },
83            values,
84        }
85    }
86
87    /// Number of rows from this checkpoint
88    fn len(&self) -> usize {
89        self.values.len()
90    }
91
92    /// The checkpoint sequence number that this data is from
93    fn checkpoint(&self) -> u64 {
94        self.watermark.checkpoint_hi_inclusive
95    }
96}
97
98impl WatermarkPart {
99    fn checkpoint(&self) -> u64 {
100        self.watermark.checkpoint_hi_inclusive
101    }
102
103    fn timestamp_ms(&self) -> u64 {
104        self.watermark.timestamp_ms_hi_inclusive
105    }
106
107    /// Check if all the rows from this watermark are represented in this part.
108    fn is_complete(&self) -> bool {
109        self.batch_rows == self.total_rows
110    }
111
112    /// Add the rows from `other` to this part.
113    fn add(&mut self, other: WatermarkPart) {
114        debug_assert_eq!(self.checkpoint(), other.checkpoint());
115        self.batch_rows += other.batch_rows;
116    }
117
118    /// Record that `rows` have been taken from this part.
119    fn take(&mut self, rows: usize) -> WatermarkPart {
120        debug_assert!(
121            self.batch_rows >= rows,
122            "Can't take more rows than are available"
123        );
124
125        self.batch_rows -= rows;
126        WatermarkPart {
127            watermark: self.watermark,
128            batch_rows: rows,
129            total_rows: self.total_rows,
130        }
131    }
132}
133
134impl Default for CommitterConfig {
135    fn default() -> Self {
136        Self {
137            write_concurrency: 5,
138            collect_interval_ms: 500,
139            watermark_interval_ms: 500,
140        }
141    }
142}
143
/// Unit tests for the internal pipeline types: [`WatermarkPart`] and [`IndexedCheckpoint`].
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal `Processor` used only to instantiate `IndexedCheckpoint` in the tests below; its
    /// `process` implementation is never called by them.
    struct TestProcessor;
    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add a batch that makes it complete
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Take away a portion of the batch
        let extracted_part = part.take(10);

        // Verify state of extracted part
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);

        // Verify state of the part the rows were taken from: it must have given up exactly the
        // extracted rows, and therefore no longer be complete.
        assert!(
            !part.is_complete(),
            "Part should be incomplete after rows are taken from it"
        );
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);

        // Each constructor argument must land in its own watermark field; the arguments are all
        // `u64`, so a swap would otherwise go unnoticed.
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, 1);
        assert_eq!(checkpoint.watermark.checkpoint_hi_inclusive, 100);
        assert_eq!(checkpoint.watermark.tx_hi, 1000);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, 1234567890);
        assert_eq!(checkpoint.values, vec![1, 2, 3]);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}