sui_indexer_alt_framework/postgres/
handler.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Postgres-specific handler trait for concurrent indexing pipelines.
//!
//! This module provides an interface for handlers that need to respect PostgreSQL's bind
//! parameter limit (32,767 parameters per query). When inserting multiple rows, each field
//! becomes a bind parameter, so the maximum number of rows per batch is:
//!
//! ```text
//! max_rows = 32,767 / fields_per_row
//! ```
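//!
//! For example, a row type with five fields can batch at most 6,553 rows per statement
//! (32,767 / 5, rounded down).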
//!
//! The `Handler` trait in this module extends the framework's concurrent `Handler` trait with
//! Postgres-specific batching logic that automatically handles this limitation.
//!
//! # TODO: Consider moving FieldCount to sui-pg-db
//!
//! Currently, FieldCount lives in this framework crate but is Postgres-specific. Ideally it
//! should live in sui-pg-db. However, this creates a circular dependency:
//! - sui-indexer-alt-framework depends on sui-pg-db (for IndexerCluster and other utilities)
//! - This blanket impl needs FieldCount to implement concurrent::Handler for postgres indexers
//! - Moving FieldCount to sui-pg-db would require framework to depend on sui-pg-db (circular!)
//!
//! To fully decouple, we'd need to move all postgres-specific code (including IndexerCluster) to
//! sui-pg-db, which would be a much larger breaking change. Consider this for a future refactor.
//!
//! See: <https://github.com/MystenLabs/sui/pull/24055#issuecomment-3471278182>

use async_trait::async_trait;

use super::{Connection, Db, FieldCount};
use crate::pipeline::{Processor, concurrent};

/// Postgres-specific handler trait for concurrent indexing pipelines.
///
/// Implementing this trait automatically provides the framework's `concurrent::Handler` trait,
/// with batching logic that respects the 32,767 bind parameter limit.
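///
/// # Example
///
/// A minimal sketch of an implementation, assuming Diesel's async API; the `KvCheckpoints`
/// processor and the `kv_checkpoints` table are hypothetical:
///
/// ```ignore
/// use async_trait::async_trait;
/// use diesel_async::RunQueryDsl;
///
/// #[async_trait]
/// impl Handler for KvCheckpoints {
///     async fn commit<'a>(
///         values: &[Self::Value],
///         conn: &mut Connection<'a>,
///     ) -> anyhow::Result<usize> {
///         Ok(diesel::insert_into(kv_checkpoints::table)
///             .values(values)
///             .on_conflict_do_nothing()
///             .execute(conn)
///             .await?)
///     }
/// }
/// ```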
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected.
    async fn commit<'a>(
        values: &[Self::Value],
        conn: &mut Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data for checkpoints from `_from` (inclusive) up to `_to_exclusive` (exclusive),
    /// returning the number of rows affected. This function is optional, and defaults to not
    /// pruning at all.
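    ///
    /// # Example
    ///
    /// A sketch of an override that deletes a checkpoint range, assuming Diesel's async API;
    /// the `kv_checkpoints` table and its `cp_sequence_number` column are hypothetical:
    ///
    /// ```ignore
    /// async fn prune<'a>(
    ///     &self,
    ///     from: u64,
    ///     to_exclusive: u64,
    ///     conn: &mut Connection<'a>,
    /// ) -> anyhow::Result<usize> {
    ///     let filter = kv_checkpoints::table
    ///         .filter(kv_checkpoints::cp_sequence_number.ge(from as i64))
    ///         .filter(kv_checkpoints::cp_sequence_number.lt(to_exclusive as i64));
    ///     Ok(diesel::delete(filter).execute(conn).await?)
    /// }
    /// ```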
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Calculate the maximum number of rows that can be inserted in a single batch,
/// given the number of fields per row.
const fn max_chunk_rows<T: FieldCount>() -> usize {
    if T::FIELD_COUNT == 0 {
        // A row with no fields binds no parameters, so fall back to the parameter cap itself.
        i16::MAX as usize
    } else {
        i16::MAX as usize / T::FIELD_COUNT
    }
}

/// Blanket implementation of the framework's Handler trait for any type implementing the
/// Postgres-specific Handler trait.
#[async_trait]
impl<H> concurrent::Handler for H
where
    H: Handler + Send + Sync + 'static,
    H::Value: FieldCount + Send + Sync,
{
    type Store = Db;
    type Batch = Vec<H::Value>;

    const MIN_EAGER_ROWS: usize = H::MIN_EAGER_ROWS;
    const MAX_PENDING_ROWS: usize = H::MAX_PENDING_ROWS;
    const MAX_WATERMARK_UPDATES: usize = H::MAX_WATERMARK_UPDATES;

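    /// Fill `batch` with as many `values` as the bind parameter limit allows, reporting `Ready`
    /// once the batch is full so that the committer flushes it.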
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> concurrent::BatchStatus {
        let max_chunk_rows = max_chunk_rows::<H::Value>();
        let current_len = batch.len();

        if current_len + values.len() > max_chunk_rows {
            // Batch would exceed the limit, take only what fits
            let remaining_capacity = max_chunk_rows - current_len;
            batch.extend(values.take(remaining_capacity));
            concurrent::BatchStatus::Ready
        } else {
            // All values fit, take them all
            batch.extend(values);
            concurrent::BatchStatus::Pending
        }
    }

    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        H::commit(batch, conn).await
    }

    async fn prune<'a>(
        &self,
        from: u64,
        to_exclusive: u64,
        conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        <H as Handler>::prune(self, from, to_exclusive, conn).await
    }
}
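
// A minimal sketch of a unit test for the chunking arithmetic. `FieldCount` is implemented by
// hand here to keep the sketch self-contained, assuming the trait only requires the
// `FIELD_COUNT` associated const.
#[cfg(test)]
mod tests {
    use super::*;

    // A hypothetical row type with five fields.
    struct FiveFieldRow;

    impl FieldCount for FiveFieldRow {
        const FIELD_COUNT: usize = 5;
    }

    // A hypothetical row type with no fields.
    struct NoFieldRow;

    impl FieldCount for NoFieldRow {
        const FIELD_COUNT: usize = 0;
    }

    #[test]
    fn max_chunk_rows_respects_bind_limit() {
        // 32,767 bind parameters / 5 fields per row = 6,553 rows per chunk (rounded down).
        assert_eq!(max_chunk_rows::<FiveFieldRow>(), 6553);

        // With no fields, the chunk size falls back to the parameter cap itself.
        assert_eq!(max_chunk_rows::<NoFieldRow>(), i16::MAX as usize);
    }
}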