sui_indexer_alt_framework/pipeline/
mod.rs1use std::time::Duration;
5
6pub use processor::Processor;
7use serde::{Deserialize, Serialize};
8
9use crate::store::CommitterWatermark;
10
11pub mod concurrent;
12mod logging;
13mod processor;
14pub mod sequential;
15
/// Extra buffer capacity used by the pipeline.
///
/// NOTE(review): the use site is outside this section — presumably this is added to the
/// capacity of the channels between a pipeline's tasks; confirm.
const PIPELINE_BUFFER: usize = 5;

/// Threshold on the number of pending watermarks.
///
/// NOTE(review): presumably a warning is emitted once this many watermarks are pending;
/// confirm at the use site.
const WARN_PENDING_WATERMARKS: usize = 10_000;
25
26#[derive(Serialize, Deserialize, Debug, Clone)]
27pub struct CommitterConfig {
28 pub write_concurrency: usize,
30
31 pub collect_interval_ms: u64,
33
34 pub watermark_interval_ms: u64,
36}
37
38struct IndexedCheckpoint<P: Processor> {
41 values: Vec<P::Value>,
43 watermark: CommitterWatermark,
45}
46
47#[derive(Debug, Clone)]
49struct WatermarkPart {
50 watermark: CommitterWatermark,
52 batch_rows: usize,
54 total_rows: usize,
56}
57
58impl CommitterConfig {
59 pub fn collect_interval(&self) -> Duration {
60 Duration::from_millis(self.collect_interval_ms)
61 }
62
63 pub fn watermark_interval(&self) -> Duration {
64 Duration::from_millis(self.watermark_interval_ms)
65 }
66}
67
68impl<P: Processor> IndexedCheckpoint<P> {
69 fn new(
70 epoch: u64,
71 cp_sequence_number: u64,
72 tx_hi: u64,
73 timestamp_ms: u64,
74 values: Vec<P::Value>,
75 ) -> Self {
76 Self {
77 watermark: CommitterWatermark {
78 epoch_hi_inclusive: epoch,
79 checkpoint_hi_inclusive: cp_sequence_number,
80 tx_hi,
81 timestamp_ms_hi_inclusive: timestamp_ms,
82 },
83 values,
84 }
85 }
86
87 fn len(&self) -> usize {
89 self.values.len()
90 }
91
92 fn checkpoint(&self) -> u64 {
94 self.watermark.checkpoint_hi_inclusive
95 }
96}
97
98impl WatermarkPart {
99 fn checkpoint(&self) -> u64 {
100 self.watermark.checkpoint_hi_inclusive
101 }
102
103 fn timestamp_ms(&self) -> u64 {
104 self.watermark.timestamp_ms_hi_inclusive
105 }
106
107 fn is_complete(&self) -> bool {
109 self.batch_rows == self.total_rows
110 }
111
112 fn add(&mut self, other: WatermarkPart) {
114 debug_assert_eq!(self.checkpoint(), other.checkpoint());
115 self.batch_rows += other.batch_rows;
116 }
117
118 fn take(&mut self, rows: usize) -> WatermarkPart {
120 debug_assert!(
121 self.batch_rows >= rows,
122 "Can't take more rows than are available"
123 );
124
125 self.batch_rows -= rows;
126 WatermarkPart {
127 watermark: self.watermark,
128 batch_rows: rows,
129 total_rows: self.total_rows,
130 }
131 }
132}
133
134impl Default for CommitterConfig {
135 fn default() -> Self {
136 Self {
137 write_concurrency: 5,
138 collect_interval_ms: 500,
139 watermark_interval_ms: 500,
140 }
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_types::full_checkpoint_content::Checkpoint;

    /// Minimal processor whose `process` always returns `[1, 2, 3]`; used only to
    /// instantiate `IndexedCheckpoint<P>` in these tests.
    struct TestProcessor;

    #[async_trait]
    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        // Adding the single missing row completes the part.
        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        let extracted_part = part.take(10);

        // The part the rows were taken from is the one the test name is about: it must
        // have lost those rows and no longer be complete.
        assert!(!part.is_complete(), "Remaining part should be incomplete");
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        // The extracted part carries only the taken rows, against the same total and
        // for the same checkpoint.
        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
        assert_eq!(extracted_part.checkpoint(), part.checkpoint());
    }

    #[test]
    fn test_watermark_part_take_then_add_round_trips() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        let extracted_part = part.take(75);
        assert_eq!(part.batch_rows, 125);

        // Re-adding what was taken restores the original, complete part.
        part.add(extracted_part);
        assert_eq!(part.batch_rows, 200);
        assert!(part.is_complete());
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);

        // Every constructor argument must land in the corresponding watermark field.
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, epoch);
        assert_eq!(checkpoint.watermark.checkpoint_hi_inclusive, cp_sequence_number);
        assert_eq!(checkpoint.watermark.tx_hi, tx_hi);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, timestamp_ms);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }
}