sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Threshold on a count of pending watermarks.
///
/// NOTE(review): judging by the name, this is the backlog size past which a warning is emitted.
/// The code that reads this constant is not in this file -- confirm against the pipeline
/// submodules.
const WARN_PENDING_WATERMARKS: usize = 10_000;
23
24#[derive(Serialize, Deserialize, Debug, Clone)]
25pub struct CommitterConfig {
26 pub write_concurrency: usize,
28
29 pub collect_interval_ms: u64,
31
32 pub watermark_interval_ms: u64,
34
35 pub watermark_interval_jitter_ms: u64,
37}
38
39struct IndexedCheckpoint<P: Processor> {
42 values: Vec<P::Value>,
44 watermark: CommitterWatermark,
46}
47
48#[derive(Debug, Clone)]
50struct WatermarkPart {
51 watermark: CommitterWatermark,
53 batch_rows: usize,
55 total_rows: usize,
57}
58
59impl CommitterConfig {
60 pub fn collect_interval(&self) -> Duration {
61 Duration::from_millis(self.collect_interval_ms)
62 }
63
64 pub fn watermark_interval(&self) -> Duration {
65 Duration::from_millis(self.watermark_interval_ms)
66 }
67
68 pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
71 let jitter = if self.watermark_interval_jitter_ms == 0 {
72 0
73 } else {
74 rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
75 };
76 tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
77 }
78}
79
80impl<P: Processor> IndexedCheckpoint<P> {
81 fn new(
82 epoch: u64,
83 cp_sequence_number: u64,
84 tx_hi: u64,
85 timestamp_ms: u64,
86 values: Vec<P::Value>,
87 ) -> Self {
88 Self {
89 watermark: CommitterWatermark {
90 epoch_hi_inclusive: epoch,
91 checkpoint_hi_inclusive: cp_sequence_number,
92 tx_hi,
93 timestamp_ms_hi_inclusive: timestamp_ms,
94 },
95 values,
96 }
97 }
98
99 fn len(&self) -> usize {
101 self.values.len()
102 }
103
104 fn checkpoint(&self) -> u64 {
106 self.watermark.checkpoint_hi_inclusive
107 }
108}
109
110impl WatermarkPart {
111 fn checkpoint(&self) -> u64 {
112 self.watermark.checkpoint_hi_inclusive
113 }
114
115 fn timestamp_ms(&self) -> u64 {
116 self.watermark.timestamp_ms_hi_inclusive
117 }
118
119 fn is_complete(&self) -> bool {
121 self.batch_rows == self.total_rows
122 }
123
124 fn add(&mut self, other: WatermarkPart) {
126 assert_eq!(self.checkpoint(), other.checkpoint());
127 self.batch_rows += other.batch_rows;
128 assert!(
129 self.batch_rows <= self.total_rows,
130 "batch_rows ({}) exceeded total_rows ({})",
131 self.batch_rows,
132 self.total_rows,
133 );
134 }
135
136 fn take(&mut self, rows: usize) -> WatermarkPart {
138 assert!(
139 self.batch_rows >= rows,
140 "Can't take more rows than are available"
141 );
142
143 self.batch_rows -= rows;
144 WatermarkPart {
145 watermark: self.watermark,
146 batch_rows: rows,
147 total_rows: self.total_rows,
148 }
149 }
150}
151
152impl Default for CommitterConfig {
153 fn default() -> Self {
154 Self {
155 write_concurrency: 5,
156 collect_interval_ms: 500,
157 watermark_interval_ms: 500,
158 watermark_interval_jitter_ms: 0,
159 }
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal [`Processor`] used only to give `IndexedCheckpoint` a concrete `Value` type; its
    /// `process` method is never invoked by the tests below.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    /// The getters read straight through to the underlying watermark.
    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    /// A part holding all of its rows is complete.
    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    /// A part that is one row short is not complete.
    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    /// Merging in the missing row completes the part.
    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    /// Merging more rows than `total_rows` allows is an invariant violation.
    #[test]
    #[should_panic(expected = "exceeded total_rows")]
    fn test_watermark_part_add_beyond_total_rows_panics() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });
    }

    /// Taking rows leaves both the extracted part and the remainder incomplete.
    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        let extracted_part = part.take(10);

        // The extracted part carries exactly the rows that were taken.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);

        // The part that was taken from keeps the remainder and is no longer complete.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);
    }

    /// Asking for more rows than the part holds is an invariant violation.
    #[test]
    #[should_panic(expected = "Can't take more rows than are available")]
    fn test_watermark_part_take_more_than_available_panics() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 5,
            total_rows: 200,
        };

        let _ = part.take(6);
    }

    /// `len` and `checkpoint` reflect the constructor arguments.
    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    /// A checkpoint with no values still reports its sequence number.
    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}