sui_rpc_store/indexer/
checkpoint_broadcast.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Sequential pipeline that re-publishes each ingested checkpoint to
//! a [`tokio::sync::broadcast`] channel, in checkpoint order.
//!
//! Unlike the other pipelines this one writes nothing to the
//! [`RpcStoreSchema`](crate::RpcStoreSchema): its "batch" *is* the
//! [`Checkpoint`] itself (`type Batch = Option<Arc<Checkpoint>>`). The
//! framework's sequential committer drives `commit` in strict
//! checkpoint order, and — because pipelines registered with the
//! [`Synchronizer`](sui_consistent_store::Synchronizer) set
//! `MAX_BATCH_CHECKPOINTS = 1` — each `commit` corresponds to exactly
//! one checkpoint. So sending on the broadcast channel from `commit`
//! yields a gap-free, in-order checkpoint stream on the send side.
//! An individual subscriber can still miss checkpoints if it falls
//! more than the channel's capacity behind; `tokio`'s broadcast
//! receivers report that as `RecvError::Lagged`.
//!
//! This is the standalone `sui-rpc-node`'s analog of the fullnode's
//! checkpoint-executor broadcast: it lets the node host
//! `sui-rpc-api`'s checkpoint-subscription service over the same
//! checkpoints it indexes. The pipeline is *not* part of
//! [`PipelineLayer`](crate::config::PipelineLayer) — it carries a
//! runtime `broadcast::Sender` rather than a config toggle, so callers
//! register it explicitly via
//! [`Indexer::add_checkpoint_broadcast`](crate::Indexer::add_checkpoint_broadcast).
25
26use std::sync::Arc;
27
28use anyhow::Context as _;
29use async_trait::async_trait;
30use sui_consistent_store::Db;
31use sui_consistent_store::FrameworkSchema;
32use sui_consistent_store::PipelineTaskKey;
33use sui_consistent_store::Watermark;
34use sui_indexer_alt_framework::pipeline::Processor;
35use sui_indexer_alt_framework::pipeline::sequential;
36use sui_types::full_checkpoint_content::Checkpoint;
37use tokio::sync::broadcast;
38
39use crate::indexer::Schema;
40use crate::indexer::Store;
41
42/// Pipeline that broadcasts each committed checkpoint. Holds the
43/// send half of the broadcast channel the subscription service reads
44/// from.
45pub struct CheckpointBroadcast {
46    sender: broadcast::Sender<Arc<Checkpoint>>,
47}
48
49impl CheckpointBroadcast {
50    /// Pipeline name; also the `__watermark` key the synchronizer
51    /// tracks this pipeline under.
52    pub const NAME: &'static str = "checkpoint_broadcast";
53
54    pub fn new(sender: broadcast::Sender<Arc<Checkpoint>>) -> Self {
55        Self { sender }
56    }
57}
58
59#[async_trait]
60impl Processor for CheckpointBroadcast {
61    const NAME: &'static str = CheckpointBroadcast::NAME;
62    type Value = Arc<Checkpoint>;
63
64    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
65        // Cheap: clones the `Arc`, not the checkpoint. The payload is
66        // carried to `commit` (rather than reloaded from the store)
67        // because the framework already has it in hand here.
68        Ok(vec![checkpoint.clone()])
69    }
70}
71
72#[async_trait]
73impl sequential::Handler for CheckpointBroadcast {
74    type Store = Store;
75    /// The "batch" is the checkpoint to broadcast. `Option` because
76    /// `Batch` must be `Default`; `MAX_BATCH_CHECKPOINTS = 1` (enforced
77    /// at registration) guarantees at most one checkpoint lands here
78    /// per commit.
79    type Batch = Option<Arc<Checkpoint>>;
80
81    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
82        // With one checkpoint per batch there is exactly one value;
83        // `last()` also stays correct if batching is ever widened
84        // (the highest checkpoint is the one to publish).
85        if let Some(checkpoint) = values.last() {
86            *batch = Some(checkpoint);
87        }
88    }
89
90    async fn commit<'a>(
91        &self,
92        batch: &Self::Batch,
93        _conn: &mut sui_consistent_store::Connection<'a, Schema>,
94    ) -> anyhow::Result<usize> {
95        // No CF writes: the framework still advances this pipeline's
96        // watermark atomically through the connection, so the
97        // synchronizer commits an (otherwise empty) ordered batch.
98        let Some(checkpoint) = batch else {
99            return Ok(0);
100        };
101        // `send` errors only when there are no live subscribers, which
102        // is the common idle case — not a failure. Drop it.
103        let _ = self.sender.send(checkpoint.clone());
104        Ok(1)
105    }
106}
107
108/// Seed the broadcast pipeline's resume watermark to `checkpoint` when
109/// it has none yet, so it starts at the live tip rather than genesis.
110///
111/// The broadcast pipeline is registered like any other (it persists a
112/// watermark and resumes from it), but it is not covered by the formal
113/// snapshot or [`floor_unrestored_pipelines`](crate::floor_unrestored_pipelines),
114/// so on the first run after a restore it would have no watermark and
115/// the framework would start it — and, since the shared ingestion start
116/// is the minimum across pipelines, *all* pipelines — back at genesis.
117/// Seeding it to the current tip avoids that. A no-op once a watermark
118/// exists (subsequent runs resume normally), so callers pass the tip
119/// and skip the call on a fresh database with none.
120pub fn seed_watermark_to_tip(db: &Db, checkpoint: u64) -> anyhow::Result<()> {
121    let framework = FrameworkSchema::new(db.clone());
122    let key = PipelineTaskKey::new(CheckpointBroadcast::NAME);
123    if framework
124        .watermarks
125        .get(&key)
126        .context("reading checkpoint_broadcast watermark")?
127        .is_some()
128    {
129        return Ok(());
130    }
131    let mut batch = db.batch();
132    batch
133        .put(
134            &framework.watermarks,
135            &key,
136            &Watermark::for_checkpoint(checkpoint),
137        )
138        .context("staging checkpoint_broadcast watermark seed")?;
139    batch
140        .commit()
141        .context("committing checkpoint_broadcast watermark seed")?;
142    Ok(())
143}
144
#[cfg(test)]
mod tests {
    use sui_consistent_store::DbOptions;
    use sui_indexer_alt_framework::pipeline::sequential::Handler as _;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tempfile::TempDir;

    use super::*;
    use crate::RpcStoreSchema;

    /// A handler whose broadcast channel has no subscribers.
    fn idle_handler() -> CheckpointBroadcast {
        CheckpointBroadcast::new(broadcast::channel(16).0)
    }

    #[tokio::test]
    async fn process_emits_the_checkpoint() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(7).build_checkpoint());
        let values = idle_handler().process(&checkpoint).await.unwrap();
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].summary.sequence_number, 7);
    }

    // `batch` is synchronous, so no async runtime is needed.
    #[test]
    fn batch_keeps_the_checkpoint() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(9).build_checkpoint());
        let mut batch = None;
        idle_handler().batch(&mut batch, vec![checkpoint].into_iter());
        assert_eq!(batch.unwrap().summary.sequence_number, 9);
    }

    /// If batching is ever widened, the highest (last) checkpoint must be
    /// the one kept for publishing.
    #[test]
    fn batch_keeps_the_last_of_several_checkpoints() {
        let earlier = Arc::new(TestCheckpointBuilder::new(3).build_checkpoint());
        let later = Arc::new(TestCheckpointBuilder::new(4).build_checkpoint());
        let mut batch = None;
        idle_handler().batch(&mut batch, vec![earlier, later].into_iter());
        assert_eq!(batch.unwrap().summary.sequence_number, 4);
    }

    /// An empty set of values must not clobber an already-populated batch.
    #[test]
    fn batch_ignores_empty_values() {
        let existing = Arc::new(TestCheckpointBuilder::new(5).build_checkpoint());
        let mut batch = Some(existing);
        idle_handler().batch(&mut batch, Vec::new().into_iter());
        assert_eq!(batch.unwrap().summary.sequence_number, 5);
    }

    #[test]
    fn seed_watermark_writes_once_and_does_not_overwrite() {
        let dir = TempDir::new().unwrap();
        let (db, _schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        let key = PipelineTaskKey::new(CheckpointBroadcast::NAME);
        let framework = FrameworkSchema::new(db.clone());

        // Fresh: seeds to the requested tip.
        seed_watermark_to_tip(&db, 100).unwrap();
        assert_eq!(
            framework.watermarks.get(&key).unwrap(),
            Some(Watermark::for_checkpoint(100)),
        );

        // Already seeded: a later call is a no-op, so the pipeline
        // resumes from where it left off rather than jumping.
        seed_watermark_to_tip(&db, 200).unwrap();
        assert_eq!(
            framework.watermarks.get(&key).unwrap(),
            Some(Watermark::for_checkpoint(100)),
        );
    }
}