pub trait Handler: Processor {
    type Store: TransactionalStore;
    type Batch: Default + Send + Sync + 'static;

    const MIN_EAGER_ROWS: usize = 50usize;
    const MAX_BATCH_CHECKPOINTS: usize = 300usize;

    // Required methods
    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>);
    fn commit<'a, 'life0, 'life1, 'async_trait>(
        batch: &'life0 Self::Batch,
        conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
}

Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by implementing Processor) into rows for their table, how to combine multiple rows into a single database operation, and how to write those rows atomically to the database.

The handler is also responsible for tuning the various parameters of the pipeline (provided as associated values).

Sequential handlers can only be used in sequential pipelines, where checkpoint data is processed out-of-order but then gathered and written in order. If multiple checkpoints are available, the pipeline will attempt to combine their writes, taking advantage of batching to avoid emitting redundant writes.

Back-pressure is handled by setting a high watermark on the ingestion service: the pipeline notifies the ingestion service of the last checkpoint it successfully wrote to the database, and in turn the ingestion service will only run ahead of that watermark by its buffer size. This guarantees liveness and limits the amount of memory the pipeline can consume, by bounding the number of checkpoints that can be buffered ahead of the next checkpoint to be written.
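This back-pressure scheme can be sketched with a bounded channel standing in for the ingestion buffer (the names and types below are illustrative, not the framework's actual API): the sender blocks once the buffer is full, so the ingestion service can never run more than `buffer_size` checkpoints ahead of the committer.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Models the ingestion buffer as a bounded channel: the sender (the
// ingestion service) blocks once `buffer_size` checkpoints are in flight,
// so memory stays bounded no matter how far behind the committer falls.
fn run_pipeline(buffer_size: usize, checkpoints: u64) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(buffer_size);

    let ingestion = thread::spawn(move || {
        for cp in 0..checkpoints {
            // Blocks while the buffer is full, i.e. until the committer
            // drains earlier checkpoints and the watermark advances.
            tx.send(cp).unwrap();
        }
    });

    // The committer: receiving a checkpoint stands in for writing it to
    // the database and reporting the new watermark.
    let committed: Vec<u64> = rx.into_iter().collect();
    ingestion.join().unwrap();
    committed
}

fn main() {
    let committed = run_pipeline(3, 10);
    assert_eq!(committed, (0..10).collect::<Vec<u64>>());
    println!("committed {} checkpoints in order", committed.len());
}
```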

Provided Associated Constants§

const MIN_EAGER_ROWS: usize = 50usize

If at least this many rows are pending, the committer will commit them eagerly.

const MAX_BATCH_CHECKPOINTS: usize = 300usize

Maximum number of checkpoints to try to write in a single batch. The larger this number, the more opportunities the pipeline has to merge redundant writes, but the longer each write transaction is likely to run.

Required Associated Types§

type Store: TransactionalStore

type Batch: Default + Send + Sync + 'static

A type that combines multiple Self::Value instances. This can be used to avoid redundant writes by merging multiple rows into one (e.g. if one row supersedes another, the latter can be omitted).
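As a sketch of this superseding behavior, a batch can key rows by identity and keep only the newest one. The `Update`/`UpdateBatch` types below are hypothetical stand-ins, not the framework's types; `UpdateBatch::batch` mirrors the shape of `Handler::batch` under the assumption that values for the same key arrive in checkpoint order.

```rust
use std::collections::BTreeMap;

// Hypothetical row type: the latest update for an object id.
#[derive(Clone, Debug, PartialEq)]
struct Update {
    object_id: u64,
    version: u64,
}

// A batch that keeps only the newest row per object: a later write
// supersedes an earlier one, so the superseded row is never sent to the DB.
#[derive(Default)]
struct UpdateBatch {
    latest: BTreeMap<u64, Update>,
}

impl UpdateBatch {
    // Mirrors `Handler::batch`: values arrive in checkpoint order, so a
    // plain insert leaves the most recent version for each object.
    fn batch(&mut self, values: Vec<Update>) {
        for v in values {
            self.latest.insert(v.object_id, v);
        }
    }

    fn rows(&self) -> Vec<&Update> {
        self.latest.values().collect()
    }
}

fn main() {
    let mut b = UpdateBatch::default();
    b.batch(vec![
        Update { object_id: 1, version: 1 },
        Update { object_id: 2, version: 1 },
    ]);
    // A later checkpoint supersedes (object 1, version 1).
    b.batch(vec![Update { object_id: 1, version: 2 }]);
    assert_eq!(b.rows().len(), 2);
    assert_eq!(b.rows()[0].version, 2);
    println!("ok");
}
```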

Required Methods§

fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>)

Add values from processing a checkpoint to the current batch. Checkpoints are guaranteed to be presented to the batch in checkpoint order.

fn commit<'a, 'life0, 'life1, 'async_trait>(
    batch: &'life0 Self::Batch,
    conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'a: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,

Take a batch of values and commit them to the database, returning the number of rows affected.
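The desugared signature above is what an async trait method expands to: the implementor returns a boxed future borrowing both the batch and the connection. The sketch below mirrors that shape with hypothetical stand-in types (an in-memory `Connection`, `Result<usize, String>` in place of the framework's `Result<usize>`, and a tiny single-poll executor so the example runs without an async runtime); a real implementation would issue an atomic database write through the store's transactional connection.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical stand-in for the store's transactional connection; the real
// type comes from `<Self::Store as Store>::Connection<'a>`.
struct Connection {
    rows: Vec<(u64, u64)>, // (object_id, version) pairs "in the database"
}

// Mirrors the desugared `commit` signature: borrow the batch and the
// connection, return a boxed future yielding the number of rows affected.
fn commit<'a>(
    batch: &'a [(u64, u64)],
    conn: &'a mut Connection,
) -> Pin<Box<dyn Future<Output = Result<usize, String>> + Send + 'a>> {
    Box::pin(async move {
        // A real handler would issue one atomic, batched DB statement here.
        conn.rows.extend_from_slice(batch);
        Ok(batch.len())
    })
}

// Minimal executor for futures that are immediately ready, so the sketch
// runs without pulling in an async runtime.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

fn poll_ready<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

fn main() {
    let mut conn = Connection { rows: Vec::new() };
    // A real pipeline would `.await` this inside its write transaction.
    let rows_affected = poll_ready(commit(&[(1, 2), (3, 4)], &mut conn));
    assert_eq!(rows_affected, Ok(2));
    assert_eq!(conn.rows.len(), 2);
    println!("ok");
}
```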

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§