sui_indexer_alt_framework/postgres/handler.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Postgres-specific handler trait for concurrent indexing pipelines.
5//!
6//! This module provides an interface for handlers that need to respect
7//! PostgreSQL's bind parameter limit (32,767 parameters per query). When inserting multiple rows,
8//! each field becomes a bind parameter, so the maximum number of rows per batch is:
9//!
10//! ```text
11//! max_rows = 32,767 / fields_per_row
12//! ```
13//!
14//! The `Handler` trait in this module extends the framework's concurrent Handler trait with
15//! Postgres-specific batching logic that automatically handles this limitation.
16//!
17//! # TODO: Consider moving FieldCount to sui-pg-db
18//!
19//! Currently, FieldCount lives in this framework crate but is Postgres-specific. Ideally it should
20//! live in sui-pg-db. However, this creates a circular dependency:
21//! - sui-indexer-alt-framework depends on sui-pg-db (for IndexerCluster and other utilities)
22//! - This blanket impl needs FieldCount to implement concurrent::Handler for postgres indexers
23//! - Moving FieldCount to sui-pg-db would require framework to depend on sui-pg-db (circular!)
24//!
25//! To fully decouple, we'd need to move all postgres-specific code (including IndexerCluster) to
26//! sui-pg-db, which would be a much larger breaking change. Consider this for a future refactor.
27//!
28//! See: <https://github.com/MystenLabs/sui/pull/24055#issuecomment-3471278182>
29
30use async_trait::async_trait;
31
32use crate::pipeline::Processor;
33use crate::pipeline::concurrent;
34use crate::postgres::Connection;
35use crate::postgres::Db;
36use crate::postgres::FieldCount;
37
/// Postgres-specific handler trait for concurrent indexing pipelines.
///
/// Implementors automatically receive the framework's `concurrent::Handler` implementation via
/// the blanket impl in this module. That impl's batching logic caps each batch at
/// `i16::MAX / FieldCount::FIELD_COUNT` rows, so the total number of bind parameters in a single
/// multi-row INSERT stays within Postgres' 32,767 limit.
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected. The chunk handed to this method is already sized by the blanket impl's `batch`
    /// to fit within Postgres' bind parameter limit, so a single multi-row INSERT is safe.
    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>)
        -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database,
    /// returning the number of rows affected. This function is optional, and defaults to not
    /// pruning at all.
    ///
    /// NOTE(review): `_from` is presumably inclusive (the `_to_exclusive` name marks only the
    /// upper bound as exclusive) — confirm against callers before relying on it.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
70
71/// Calculate the maximum number of rows that can be inserted in a single batch,
72/// given the number of fields per row.
73const fn max_chunk_rows<T: FieldCount>() -> usize {
74 if T::FIELD_COUNT == 0 {
75 i16::MAX as usize
76 } else {
77 i16::MAX as usize / T::FIELD_COUNT
78 }
79}
80
81/// Blanket implementation of the framework's Handler trait for any type implementing the
82/// Postgres-specific Handler trait.
83#[async_trait]
84impl<H> concurrent::Handler for H
85where
86 H: Handler + Send + Sync + 'static,
87 H::Value: FieldCount + Send + Sync,
88{
89 type Store = Db;
90 type Batch = Vec<H::Value>;
91
92 const MIN_EAGER_ROWS: usize = H::MIN_EAGER_ROWS;
93 const MAX_PENDING_ROWS: usize = H::MAX_PENDING_ROWS;
94 const MAX_WATERMARK_UPDATES: usize = H::MAX_WATERMARK_UPDATES;
95
96 fn batch(
97 &self,
98 batch: &mut Self::Batch,
99 values: &mut std::vec::IntoIter<Self::Value>,
100 ) -> crate::pipeline::concurrent::BatchStatus {
101 let max_chunk_rows = max_chunk_rows::<H::Value>();
102 let current_len = batch.len();
103
104 if current_len + values.len() > max_chunk_rows {
105 // Batch would exceed the limit, take only what fits
106 let remaining_capacity = max_chunk_rows - current_len;
107 batch.extend(values.take(remaining_capacity));
108 crate::pipeline::concurrent::BatchStatus::Ready
109 } else {
110 // All values fit, take them all
111 batch.extend(values);
112 crate::pipeline::concurrent::BatchStatus::Pending
113 }
114 }
115
116 async fn commit<'a>(
117 &self,
118 batch: &Self::Batch,
119 conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
120 ) -> anyhow::Result<usize> {
121 H::commit(batch, conn).await
122 }
123
124 async fn prune<'a>(
125 &self,
126 from: u64,
127 to_exclusive: u64,
128 conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
129 ) -> anyhow::Result<usize> {
130 <H as Handler>::prune(self, from, to_exclusive, conn).await
131 }
132}