sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use processor::Processor;
7use serde::{Deserialize, Serialize};
8
9use crate::store::CommitterWatermark;
10
11pub mod concurrent;
12mod logging;
13mod processor;
14pub mod sequential;
15
/// Small fixed buffer size used by the pipeline.
///
/// NOTE(review): presumably extra capacity added to the channels that connect pipeline tasks;
/// the use sites are not visible in this part of the file -- confirm before relying on this.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks.
///
/// NOTE(review): judging by the name, a warning is emitted once the count of pending watermarks
/// exceeds this value; the use sites are not visible here -- confirm against the committers.
const WARN_PENDING_WATERMARKS: usize = 10000;
25
/// Configuration for a pipeline's committer. Serializable and deserializable through serde.
///
/// The `Default` implementation uses a write concurrency of 5 and 500ms for both intervals.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CommitterConfig {
    /// NOTE(review): presumably the number of writes to the store that may be in flight at once;
    /// the consumer of this field is not visible here -- confirm.
    pub write_concurrency: usize,

    /// Collection interval, in milliseconds. Exposed as a `Duration` by `collect_interval()`.
    pub collect_interval_ms: u64,

    /// Watermark interval, in milliseconds. Exposed as a `Duration` by `watermark_interval()`.
    pub watermark_interval_ms: u64,
}
37
/// The values of type `P::Value` associated with one checkpoint, paired with the watermark that
/// identifies that checkpoint.
struct IndexedCheckpoint<P: Processor> {
    /// Values belonging to this checkpoint; `len()` reports how many there are.
    values: Vec<P::Value>,
    /// Watermark built by `new` from the checkpoint's epoch, sequence number, `tx_hi` and
    /// timestamp.
    watermark: CommitterWatermark,
}
46
/// A portion of the rows tied to a single checkpoint's watermark.
///
/// A part is complete when `batch_rows == total_rows` (see `is_complete`). Parts for the same
/// checkpoint can be merged with `add`, and a portion can be split off with `take`.
#[derive(Debug, Clone)]
struct WatermarkPart {
    /// The watermark this part contributes to.
    watermark: CommitterWatermark,
    /// Number of rows accounted for by this part; grows with `add` and shrinks with `take`.
    batch_rows: usize,
    /// Number of rows `batch_rows` must reach for the part to be complete.
    /// NOTE(review): presumably the total number of rows produced for the checkpoint -- confirm.
    total_rows: usize,
}
57
/// Internal error type with two cases: a cancellation and a wrapped `anyhow::Error`.
///
/// NOTE(review): presumably used to break out of a pipeline task's processing loop; the use
/// sites are not visible in this part of the file -- confirm.
#[derive(thiserror::Error, Debug)]
enum Break {
    /// Cancellation case; displays as "Shutdown received".
    #[error("Shutdown received")]
    Cancel,

    /// Any other failure. Display and source are forwarded to the wrapped error, and the `From`
    /// conversion lets `?` lift an `anyhow::Error` into this variant.
    #[error(transparent)]
    Err(#[from] anyhow::Error),
}
68
69impl CommitterConfig {
70 pub fn collect_interval(&self) -> Duration {
71 Duration::from_millis(self.collect_interval_ms)
72 }
73
74 pub fn watermark_interval(&self) -> Duration {
75 Duration::from_millis(self.watermark_interval_ms)
76 }
77}
78
79impl<P: Processor> IndexedCheckpoint<P> {
80 fn new(
81 epoch: u64,
82 cp_sequence_number: u64,
83 tx_hi: u64,
84 timestamp_ms: u64,
85 values: Vec<P::Value>,
86 ) -> Self {
87 Self {
88 watermark: CommitterWatermark {
89 epoch_hi_inclusive: epoch,
90 checkpoint_hi_inclusive: cp_sequence_number,
91 tx_hi,
92 timestamp_ms_hi_inclusive: timestamp_ms,
93 },
94 values,
95 }
96 }
97
98 fn len(&self) -> usize {
100 self.values.len()
101 }
102
103 fn checkpoint(&self) -> u64 {
105 self.watermark.checkpoint_hi_inclusive
106 }
107}
108
109impl WatermarkPart {
110 fn checkpoint(&self) -> u64 {
111 self.watermark.checkpoint_hi_inclusive
112 }
113
114 fn timestamp_ms(&self) -> u64 {
115 self.watermark.timestamp_ms_hi_inclusive
116 }
117
118 fn is_complete(&self) -> bool {
120 self.batch_rows == self.total_rows
121 }
122
123 fn add(&mut self, other: WatermarkPart) {
125 debug_assert_eq!(self.checkpoint(), other.checkpoint());
126 self.batch_rows += other.batch_rows;
127 }
128
129 fn take(&mut self, rows: usize) -> WatermarkPart {
131 debug_assert!(
132 self.batch_rows >= rows,
133 "Can't take more rows than are available"
134 );
135
136 self.batch_rows -= rows;
137 WatermarkPart {
138 watermark: self.watermark,
139 batch_rows: rows,
140 total_rows: self.total_rows,
141 }
142 }
143}
144
145impl Default for CommitterConfig {
146 fn default() -> Self {
147 Self {
148 write_concurrency: 5,
149 collect_interval_ms: 500,
150 watermark_interval_ms: 500,
151 }
152 }
153}
154
/// Unit tests for the helper types of this module: `WatermarkPart` bookkeeping and
/// `IndexedCheckpoint` construction.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    // A processor that exists only to pin down `P::Value` for `IndexedCheckpoint<P>`.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Add the one row that was missing.
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        // Split a portion of the rows off into a separate part.
        let extracted_part = part.take(10);

        // The extracted part carries only the rows that were taken.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);

        // The part that was taken from is the one the test name refers to: it must have lost
        // exactly those rows and therefore no longer be complete.
        assert!(!part.is_complete());
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);

        // Every constructor argument must land in the matching watermark field.
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, epoch);
        assert_eq!(checkpoint.watermark.checkpoint_hi_inclusive, cp_sequence_number);
        assert_eq!(checkpoint.watermark.tx_hi, tx_hi);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, timestamp_ms);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);

        // A checkpoint without values still gets a fully populated watermark.
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, epoch);
        assert_eq!(checkpoint.watermark.tx_hi, tx_hi);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, timestamp_ms);
    }
}