sui_indexer_alt_framework/postgres/
handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Postgres-specific handler trait for concurrent indexing pipelines.
5//!
6//! This module provides an interface for handlers that need to respect
7//! PostgreSQL's bind parameter limit (32,767 parameters per query). When inserting multiple rows,
8//! each field becomes a bind parameter, so the maximum number of rows per batch is:
9//!
10//! ```text
11//! max_rows = 32,767 / fields_per_row
12//! ```
13//!
14//! The `Handler` trait in this module extends the framework's concurrent Handler trait with
15//! Postgres-specific batching logic that automatically handles this limitation.
16//!
17//! # TODO: Consider moving FieldCount to sui-pg-db
18//!
19//! Currently, FieldCount lives in this framework crate but is Postgres-specific. Ideally it should
20//! live in sui-pg-db. However, this creates a circular dependency:
21//! - sui-indexer-alt-framework depends on sui-pg-db (for IndexerCluster and other utilities)
22//! - This blanket impl needs FieldCount to implement concurrent::Handler for postgres indexers
23//! - Moving FieldCount to sui-pg-db would require framework to depend on sui-pg-db (circular!)
24//!
25//! To fully decouple, we'd need to move all postgres-specific code (including IndexerCluster) to
26//! sui-pg-db, which would be a much larger breaking change. Consider this for a future refactor.
27//!
28//! See: <https://github.com/MystenLabs/sui/pull/24055#issuecomment-3471278182>
29
30use async_trait::async_trait;
31
32use crate::pipeline::Processor;
33use crate::pipeline::concurrent;
34use crate::postgres::Connection;
35use crate::postgres::Db;
36use crate::postgres::FieldCount;
37
/// Postgres-specific handler trait for concurrent indexing pipelines.
///
/// The trait automatically implements the framework's Handler trait with a PgBatch that respects
/// the 32,767 bind parameter limit.
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    /// If at least this many rows are pending, the committer will commit them eagerly.
    ///
    /// Implementors may override this to tune commit latency vs. batch size.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    ///
    /// Implementors may override this to bound memory held by a slow pipeline.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected. Callers are expected to pass chunks that already respect the bind parameter
    /// limit (see the blanket `concurrent::Handler` impl in this module).
    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>)
    -> anyhow::Result<usize>;

    /// Clean up data for checkpoints from `_from` up to, but not including, `_to_exclusive`,
    /// returning the number of rows affected. This function is optional, and defaults to not
    /// pruning at all (reporting zero rows affected).
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
70
71/// Calculate the maximum number of rows that can be inserted in a single batch,
72/// given the number of fields per row.
73const fn max_chunk_rows<T: FieldCount>() -> usize {
74    if T::FIELD_COUNT == 0 {
75        i16::MAX as usize
76    } else {
77        i16::MAX as usize / T::FIELD_COUNT
78    }
79}
80
81/// Blanket implementation of the framework's Handler trait for any type implementing the
82/// Postgres-specific Handler trait.
83#[async_trait]
84impl<H> concurrent::Handler for H
85where
86    H: Handler + Send + Sync + 'static,
87    H::Value: FieldCount + Send + Sync,
88{
89    type Store = Db;
90    type Batch = Vec<H::Value>;
91
92    const MIN_EAGER_ROWS: usize = H::MIN_EAGER_ROWS;
93    const MAX_PENDING_ROWS: usize = H::MAX_PENDING_ROWS;
94    const MAX_WATERMARK_UPDATES: usize = H::MAX_WATERMARK_UPDATES;
95
96    fn batch(
97        &self,
98        batch: &mut Self::Batch,
99        values: &mut std::vec::IntoIter<Self::Value>,
100    ) -> crate::pipeline::concurrent::BatchStatus {
101        let max_chunk_rows = max_chunk_rows::<H::Value>();
102        let current_len = batch.len();
103
104        if current_len + values.len() > max_chunk_rows {
105            // Batch would exceed the limit, take only what fits
106            let remaining_capacity = max_chunk_rows - current_len;
107            batch.extend(values.take(remaining_capacity));
108            crate::pipeline::concurrent::BatchStatus::Ready
109        } else {
110            // All values fit, take them all
111            batch.extend(values);
112            crate::pipeline::concurrent::BatchStatus::Pending
113        }
114    }
115
116    async fn commit<'a>(
117        &self,
118        batch: &Self::Batch,
119        conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
120    ) -> anyhow::Result<usize> {
121        H::commit(batch, conn).await
122    }
123
124    async fn prune<'a>(
125        &self,
126        from: u64,
127        to_exclusive: u64,
128        conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
129    ) -> anyhow::Result<usize> {
130        <H as Handler>::prune(self, from, to_exclusive, conn).await
131    }
132}