pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    const MIN_EAGER_ROWS: usize = 50usize;
    const MAX_PENDING_ROWS: usize = 5_000usize;
    const MAX_WATERMARK_UPDATES: usize = 10_000usize;

    // Required methods
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut IntoIter<Self::Value>,
    ) -> BatchStatus;
    fn commit<'a, 'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        batch: &'life1 Self::Batch,
        conn: &'life2 mut <Self::Store as Store>::Connection<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'a: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait;

    // Provided method
    fn prune<'a, 'life0, 'life1, 'async_trait>(
        &'life0 self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'a: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait { ... }
}
Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by implementing Processor) into rows for their table, and how to write those rows to the database.
The handler is also responsible for tuning the various parameters of the pipeline (provided as associated constants). Reasonable defaults have been chosen to balance concurrency with memory usage, but each handler may choose to override these defaults, e.g.
- Handlers that produce many small rows may wish to increase their batch/chunk/max-pending sizes.
- Handlers that do more work during processing may wish to increase their fanout so more of it can be done concurrently, to preserve throughput.
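Overriding a default is ordinary Rust: an implementation redeclares the associated constant it wants to change and inherits the rest. The sketch below uses a hypothetical `Tuning` trait that carries only the three constants listed on this page, so that it compiles on its own; the real `Handler` trait also requires `Processor`, the associated types, and the methods. The overridden values are illustrative, not recommendations.

```rust
/// Hypothetical stand-in for the tuning constants on `Handler`.
/// The real trait has additional associated types and methods.
pub trait Tuning {
    const MIN_EAGER_ROWS: usize = 50;
    const MAX_PENDING_ROWS: usize = 5_000;
    const MAX_WATERMARK_UPDATES: usize = 10_000;
}

/// A pipeline that is happy with every default.
pub struct DefaultPipeline;

impl Tuning for DefaultPipeline {}

/// A pipeline that produces many small rows, so it raises the row thresholds.
pub struct SmallRowPipeline;

impl Tuning for SmallRowPipeline {
    // Illustrative values only.
    const MIN_EAGER_ROWS: usize = 500;
    const MAX_PENDING_ROWS: usize = 50_000;
    // MAX_WATERMARK_UPDATES is not redeclared, so it keeps its default.
}
```

Because the values are associated constants rather than runtime configuration, they are fixed per handler type at compile time.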
Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is processed and committed out-of-order and a watermark table is kept up-to-date with the latest checkpoint below which all data has been committed.
Back-pressure is handled through the MAX_PENDING_ROWS constant: if more than this many rows
build up, the collector will stop accepting new checkpoints, which will eventually propagate
back to the ingestion service.
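The back-pressure rule can be pictured as a simple threshold check. The following is a minimal sketch, not the framework's collector: the `Collector` struct and its methods are hypothetical names invented for illustration, and the only behaviour taken from the text above is that intake stops once more than the configured maximum number of rows is pending.

```rust
/// Hypothetical collector state; not a type from the framework.
pub struct Collector {
    pending_rows: usize,
    max_pending_rows: usize,
}

impl Collector {
    pub fn new(max_pending_rows: usize) -> Self {
        Collector { pending_rows: 0, max_pending_rows }
    }

    /// True while the collector may accept another checkpoint, i.e. while
    /// no more than `max_pending_rows` rows are waiting to be committed.
    pub fn can_accept(&self) -> bool {
        self.pending_rows <= self.max_pending_rows
    }

    /// Record rows produced by a newly processed checkpoint.
    pub fn add_rows(&mut self, rows: usize) {
        self.pending_rows += rows;
    }

    /// Record rows that have been written out, relieving the pressure.
    pub fn rows_committed(&mut self, rows: usize) {
        self.pending_rows = self.pending_rows.saturating_sub(rows);
    }
}
```

In this picture, a collector that refuses new checkpoints leaves them queued upstream, which is how the pressure would eventually reach the ingestion service.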
Provided Associated Constants§
const MIN_EAGER_ROWS: usize = 50usize
If at least this many rows are pending, the committer will commit them eagerly.
const MAX_PENDING_ROWS: usize = 5_000usize
If there are more than this many rows pending, the committer applies backpressure.
const MAX_WATERMARK_UPDATES: usize = 10_000usize
The maximum number of watermarks that can show up in a single batch. This limit exists to deal with pipelines that produce no data for a majority of checkpoints: the size of these pipelines' batches will be dominated by watermark updates.
Required Associated Types§
Required Methods§
fn batch(
    &self,
    batch: &mut Self::Batch,
    values: &mut IntoIter<Self::Value>,
) -> BatchStatus
Add values from the iterator to the batch. The implementation may take all, some, or none
of the values from the iterator by calling .next().
Returns BatchStatus::Ready if the batch is full and should be committed,
or BatchStatus::Pending if the batch can accept more values.
Note: The handler can signal batch readiness via BatchStatus::Ready, but the framework
may also decide to commit a batch based on the trait parameters above.
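The contract of `batch` can be illustrated with a free function over plain vectors. This is a sketch under stated assumptions: `BatchStatus` here is a local stand-in enum with the two variants named above, `MAX_BATCH_ROWS` is a hypothetical per-handler cap, and the batch is simply a `Vec<u64>`. A real implementation would be a method on the handler and use its own `Batch` and `Value` types.

```rust
use std::vec::IntoIter;

/// Local stand-in for the framework's `BatchStatus`.
#[derive(Debug, PartialEq, Eq)]
pub enum BatchStatus {
    Pending,
    Ready,
}

/// Hypothetical cap on rows per batch, chosen by the handler author.
pub const MAX_BATCH_ROWS: usize = 3;

/// Move values into the batch until it is full or the iterator is exhausted.
/// Values that are not taken stay in the iterator for a later call.
pub fn batch(batch: &mut Vec<u64>, values: &mut IntoIter<u64>) -> BatchStatus {
    while batch.len() < MAX_BATCH_ROWS {
        match values.next() {
            Some(value) => batch.push(value),
            None => break,
        }
    }

    if batch.len() >= MAX_BATCH_ROWS {
        BatchStatus::Ready
    } else {
        BatchStatus::Pending
    }
}
```

Taking `&mut IntoIter` rather than an owned collection is what lets an implementation stop part-way: whatever it does not consume with `.next()` remains available to the caller.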
fn commit<'a, 'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
batch: &'life1 Self::Batch,
conn: &'life2 mut <Self::Store as Store>::Connection<'a>,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where
Self: 'async_trait,
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Commit the batch to the database, returning the number of rows affected.
Provided Methods§
fn prune<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_from: u64,
_to_exclusive: u64,
_conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where
Self: Sync + 'async_trait,
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Clean up data between checkpoints _from and _to_exclusive (exclusive) in the database, returning
the number of rows affected. This function is optional, and defaults to not pruning at all.
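The range semantics of `prune` can be sketched against an in-memory table. Everything here is an assumption made for illustration: the function is synchronous, the "database" is a `BTreeMap` keyed by checkpoint sequence number, and `from` is treated as inclusive, which the parameter names suggest but the text above does not state. A real implementation would be async and issue a delete through the store's connection.

```rust
use std::collections::BTreeMap;

/// Remove all rows for checkpoints in `from..to_exclusive` from a
/// hypothetical in-memory table, returning the number of rows removed.
pub fn prune(
    table: &mut BTreeMap<u64, Vec<String>>,
    from: u64,
    to_exclusive: u64,
) -> usize {
    // An empty or inverted range prunes nothing. The guard also avoids the
    // panic that `BTreeMap::range` raises when start is greater than end.
    if from >= to_exclusive {
        return 0;
    }

    // Collect the keys first, because the map cannot be mutated while
    // it is being iterated.
    let checkpoints: Vec<u64> = table
        .range(from..to_exclusive)
        .map(|(checkpoint, _)| *checkpoint)
        .collect();

    let mut rows_removed = 0;
    for checkpoint in checkpoints {
        if let Some(rows) = table.remove(&checkpoint) {
            rows_removed += rows.len();
        }
    }
    rows_removed
}
```

Returning the removed row count mirrors the `Result<usize>` in the signature above; the default implementation described in the text would correspond to returning zero without touching the table.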
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.