sui_indexer_alt_framework/postgres/handler.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Postgres-specific handler trait for concurrent indexing pipelines.
5//!
6//! This module provides an interface for handlers that need to respect
7//! PostgreSQL's bind parameter limit (32,767 parameters per query). When inserting multiple rows,
8//! each field becomes a bind parameter, so the maximum number of rows per batch is:
9//!
10//! ```text
11//! max_rows = 32,767 / fields_per_row
12//! ```
13//!
14//! The `Handler` trait in this module extends the framework's concurrent Handler trait with
15//! Postgres-specific batching logic that automatically handles this limitation.
16//!
17//! # TODO: Consider moving FieldCount to sui-pg-db
18//!
19//! Currently, FieldCount lives in this framework crate but is Postgres-specific. Ideally it should
20//! live in sui-pg-db. However, this creates a circular dependency:
21//! - sui-indexer-alt-framework depends on sui-pg-db (for IndexerCluster and other utilities)
22//! - This blanket impl needs FieldCount to implement concurrent::Handler for postgres indexers
23//! - Moving FieldCount to sui-pg-db would require framework to depend on sui-pg-db (circular!)
24//!
25//! To fully decouple, we'd need to move all postgres-specific code (including IndexerCluster) to
26//! sui-pg-db, which would be a much larger breaking change. Consider this for a future refactor.
27//!
28//! See: <https://github.com/MystenLabs/sui/pull/24055#issuecomment-3471278182>
29
30use async_trait::async_trait;
31
32use super::{Connection, Db, FieldCount};
33use crate::pipeline::{Processor, concurrent};
34
/// Postgres-specific handler trait for concurrent indexing pipelines.
///
/// Implementors only describe how a chunk of values is committed (and, optionally, pruned). The
/// blanket impl in this module then derives the framework's concurrent Handler on top of this
/// trait, batching rows so no single statement exceeds Postgres' 32,767 bind parameter limit.
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected.
    ///
    /// The blanket impl caps each chunk at `max_chunk_rows::<Self::Value>()` rows, so a single
    /// multi-row INSERT over `values` stays within Postgres' bind parameter limit.
    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>)
        -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` (presumably inclusive — confirm against the
    /// framework's pruner) and `_to_exclusive` (exclusive) in the database, returning the number
    /// of rows affected. This function is optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut Connection<'a>,
    ) -> anyhow::Result<usize> {
        // Default: report zero rows affected without touching the database.
        Ok(0)
    }
}
67
68/// Calculate the maximum number of rows that can be inserted in a single batch,
69/// given the number of fields per row.
70const fn max_chunk_rows<T: FieldCount>() -> usize {
71 if T::FIELD_COUNT == 0 {
72 i16::MAX as usize
73 } else {
74 i16::MAX as usize / T::FIELD_COUNT
75 }
76}
77
78/// Blanket implementation of the framework's Handler trait for any type implementing the
79/// Postgres-specific Handler trait.
80#[async_trait]
81impl<H> concurrent::Handler for H
82where
83 H: Handler + Send + Sync + 'static,
84 H::Value: FieldCount + Send + Sync,
85{
86 type Store = Db;
87 type Batch = Vec<H::Value>;
88
89 const MIN_EAGER_ROWS: usize = H::MIN_EAGER_ROWS;
90 const MAX_PENDING_ROWS: usize = H::MAX_PENDING_ROWS;
91 const MAX_WATERMARK_UPDATES: usize = H::MAX_WATERMARK_UPDATES;
92
93 fn batch(
94 &self,
95 batch: &mut Self::Batch,
96 values: &mut std::vec::IntoIter<Self::Value>,
97 ) -> crate::pipeline::concurrent::BatchStatus {
98 let max_chunk_rows = max_chunk_rows::<H::Value>();
99 let current_len = batch.len();
100
101 if current_len + values.len() > max_chunk_rows {
102 // Batch would exceed the limit, take only what fits
103 let remaining_capacity = max_chunk_rows - current_len;
104 batch.extend(values.take(remaining_capacity));
105 crate::pipeline::concurrent::BatchStatus::Ready
106 } else {
107 // All values fit, take them all
108 batch.extend(values);
109 crate::pipeline::concurrent::BatchStatus::Pending
110 }
111 }
112
113 async fn commit<'a>(
114 &self,
115 batch: &Self::Batch,
116 conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
117 ) -> anyhow::Result<usize> {
118 H::commit(batch, conn).await
119 }
120
121 async fn prune<'a>(
122 &self,
123 from: u64,
124 to_exclusive: u64,
125 conn: &mut <Self::Store as crate::store::Store>::Connection<'a>,
126 ) -> anyhow::Result<usize> {
127 <H as Handler>::prune(self, from, to_exclusive, conn).await
128 }
129}