sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Threshold on the number of pending watermarks, presumably the point past which a warning is
/// logged. It is not referenced in this part of the file.
/// NOTE(review): confirm its use in the `concurrent` / `sequential` submodules.
const WARN_PENDING_WATERMARKS: usize = 10_000;

/// Configuration for a pipeline's committer.
///
/// All intervals are stored in milliseconds. The accessor methods on this type convert them to
/// `Duration` / `Instant` values. The `Default` impl sets `write_concurrency = 5`,
/// `collect_interval_ms = 500`, `watermark_interval_ms = 500` and
/// `watermark_interval_jitter_ms = 0`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CommitterConfig {
    /// Degree of write concurrency, presumably the number of concurrent writer tasks per
    /// pipeline. NOTE(review): confirm against the committers in `concurrent` / `sequential`.
    pub write_concurrency: usize,

    /// Collection interval in milliseconds, exposed as a `Duration` by `collect_interval()`.
    pub collect_interval_ms: u64,

    /// Base watermark interval in milliseconds. `watermark_interval()` exposes it as a
    /// `Duration`, and `watermark_interval_with_jitter()` uses it as the fixed part of its delay.
    pub watermark_interval_ms: u64,

    /// Inclusive upper bound, in milliseconds, of the random extra delay that
    /// `watermark_interval_with_jitter()` adds on top of `watermark_interval_ms`. `0` disables
    /// jitter entirely.
    pub watermark_interval_jitter_ms: u64,
}

/// Ingestion-side settings.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct IngestionConfig {
    /// Size of the subscriber channel.
    ///
    /// `None` (the `Default`) means "use the value computed by `subscriber_channel_size()`":
    /// half of `num_cpus::get()`, but never less than 4.
    pub subscriber_channel_size: Option<usize>,
}

47impl IngestionConfig {
48 pub fn subscriber_channel_size(&self) -> usize {
56 self.subscriber_channel_size
57 .unwrap_or_else(|| (num_cpus::get() / 2).max(4))
58 }
59}
60
/// The values that processor `P` produced for a single checkpoint.
///
/// The values are paired with the watermark describing that checkpoint (as assembled by
/// `IndexedCheckpoint::new`).
struct IndexedCheckpoint<P: Processor> {
    /// Values produced for this checkpoint; `len()` reports how many there are.
    values: Vec<P::Value>,
    /// Watermark for this checkpoint; `checkpoint()` reads its `checkpoint_hi_inclusive`.
    watermark: CommitterWatermark,
}

/// A share of one checkpoint's rows: `batch_rows` out of the checkpoint's `total_rows`.
///
/// Parts belonging to the same checkpoint can be merged with `add` and split with `take`. A part
/// is complete once `batch_rows == total_rows`.
#[derive(Debug, Clone)]
struct WatermarkPart {
    /// Watermark of the checkpoint these rows belong to.
    watermark: CommitterWatermark,
    /// Number of the checkpoint's rows carried by this part.
    batch_rows: usize,
    /// Total number of rows the checkpoint has; `batch_rows` may never exceed it.
    total_rows: usize,
}

81impl CommitterConfig {
82 pub fn collect_interval(&self) -> Duration {
83 Duration::from_millis(self.collect_interval_ms)
84 }
85
86 pub fn watermark_interval(&self) -> Duration {
87 Duration::from_millis(self.watermark_interval_ms)
88 }
89
90 pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
93 let jitter = if self.watermark_interval_jitter_ms == 0 {
94 0
95 } else {
96 rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
97 };
98 tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
99 }
100}
101
102impl<P: Processor> IndexedCheckpoint<P> {
103 fn new(
104 epoch: u64,
105 cp_sequence_number: u64,
106 tx_hi: u64,
107 timestamp_ms: u64,
108 values: Vec<P::Value>,
109 ) -> Self {
110 Self {
111 watermark: CommitterWatermark {
112 epoch_hi_inclusive: epoch,
113 checkpoint_hi_inclusive: cp_sequence_number,
114 tx_hi,
115 timestamp_ms_hi_inclusive: timestamp_ms,
116 },
117 values,
118 }
119 }
120
121 fn len(&self) -> usize {
123 self.values.len()
124 }
125
126 fn checkpoint(&self) -> u64 {
128 self.watermark.checkpoint_hi_inclusive
129 }
130}
131
132impl WatermarkPart {
133 fn checkpoint(&self) -> u64 {
134 self.watermark.checkpoint_hi_inclusive
135 }
136
137 fn timestamp_ms(&self) -> u64 {
138 self.watermark.timestamp_ms_hi_inclusive
139 }
140
141 fn is_complete(&self) -> bool {
143 self.batch_rows == self.total_rows
144 }
145
146 fn add(&mut self, other: WatermarkPart) {
148 assert_eq!(self.checkpoint(), other.checkpoint());
149 self.batch_rows += other.batch_rows;
150 assert!(
151 self.batch_rows <= self.total_rows,
152 "batch_rows ({}) exceeded total_rows ({})",
153 self.batch_rows,
154 self.total_rows,
155 );
156 }
157
158 fn take(&mut self, rows: usize) -> WatermarkPart {
160 assert!(
161 self.batch_rows >= rows,
162 "Can't take more rows than are available"
163 );
164
165 self.batch_rows -= rows;
166 WatermarkPart {
167 watermark: self.watermark,
168 batch_rows: rows,
169 total_rows: self.total_rows,
170 }
171 }
172}
173
174impl Default for CommitterConfig {
175 fn default() -> Self {
176 Self {
177 write_concurrency: 5,
178 collect_interval_ms: 500,
179 watermark_interval_ms: 500,
180 watermark_interval_jitter_ms: 0,
181 }
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal processor, used only to instantiate `IndexedCheckpoint` in tests.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_committer_config_defaults() {
        let config = CommitterConfig::default();
        assert_eq!(config.write_concurrency, 5);
        assert_eq!(config.collect_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval_jitter_ms, 0);
    }

    #[test]
    fn test_watermark_interval_with_jitter_stays_in_range() {
        let config = CommitterConfig {
            watermark_interval_ms: 500,
            watermark_interval_jitter_ms: 100,
            ..Default::default()
        };

        let before = tokio::time::Instant::now();
        let deadline = config.watermark_interval_with_jitter();
        let after = tokio::time::Instant::now();

        // The deadline is `now + 500ms + jitter` with jitter in 0..=100ms, and `now` is taken
        // somewhere between `before` and `after`.
        assert!(deadline >= before + Duration::from_millis(500));
        assert!(deadline <= after + Duration::from_millis(600));
    }

    #[test]
    fn test_subscriber_channel_size_explicit() {
        let config = IngestionConfig {
            subscriber_channel_size: Some(8),
        };
        assert_eq!(config.subscriber_channel_size(), 8);
    }

    #[test]
    fn test_subscriber_channel_size_default_has_floor() {
        assert!(IngestionConfig::default().subscriber_channel_size() >= 4);
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Adding the single missing row completes the part.
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        let extracted_part = part.take(10);

        // The part that had rows taken away is no longer complete...
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // ...and neither is the extracted piece.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
    }

    #[test]
    #[should_panic(expected = "exceeded total_rows")]
    fn test_watermark_part_add_panics_when_exceeding_total_rows() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });
    }

    #[test]
    #[should_panic]
    fn test_watermark_part_add_panics_on_checkpoint_mismatch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..CommitterWatermark::default()
            },
            batch_rows: 1,
            total_rows: 2,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 2,
                ..CommitterWatermark::default()
            },
            batch_rows: 1,
            total_rows: 2,
        });
    }

    #[test]
    #[should_panic(expected = "Can't take more rows than are available")]
    fn test_watermark_part_take_panics_when_taking_too_many_rows() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 10,
            total_rows: 200,
        };

        part.take(11);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}