sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use processor::Processor;
7use serde::Deserialize;
8use serde::Serialize;
9
10use crate::store::CommitterWatermark;
11
12pub mod concurrent;
13mod logging;
14mod processor;
15pub mod sequential;
16
/// Buffer size shared by the pipeline implementations.
///
/// NOTE(review): presumably the capacity of the channels connecting pipeline tasks —
/// confirm at the use sites in `concurrent` / `sequential`.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks.
///
/// NOTE(review): presumably a warning is emitted once this many watermarks are pending —
/// confirm at the use sites.
const WARN_PENDING_WATERMARKS: usize = 10_000;
26
27#[derive(Serialize, Deserialize, Debug, Clone)]
28pub struct CommitterConfig {
29 pub write_concurrency: usize,
31
32 pub collect_interval_ms: u64,
34
35 pub watermark_interval_ms: u64,
37}
38
39struct IndexedCheckpoint<P: Processor> {
42 values: Vec<P::Value>,
44 watermark: CommitterWatermark,
46}
47
48#[derive(Debug, Clone)]
50struct WatermarkPart {
51 watermark: CommitterWatermark,
53 batch_rows: usize,
55 total_rows: usize,
57}
58
59impl CommitterConfig {
60 pub fn collect_interval(&self) -> Duration {
61 Duration::from_millis(self.collect_interval_ms)
62 }
63
64 pub fn watermark_interval(&self) -> Duration {
65 Duration::from_millis(self.watermark_interval_ms)
66 }
67}
68
69impl<P: Processor> IndexedCheckpoint<P> {
70 fn new(
71 epoch: u64,
72 cp_sequence_number: u64,
73 tx_hi: u64,
74 timestamp_ms: u64,
75 values: Vec<P::Value>,
76 ) -> Self {
77 Self {
78 watermark: CommitterWatermark {
79 epoch_hi_inclusive: epoch,
80 checkpoint_hi_inclusive: cp_sequence_number,
81 tx_hi,
82 timestamp_ms_hi_inclusive: timestamp_ms,
83 },
84 values,
85 }
86 }
87
88 fn len(&self) -> usize {
90 self.values.len()
91 }
92
93 fn checkpoint(&self) -> u64 {
95 self.watermark.checkpoint_hi_inclusive
96 }
97}
98
99impl WatermarkPart {
100 fn checkpoint(&self) -> u64 {
101 self.watermark.checkpoint_hi_inclusive
102 }
103
104 fn timestamp_ms(&self) -> u64 {
105 self.watermark.timestamp_ms_hi_inclusive
106 }
107
108 fn is_complete(&self) -> bool {
110 self.batch_rows == self.total_rows
111 }
112
113 fn add(&mut self, other: WatermarkPart) {
115 debug_assert_eq!(self.checkpoint(), other.checkpoint());
116 self.batch_rows += other.batch_rows;
117 }
118
119 fn take(&mut self, rows: usize) -> WatermarkPart {
121 debug_assert!(
122 self.batch_rows >= rows,
123 "Can't take more rows than are available"
124 );
125
126 self.batch_rows -= rows;
127 WatermarkPart {
128 watermark: self.watermark,
129 batch_rows: rows,
130 total_rows: self.total_rows,
131 }
132 }
133}
134
135impl Default for CommitterConfig {
136 fn default() -> Self {
137 Self {
138 write_concurrency: 5,
139 collect_interval_ms: 500,
140 watermark_interval_ms: 500,
141 }
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal processor, used only to instantiate `IndexedCheckpoint` in these tests.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(
            &self,
            _checkpoint: &Arc<Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        let extracted_part = part.take(10);

        // The part the rows were taken from is what this test is named after: it must
        // lose exactly the taken rows and no longer be complete.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // The extracted part carries the taken rows and the same checkpoint/total.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    fn test_watermark_part_take_then_add_round_trips() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        let extracted_part = part.take(75);
        part.add(extracted_part);

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);

        // `new` must route each argument into the matching watermark field.
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, epoch);
        assert_eq!(checkpoint.watermark.checkpoint_hi_inclusive, cp_sequence_number);
        assert_eq!(checkpoint.watermark.tx_hi, tx_hi);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, timestamp_ms);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}