pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    const MIN_EAGER_ROWS: usize = 50usize;
    const MAX_PENDING_ROWS: usize = 5_000usize;
    const MAX_WATERMARK_UPDATES: usize = 10_000usize;

    // Required methods
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut IntoIter<Self::Value>,
    ) -> BatchStatus;
    fn commit<'a, 'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        batch: &'life1 Self::Batch,
        conn: &'life2 mut <Self::Store as Store>::Connection<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    // Provided method
    fn prune<'a, 'life0, 'life1, 'async_trait>(
        &'life0 self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
       where Self: Sync + 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait { ... }
}

Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by implementing Processor) into rows for their table, and how to write those rows to the database.

The handler is also responsible for tuning the various parameters of the pipeline (provided as associated constants). Reasonable defaults have been chosen to balance concurrency with memory usage, but each handler may choose to override these defaults, e.g.

  • Handlers that produce many small rows may wish to increase their batch/chunk/max-pending sizes.
  • Handlers that do more work during processing may wish to increase their fanout so more of it can be done concurrently, to preserve throughput.

Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is processed and committed out-of-order and a watermark table is kept up-to-date with the latest checkpoint below which all data has been committed.

Back-pressure is handled through the MAX_PENDING_ROWS constant – if more than this many rows build up, the collector will stop accepting new checkpoints, which will eventually propagate back to the ingestion service.
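
For orientation, an implementation of this trait typically hangs together as in the sketch below. Everything named here is a placeholder for illustration: TxCounts, TxCountRow, MyStore, and insert_tx_counts are not part of the framework, the Processor implementation (which defines Self::Value) is elided, IntoIter is assumed to be std::vec::IntoIter, and the impl is assumed to be annotated with #[async_trait] (which is what the boxed-future signatures above desugar to).

use std::vec::IntoIter;

/// Hypothetical pipeline that records one small row per transaction.
struct TxCounts;

// A Processor impl (not shown) would set `type Value = TxCountRow` and turn
// checkpoint data into those rows.

#[async_trait]
impl Handler for TxCounts {
    type Store = MyStore;             // placeholder type implementing `Store`
    type Batch = Vec<TxCountRow>;     // rows accumulated between commits

    // Tuning: many small rows, so tolerate more pending data before
    // back-pressure kicks in.
    const MIN_EAGER_ROWS: usize = 200;
    const MAX_PENDING_ROWS: usize = 20_000;

    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut IntoIter<Self::Value>,
    ) -> BatchStatus {
        // Take everything on offer, and ask to be committed once the batch
        // has grown past an (arbitrary, example-only) chunk size.
        batch.extend(values);
        if batch.len() >= 1_000 {
            BatchStatus::Ready
        } else {
            BatchStatus::Pending
        }
    }

    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> Result<usize> {
        // `insert_tx_counts` stands in for a store-specific bulk insert that
        // returns the number of rows it wrote.
        insert_tx_counts(conn, batch).await
    }
}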

Provided Associated Constants§

const MIN_EAGER_ROWS: usize = 50usize

If at least this many rows are pending, the committer will commit them eagerly.

const MAX_PENDING_ROWS: usize = 5_000usize

If there are more than this many rows pending, the committer applies backpressure.

const MAX_WATERMARK_UPDATES: usize = 10_000usize

The maximum number of watermarks that can show up in a single batch. This limit exists to deal with pipelines that produce no data for a majority of checkpoints – the size of these pipelines' batches will be dominated by watermark updates.

Required Associated Types§

type Store: Store

type Batch: Default + Send + Sync + 'static

Required Methods§

fn batch(&self, batch: &mut Self::Batch, values: &mut IntoIter<Self::Value>) -> BatchStatus

Add values from the iterator to the batch. The implementation may take all, some, or none of the values from the iterator by calling .next().

Returns BatchStatus::Ready if the batch is full and should be committed, or BatchStatus::Pending if the batch can accept more values.

Note: The handler can signal batch readiness via BatchStatus::Ready, but the framework may also decide to commit a batch based on the associated constants above.
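
For example, a handler whose insert statements are limited to a fixed number of rows might drain the iterator only up to that cap and leave the remainder for a later batch. The sketch below is hypothetical: it assumes Self::Batch = Vec<Self::Value>, IntoIter is std::vec::IntoIter, and MAX_ROWS_PER_STATEMENT is an illustrative constant rather than part of the framework.

// An illustrative per-statement cap (not part of the framework).
const MAX_ROWS_PER_STATEMENT: usize = 1_000;

// Inside a hypothetical `impl Handler for MyHandler`, with
// `Self::Batch = Vec<Self::Value>`:
fn batch(
    &self,
    batch: &mut Self::Batch,
    values: &mut IntoIter<Self::Value>,
) -> BatchStatus {
    // Only take as many values as still fit under the cap; whatever is left
    // in `values` will be offered to a subsequent batch.
    while batch.len() < MAX_ROWS_PER_STATEMENT {
        match values.next() {
            Some(row) => batch.push(row),
            None => return BatchStatus::Pending, // can still accept more later
        }
    }
    BatchStatus::Ready
}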

fn commit<'a, 'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    batch: &'life1 Self::Batch,
    conn: &'life2 mut <Self::Store as Store>::Connection<'a>,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
   where Self: 'async_trait,
         'a: 'async_trait,
         'life0: 'async_trait,
         'life1: 'async_trait,
         'life2: 'async_trait

Commit the batch to the database, returning the number of rows affected.
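
The implementation is free to split the batch up however it likes, as long as it reports the total number of rows affected. The sketch below is hypothetical: it assumes Self::Batch = Vec<Self::Value>, an #[async_trait] impl, and a store-specific bulk-insert helper named insert_chunk that is not part of the framework.

// Inside a hypothetical `#[async_trait] impl Handler for MyHandler`:
async fn commit<'a>(
    &self,
    batch: &Self::Batch,
    conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
    let mut affected = 0;
    // Write the batch in fixed-size chunks (e.g. to stay under a database's
    // bind-parameter limit), summing the rows reported by each insert.
    for chunk in batch.chunks(1_000) {
        affected += insert_chunk(conn, chunk).await?;
    }
    Ok(affected)
}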

Provided Methods§

fn prune<'a, 'life0, 'life1, 'async_trait>(
    &'life0 self,
    _from: u64,
    _to_exclusive: u64,
    _conn: &'life1 mut <Self::Store as Store>::Connection<'a>,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
   where Self: Sync + 'async_trait,
         'a: 'async_trait,
         'life0: 'async_trait,
         'life1: 'async_trait

Clean up data between checkpoints _from (inclusive) and _to_exclusive (exclusive) in the database, returning the number of rows affected. This method is optional, and defaults to not pruning at all.
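
A pruning implementation would typically issue a range delete over the checkpoint interval and report how many rows it removed. The sketch below is hypothetical: delete_rows_in_range is a placeholder for a store-specific delete, and the impl is assumed to be annotated with #[async_trait].

// Inside a hypothetical `#[async_trait] impl Handler for MyHandler`:
async fn prune<'a>(
    &self,
    from: u64,
    to_exclusive: u64,
    conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
    // Remove rows whose checkpoint sequence number lies in [from, to_exclusive).
    delete_rows_in_range(conn, from, to_exclusive).await
}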

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

impl<H> Handler for H
where H: Handler + Send + Sync + 'static, H::Value: FieldCount + Send + Sync,

Blanket implementation of the framework’s Handler trait for any type implementing the Postgres-specific Handler trait.

const MIN_EAGER_ROWS: usize = H::MIN_EAGER_ROWS

const MAX_PENDING_ROWS: usize = H::MAX_PENDING_ROWS

const MAX_WATERMARK_UPDATES: usize = H::MAX_WATERMARK_UPDATES

type Store = Db

type Batch = Vec<<H as Processor>::Value>