// sui_indexer_alt_framework/pipeline/mod.rs
use std::time::Duration;
pub use processor::Processor;
use serde::{Deserialize, Serialize};
use crate::store::CommitterWatermark;
pub mod concurrent;
mod logging;
mod processor;
pub mod sequential;
/// Small fixed buffer size used by the pipeline machinery.
/// NOTE(review): its uses are outside this module view; presumably extra channel capacity
/// between pipeline stages — confirm at the use sites.
const PIPELINE_BUFFER: usize = 5;
/// Threshold on the number of pending watermarks.
/// NOTE(review): presumably a warning is emitted once this many are outstanding — confirm.
const WARN_PENDING_WATERMARKS: usize = 10_000;
/// Tunables for a pipeline's committer.
///
/// Intervals are stored as raw millisecond counts; [`CommitterConfig::collect_interval`] and
/// [`CommitterConfig::watermark_interval`] expose them as [`Duration`]s.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CommitterConfig {
    /// Degree of write concurrency (presumably the number of in-flight writes — confirm at the
    /// use site).
    pub write_concurrency: usize,
    /// Collection interval in milliseconds; see [`CommitterConfig::collect_interval`].
    pub collect_interval_ms: u64,
    /// Watermark interval in milliseconds; see [`CommitterConfig::watermark_interval`].
    pub watermark_interval_ms: u64,
}
/// The values a [`Processor`] emitted for one checkpoint, paired with that checkpoint's
/// watermark.
struct IndexedCheckpoint<P>
where
    P: Processor,
{
    /// Values of the processor's output type produced for this checkpoint.
    values: Vec<P::Value>,
    /// Epoch, checkpoint sequence number, `tx_hi` and timestamp describing the checkpoint.
    watermark: CommitterWatermark,
}
/// A share of one checkpoint's rows, carried together with that checkpoint's watermark.
///
/// The part counts as complete once `batch_rows` equals `total_rows`.
#[derive(Clone, Debug)]
struct WatermarkPart {
    /// Watermark of the checkpoint this part belongs to.
    watermark: CommitterWatermark,
    /// Rows this part currently accounts for.
    batch_rows: usize,
    /// Row count that `batch_rows` must reach for the part to be complete (presumably the
    /// checkpoint's total row count — confirm with the producer of these parts).
    total_rows: usize,
}
#[derive(thiserror::Error, Debug)]
enum Break {
#[error("Shutdown received")]
Cancel,
#[error(transparent)]
Err(#[from] anyhow::Error),
}
impl CommitterConfig {
    /// The `collect_interval_ms` setting as a [`Duration`].
    pub fn collect_interval(&self) -> Duration {
        let &Self {
            collect_interval_ms,
            ..
        } = self;
        Duration::from_millis(collect_interval_ms)
    }

    /// The `watermark_interval_ms` setting as a [`Duration`].
    pub fn watermark_interval(&self) -> Duration {
        let &Self {
            watermark_interval_ms,
            ..
        } = self;
        Duration::from_millis(watermark_interval_ms)
    }
}
impl<P> IndexedCheckpoint<P>
where
    P: Processor,
{
    /// Pairs `values` with a watermark assembled from the checkpoint's metadata.
    ///
    /// `epoch`, `cp_sequence_number` and `timestamp_ms` become the watermark's
    /// `epoch_hi_inclusive`, `checkpoint_hi_inclusive` and `timestamp_ms_hi_inclusive`;
    /// `tx_hi` is stored as-is.
    fn new(
        epoch: u64,
        cp_sequence_number: u64,
        tx_hi: u64,
        timestamp_ms: u64,
        values: Vec<P::Value>,
    ) -> Self {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: epoch,
            checkpoint_hi_inclusive: cp_sequence_number,
            tx_hi,
            timestamp_ms_hi_inclusive: timestamp_ms,
        };
        Self { values, watermark }
    }

    /// Number of values held for this checkpoint.
    fn len(&self) -> usize {
        let Self { values, .. } = self;
        values.len()
    }

    /// Sequence number of the checkpoint these values came from.
    fn checkpoint(&self) -> u64 {
        let Self { watermark, .. } = self;
        watermark.checkpoint_hi_inclusive
    }
}
impl WatermarkPart {
    /// Sequence number of the checkpoint this part belongs to.
    fn checkpoint(&self) -> u64 {
        let Self { watermark, .. } = self;
        watermark.checkpoint_hi_inclusive
    }

    /// Timestamp (ms) recorded in this part's watermark.
    fn timestamp_ms(&self) -> u64 {
        let Self { watermark, .. } = self;
        watermark.timestamp_ms_hi_inclusive
    }

    /// True once `batch_rows` has reached exactly `total_rows`.
    fn is_complete(&self) -> bool {
        let Self {
            batch_rows,
            total_rows,
            ..
        } = self;
        batch_rows == total_rows
    }

    /// Merges `other`'s rows into this part.
    ///
    /// Both parts must belong to the same checkpoint; this is only checked in debug builds.
    fn add(&mut self, other: WatermarkPart) {
        debug_assert_eq!(self.checkpoint(), other.checkpoint());
        self.batch_rows += other.batch_rows;
    }

    /// Splits `rows` rows off this part into a new part with the same watermark and
    /// `total_rows`.
    ///
    /// Callers must not take more than `batch_rows`. This is only asserted in debug builds.
    fn take(&mut self, rows: usize) -> WatermarkPart {
        debug_assert!(
            self.batch_rows >= rows,
            "Can't take more rows than are available"
        );
        self.batch_rows -= rows;

        let Self {
            watermark,
            total_rows,
            ..
        } = *self;
        WatermarkPart {
            watermark,
            batch_rows: rows,
            total_rows,
        }
    }
}
impl Default for CommitterConfig {
    /// `write_concurrency` = 5; collect and watermark intervals of 500 ms each.
    fn default() -> Self {
        let write_concurrency = 5;
        let collect_interval_ms = 500;
        let watermark_interval_ms = 500;

        Self {
            write_concurrency,
            collect_interval_ms,
            watermark_interval_ms,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;
    use sui_types::full_checkpoint_content::CheckpointData;

    /// Minimal processor, only used to instantiate `IndexedCheckpoint<P>`.
    struct TestProcessor;

    impl Processor for TestProcessor {
        const NAME: &'static str = "test";
        type Value = i32;

        fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![1, 2, 3])
        }
    }

    #[test]
    fn test_watermark_part_getters() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive: 100,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1234567890,
        };

        let part = WatermarkPart {
            watermark,
            batch_rows: 50,
            total_rows: 200,
        };

        assert_eq!(part.checkpoint(), 100);
        assert_eq!(part.timestamp_ms(), 1234567890);
    }

    #[test]
    fn test_watermark_part_is_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };

        assert!(part.is_complete());
    }

    #[test]
    fn test_watermark_part_is_not_complete() {
        let part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        assert!(!part.is_complete());
    }

    #[test]
    fn test_watermark_part_becomes_complete_after_adding_new_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 199,
            total_rows: 200,
        };

        part.add(WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 1,
            total_rows: 200,
        });

        assert!(part.is_complete());
        assert_eq!(part.batch_rows, 200);
        // Adding rows must not alter the completion target.
        assert_eq!(part.total_rows, 200);
    }

    #[test]
    fn test_watermark_part_becomes_incomplete_after_taking_away_batch() {
        let mut part = WatermarkPart {
            watermark: CommitterWatermark::default(),
            batch_rows: 200,
            total_rows: 200,
        };
        assert!(part.is_complete(), "Initial part should be complete");

        let extracted_part = part.take(10);

        // The part that rows were taken from is what the test name is about.
        assert!(!part.is_complete(), "Source part should be incomplete");
        assert_eq!(part.batch_rows, 190);
        assert_eq!(part.total_rows, 200);

        assert!(!extracted_part.is_complete());
        assert_eq!(extracted_part.batch_rows, 10);
        assert_eq!(extracted_part.total_rows, 200);
    }

    #[test]
    fn test_watermark_part_take_then_add_round_trips() {
        let watermark = CommitterWatermark {
            epoch_hi_inclusive: 2,
            checkpoint_hi_inclusive: 7,
            tx_hi: 70,
            timestamp_ms_hi_inclusive: 700,
        };
        let mut part = WatermarkPart {
            watermark,
            batch_rows: 30,
            total_rows: 30,
        };

        let extracted = part.take(12);
        // The split-off part keeps the original watermark.
        assert_eq!(extracted.checkpoint(), 7);
        assert_eq!(extracted.timestamp_ms(), 700);
        assert_eq!(part.batch_rows, 18);

        part.add(extracted);
        assert_eq!(part.batch_rows, 30);
        assert!(part.is_complete());
    }

    #[test]
    fn test_indexed_checkpoint() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values = vec![1, 2, 3];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 3);
        assert_eq!(checkpoint.checkpoint(), 100);
        // Each argument must land in its own watermark field (catches swapped parameters).
        assert_eq!(checkpoint.watermark.epoch_hi_inclusive, 1);
        assert_eq!(checkpoint.watermark.tx_hi, 1000);
        assert_eq!(checkpoint.watermark.timestamp_ms_hi_inclusive, 1234567890);
    }

    #[test]
    fn test_indexed_checkpoint_with_empty_values() {
        let epoch = 1;
        let cp_sequence_number = 100;
        let tx_hi = 1000;
        let timestamp_ms = 1234567890;
        let values: Vec<<TestProcessor as Processor>::Value> = vec![];

        let checkpoint = IndexedCheckpoint::<TestProcessor>::new(
            epoch,
            cp_sequence_number,
            tx_hi,
            timestamp_ms,
            values,
        );

        assert_eq!(checkpoint.len(), 0);
        assert_eq!(checkpoint.checkpoint(), 100);
    }

    #[test]
    fn test_committer_config_default_intervals() {
        let config = CommitterConfig::default();

        assert_eq!(config.write_concurrency, 5);
        assert_eq!(config.collect_interval(), Duration::from_millis(500));
        assert_eq!(config.watermark_interval(), Duration::from_millis(500));
    }
}