sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use crate::config::ConcurrencyConfig;
7use crate::store::CommitterWatermark;
8pub use processor::Processor;
9use rand::Rng;
10use serde::Deserialize;
11use serde::Serialize;
12
13pub mod concurrent;
14mod logging;
15mod processor;
16pub mod sequential;
17
/// Threshold on a count of pending watermarks.
///
/// NOTE(review): not referenced in this section of the file; presumably the pipeline
/// submodules log a warning once more than this many watermarks are pending -- confirm at
/// the use sites.
const WARN_PENDING_WATERMARKS: usize = 10_000;
23
24#[derive(Serialize, Deserialize, Debug, Clone)]
25pub struct CommitterConfig {
26 pub write_concurrency: usize,
28
29 pub collect_interval_ms: u64,
31
32 pub watermark_interval_ms: u64,
34
35 pub watermark_interval_jitter_ms: u64,
37}
38
39struct IndexedCheckpoint<P: Processor> {
42 values: Vec<P::Value>,
44 watermark: CommitterWatermark,
46}
47
48#[derive(Debug, Clone)]
50struct WatermarkPart {
51 watermark: CommitterWatermark,
53 batch_rows: usize,
55 total_rows: usize,
57}
58
59impl CommitterConfig {
60 pub fn collect_interval(&self) -> Duration {
61 Duration::from_millis(self.collect_interval_ms)
62 }
63
64 pub fn watermark_interval(&self) -> Duration {
65 Duration::from_millis(self.watermark_interval_ms)
66 }
67
68 pub fn watermark_interval_with_jitter(&self) -> tokio::time::Instant {
71 let jitter = if self.watermark_interval_jitter_ms == 0 {
72 0
73 } else {
74 rand::thread_rng().gen_range(0..=self.watermark_interval_jitter_ms)
75 };
76 tokio::time::Instant::now() + Duration::from_millis(self.watermark_interval_ms + jitter)
77 }
78}
79
80impl<P: Processor> IndexedCheckpoint<P> {
81 fn new(
82 epoch: u64,
83 cp_sequence_number: u64,
84 tx_hi: u64,
85 timestamp_ms: u64,
86 values: Vec<P::Value>,
87 ) -> Self {
88 Self {
89 watermark: CommitterWatermark {
90 epoch_hi_inclusive: epoch,
91 checkpoint_hi_inclusive: cp_sequence_number,
92 tx_hi,
93 timestamp_ms_hi_inclusive: timestamp_ms,
94 },
95 values,
96 }
97 }
98
99 fn len(&self) -> usize {
101 self.values.len()
102 }
103
104 fn checkpoint(&self) -> u64 {
106 self.watermark.checkpoint_hi_inclusive
107 }
108}
109
110impl WatermarkPart {
111 fn checkpoint(&self) -> u64 {
112 self.watermark.checkpoint_hi_inclusive
113 }
114
115 fn timestamp_ms(&self) -> u64 {
116 self.watermark.timestamp_ms_hi_inclusive
117 }
118
119 fn is_complete(&self) -> bool {
121 self.batch_rows == self.total_rows
122 }
123
124 fn add(&mut self, other: WatermarkPart) {
126 debug_assert_eq!(self.checkpoint(), other.checkpoint());
127 self.batch_rows += other.batch_rows;
128 }
129
130 fn take(&mut self, rows: usize) -> WatermarkPart {
132 debug_assert!(
133 self.batch_rows >= rows,
134 "Can't take more rows than are available"
135 );
136
137 self.batch_rows -= rows;
138 WatermarkPart {
139 watermark: self.watermark,
140 batch_rows: rows,
141 total_rows: self.total_rows,
142 }
143 }
144}
145
146impl Default for CommitterConfig {
147 fn default() -> Self {
148 Self {
149 write_concurrency: 5,
150 collect_interval_ms: 500,
151 watermark_interval_ms: 500,
152 watermark_interval_jitter_ms: 0,
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal processor whose only purpose is to give `IndexedCheckpoint` a concrete value type.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add the single missing row.
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Split a portion of the rows off the original part.
        let extracted_part = part.take(10);

        // The original part gives up the taken rows and is therefore no longer complete.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // The extracted part holds only the taken rows but keeps the checkpoint's total.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}