sui_rpc_store/indexer/checkpoint_broadcast.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that re-publishes each ingested checkpoint to
5//! a [`tokio::sync::broadcast`] channel, in checkpoint order.
6//!
7//! Unlike the other pipelines this one writes nothing to the
8//! [`RpcStoreSchema`](crate::RpcStoreSchema): its "batch" *is* the
9//! [`Checkpoint`] itself (`type Batch = Option<Arc<Checkpoint>>`). The
10//! framework's sequential committer drives `commit` in strict
11//! checkpoint order, and — because pipelines registered with the
12//! [`Synchronizer`](sui_consistent_store::Synchronizer) set
13//! `MAX_BATCH_CHECKPOINTS = 1` — each `commit` corresponds to exactly
14//! one checkpoint. So sending on the broadcast channel from `commit`
15//! yields a gap-free, in-order checkpoint stream.
16//!
17//! This is the standalone `sui-rpc-node`'s analog of the fullnode's
18//! checkpoint-executor broadcast: it lets the node host
19//! `sui-rpc-api`'s checkpoint-subscription service over the same
20//! checkpoints it indexes. The pipeline is *not* part of
21//! [`PipelineLayer`](crate::config::PipelineLayer) — it carries a
22//! runtime `broadcast::Sender` rather than a config toggle, so callers
23//! register it explicitly via
24//! [`Indexer::add_checkpoint_broadcast`](crate::Indexer::add_checkpoint_broadcast).
25
26use std::sync::Arc;
27
28use anyhow::Context as _;
29use async_trait::async_trait;
30use sui_consistent_store::Db;
31use sui_consistent_store::FrameworkSchema;
32use sui_consistent_store::PipelineTaskKey;
33use sui_consistent_store::Watermark;
34use sui_indexer_alt_framework::pipeline::Processor;
35use sui_indexer_alt_framework::pipeline::sequential;
36use sui_types::full_checkpoint_content::Checkpoint;
37use tokio::sync::broadcast;
38
39use crate::indexer::Schema;
40use crate::indexer::Store;
41
42/// Pipeline that broadcasts each committed checkpoint. Holds the
43/// send half of the broadcast channel the subscription service reads
44/// from.
45pub struct CheckpointBroadcast {
46 sender: broadcast::Sender<Arc<Checkpoint>>,
47}
48
49impl CheckpointBroadcast {
50 /// Pipeline name; also the `__watermark` key the synchronizer
51 /// tracks this pipeline under.
52 pub const NAME: &'static str = "checkpoint_broadcast";
53
54 pub fn new(sender: broadcast::Sender<Arc<Checkpoint>>) -> Self {
55 Self { sender }
56 }
57}
58
59#[async_trait]
60impl Processor for CheckpointBroadcast {
61 const NAME: &'static str = CheckpointBroadcast::NAME;
62 type Value = Arc<Checkpoint>;
63
64 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
65 // Cheap: clones the `Arc`, not the checkpoint. The payload is
66 // carried to `commit` (rather than reloaded from the store)
67 // because the framework already has it in hand here.
68 Ok(vec![checkpoint.clone()])
69 }
70}
71
72#[async_trait]
73impl sequential::Handler for CheckpointBroadcast {
74 type Store = Store;
75 /// The "batch" is the checkpoint to broadcast. `Option` because
76 /// `Batch` must be `Default`; `MAX_BATCH_CHECKPOINTS = 1` (enforced
77 /// at registration) guarantees at most one checkpoint lands here
78 /// per commit.
79 type Batch = Option<Arc<Checkpoint>>;
80
81 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
82 // With one checkpoint per batch there is exactly one value;
83 // `last()` also stays correct if batching is ever widened
84 // (the highest checkpoint is the one to publish).
85 if let Some(checkpoint) = values.last() {
86 *batch = Some(checkpoint);
87 }
88 }
89
90 async fn commit<'a>(
91 &self,
92 batch: &Self::Batch,
93 _conn: &mut sui_consistent_store::Connection<'a, Schema>,
94 ) -> anyhow::Result<usize> {
95 // No CF writes: the framework still advances this pipeline's
96 // watermark atomically through the connection, so the
97 // synchronizer commits an (otherwise empty) ordered batch.
98 let Some(checkpoint) = batch else {
99 return Ok(0);
100 };
101 // `send` errors only when there are no live subscribers, which
102 // is the common idle case — not a failure. Drop it.
103 let _ = self.sender.send(checkpoint.clone());
104 Ok(1)
105 }
106}
107
108/// Seed the broadcast pipeline's resume watermark to `checkpoint` when
109/// it has none yet, so it starts at the live tip rather than genesis.
110///
111/// The broadcast pipeline is registered like any other (it persists a
112/// watermark and resumes from it), but it is not covered by the formal
113/// snapshot or [`floor_unrestored_pipelines`](crate::floor_unrestored_pipelines),
114/// so on the first run after a restore it would have no watermark and
115/// the framework would start it — and, since the shared ingestion start
116/// is the minimum across pipelines, *all* pipelines — back at genesis.
117/// Seeding it to the current tip avoids that. A no-op once a watermark
118/// exists (subsequent runs resume normally), so callers pass the tip
119/// and skip the call on a fresh database with none.
120pub fn seed_watermark_to_tip(db: &Db, checkpoint: u64) -> anyhow::Result<()> {
121 let framework = FrameworkSchema::new(db.clone());
122 let key = PipelineTaskKey::new(CheckpointBroadcast::NAME);
123 if framework
124 .watermarks
125 .get(&key)
126 .context("reading checkpoint_broadcast watermark")?
127 .is_some()
128 {
129 return Ok(());
130 }
131 let mut batch = db.batch();
132 batch
133 .put(
134 &framework.watermarks,
135 &key,
136 &Watermark::for_checkpoint(checkpoint),
137 )
138 .context("staging checkpoint_broadcast watermark seed")?;
139 batch
140 .commit()
141 .context("committing checkpoint_broadcast watermark seed")?;
142 Ok(())
143}
144
#[cfg(test)]
mod tests {
    use sui_consistent_store::DbOptions;
    use sui_indexer_alt_framework::pipeline::sequential::Handler as _;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tempfile::TempDir;

    use super::*;
    use crate::RpcStoreSchema;

    /// `process` hands back exactly the checkpoint it was given.
    #[tokio::test]
    async fn process_emits_the_checkpoint() {
        let (sender, _) = broadcast::channel(16);
        let pipeline = CheckpointBroadcast::new(sender);
        let input = Arc::new(TestCheckpointBuilder::new(7).build_checkpoint());

        let emitted = pipeline.process(&input).await.unwrap();

        assert_eq!(emitted.len(), 1);
        let sequence_numbers: Vec<_> = emitted
            .iter()
            .map(|checkpoint| checkpoint.summary.sequence_number)
            .collect();
        assert_eq!(sequence_numbers, vec![7]);
    }

    /// `batch` stages the checkpoint it receives.
    #[tokio::test]
    async fn batch_keeps_the_checkpoint() {
        let (sender, _) = broadcast::channel(16);
        let handler = CheckpointBroadcast::new(sender);
        let input = Arc::new(TestCheckpointBuilder::new(9).build_checkpoint());

        let mut staged: Option<Arc<Checkpoint>> = None;
        handler.batch(&mut staged, vec![input].into_iter());

        let staged_sequence = staged.map(|checkpoint| checkpoint.summary.sequence_number);
        assert_eq!(staged_sequence, Some(9));
    }

    /// Seeding writes the watermark the first time and leaves it
    /// untouched on every later call.
    #[test]
    fn seed_watermark_writes_once_and_does_not_overwrite() {
        let dir = TempDir::new().unwrap();
        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        let framework = FrameworkSchema::new(db.clone());
        let key = PipelineTaskKey::new(CheckpointBroadcast::NAME);
        let stored = || framework.watermarks.get(&key).unwrap();

        // No watermark yet, so the requested tip is written.
        seed_watermark_to_tip(&db, 100).unwrap();
        assert_eq!(stored(), Some(Watermark::for_checkpoint(100)));

        // A watermark exists now: seeding again must not move it,
        // otherwise the pipeline would jump instead of resuming.
        seed_watermark_to_tip(&db, 200).unwrap();
        assert_eq!(stored(), Some(Watermark::for_checkpoint(100)));
    }
}