sui_core/
rpc_store_streaming_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! [`CheckpointStreamingClient`] backed by the checkpoint executor's broadcast
5//! stream.
6//!
7//! The embedded `sui-rpc-store` indexer consumes the tip of the chain from the
8//! same `tokio::sync::broadcast` channel the executor publishes to (see
9//! `CheckpointExecutor`), avoiding a gRPC round-trip to the node's own
10//! subscription endpoint. The ingestion framework only engages the streaming
11//! client once the indexer is within catch-up range of the tip; any gap (for
12//! example after the receiver lags) surfaces as a stream error, which the
13//! framework fills from the [`PerpetualStoreIngestionClient`] instead.
14//!
15//! [`CheckpointStreamingClient`]:
16//!     sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStreamingClient
17//! [`PerpetualStoreIngestionClient`]:
18//!     crate::rpc_store_ingestion_client::PerpetualStoreIngestionClient
19
20use std::sync::Arc;
21
22use anyhow::anyhow;
23use async_trait::async_trait;
24use futures::StreamExt;
25use futures::stream;
26use sui_indexer_alt_framework::ingestion::error::Error;
27use sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStream;
28use sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStreamingClient;
29use sui_types::digests::ChainIdentifier;
30use sui_types::full_checkpoint_content::Checkpoint;
31use sui_types::storage::ReadStore;
32use tokio::sync::broadcast;
33
34/// A [`CheckpointStreamingClient`] that subscribes to the executor's checkpoint
35/// broadcast channel. `connect` seeds the stream with the current tip read from
36/// the local store, then follows the broadcast for checkpoints published after
37/// the call.
38///
39/// The tip seed is essential: a fresh `tokio::sync::broadcast` subscription only
40/// carries checkpoints published *after* it is taken, so on an idle chain the
41/// stream would be empty and the framework's `peek()` -- which it uses to learn
42/// the network tip before starting ingestion -- would block forever. Reading the
43/// tip from the local store mirrors a gRPC stream that opens at the latest
44/// checkpoint, letting ingestion fill `[start, tip)` immediately while streaming
45/// takes over from `tip`.
46///
47/// `R` is the local checkpoint store (the fullnode's [`RocksDbStore`] in
48/// production); the generic bound keeps it unit-testable against an in-memory
49/// store.
50///
51/// [`RocksDbStore`]: crate::storage::RocksDbStore
52pub struct BroadcastStreamingClient<R> {
53    sender: broadcast::Sender<Arc<Checkpoint>>,
54    chain_id: ChainIdentifier,
55    store: R,
56}
57
58impl<R> BroadcastStreamingClient<R> {
59    pub fn new(
60        sender: broadcast::Sender<Arc<Checkpoint>>,
61        chain_id: ChainIdentifier,
62        store: R,
63    ) -> Self {
64        Self {
65            sender,
66            chain_id,
67            store,
68        }
69    }
70}
71
72impl<R: ReadStore> BroadcastStreamingClient<R> {
73    /// Read the current tip's full checkpoint from the local store.
74    fn current_tip(&self) -> Result<Checkpoint, Error> {
75        let seq = self
76            .store
77            .get_latest_checkpoint_sequence_number()
78            .map_err(|e| Error::StreamingError(e.into()))?;
79        let summary = self
80            .store
81            .get_checkpoint_by_sequence_number(seq)
82            .ok_or_else(|| Error::StreamingError(anyhow!("checkpoint {seq} summary missing")))?;
83        let contents = self
84            .store
85            .get_checkpoint_contents_by_digest(&summary.content_digest)
86            .ok_or_else(|| Error::StreamingError(anyhow!("checkpoint {seq} contents missing")))?;
87        self.store
88            .get_checkpoint_data(summary, contents)
89            .map_err(|e| Error::StreamingError(e.into()))
90    }
91}
92
93#[async_trait]
94impl<R: ReadStore + Send + Sync + 'static> CheckpointStreamingClient
95    for BroadcastStreamingClient<R>
96{
97    async fn connect(&mut self) -> Result<CheckpointStream, Error> {
98        // Subscribe before reading the tip so no checkpoint published between
99        // the two is missed (the broadcast only carries checkpoints published
100        // after `subscribe`).
101        let receiver = self.sender.subscribe();
102
103        // Seed the stream with the current tip so `peek()` resolves immediately
104        // even on an idle chain (see the type-level docs).
105        let tip = self.current_tip()?;
106
107        // A `Lagged` receiver missed checkpoints, so we surface a stream error
108        // rather than silently skipping ahead: the framework reconnects and
109        // fills the gap from the ingestion client. `Closed` ends the stream.
110        let live = stream::unfold(receiver, |mut receiver| async move {
111            match receiver.recv().await {
112                Ok(checkpoint) => Some((Ok((*checkpoint).clone()), receiver)),
113                Err(broadcast::error::RecvError::Lagged(skipped)) => Some((
114                    Err(Error::StreamingError(anyhow!(
115                        "broadcast stream lagged by {skipped} checkpoints"
116                    ))),
117                    receiver,
118                )),
119                Err(broadcast::error::RecvError::Closed) => None,
120            }
121        });
122
123        // The broadcaster skips checkpoints below its watermark and breaks to
124        // ingestion on a gap, so a tip that duplicates or precedes the first
125        // live checkpoint needs no special handling here.
126        let stream = stream::once(async move { Ok(tip) }).chain(live).boxed();
127
128        Ok(CheckpointStream {
129            stream: tokio_stream::StreamExt::peekable(stream),
130            chain_id: self.chain_id,
131        })
132    }
133
134    async fn latest_checkpoint_number(&mut self) -> Result<u64, Error> {
135        // Read the local store directly; the default trait impl peeks the
136        // stream, which blocks on an idle chain (see `connect`).
137        self.store
138            .get_latest_checkpoint_sequence_number()
139            .map_err(|e| Error::StreamingError(e.into()))
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rpc_store_test_utils::checkpoint;
    use crate::rpc_store_test_utils::store_with;
    use crate::rpc_store_test_utils::test_chain_id;

    /// Take the next item from `connected` and return its sequence number.
    /// Panics if the stream has ended or the item is an error.
    async fn next_seq(connected: &mut CheckpointStream) -> u64 {
        let item = connected.stream.next().await.unwrap().unwrap();
        *item.summary.sequence_number()
    }

    #[tokio::test]
    async fn seeds_tip_then_streams_published_checkpoints_in_order() {
        let (sender, _keep) = broadcast::channel(16);
        // The local store sits at checkpoint 5 when the client connects.
        let store = store_with([5]);
        let mut client = BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store);

        let mut connected = client.connect().await.unwrap();
        assert_eq!(connected.chain_id, test_chain_id());

        // Checkpoints 6 and 7 are published only after the connection exists.
        for seq in [6, 7] {
            sender.send(checkpoint(seq)).unwrap();
        }

        // The stream opens at the local tip, then follows the broadcast.
        for expected in [5, 6, 7] {
            assert_eq!(next_seq(&mut connected).await, expected);
        }
    }

    #[tokio::test]
    async fn latest_checkpoint_number_reads_the_local_tip() {
        let (sender, _keep) = broadcast::channel(16);
        let store = store_with([0, 4, 9]);
        let mut client = BroadcastStreamingClient::new(sender, test_chain_id(), store);

        let latest = client.latest_checkpoint_number().await.unwrap();
        assert_eq!(latest, 9);
    }

    #[tokio::test]
    async fn lagging_receiver_yields_a_stream_error() {
        // With capacity 2, four unread publications overwrite the two oldest,
        // so the first live read observes `Lagged`.
        let (sender, _keep) = broadcast::channel(2);
        // Tip 9 cannot be confused with any of the published 0..4.
        let store = store_with([9]);
        let mut client = BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store);
        let mut connected = client.connect().await.unwrap();

        for seq in 0..4 {
            sender.send(checkpoint(seq)).unwrap();
        }

        // First the seeded tip ...
        assert_eq!(next_seq(&mut connected).await, 9);
        // ... then the lag, reported as a stream error ...
        let lagged = connected.stream.next().await;
        assert!(matches!(lagged, Some(Err(Error::StreamingError(_)))));
        // ... and after it the checkpoints still buffered, oldest first.
        assert_eq!(next_seq(&mut connected).await, 2);
    }

    #[tokio::test]
    async fn stream_ends_when_all_senders_dropped() {
        let (sender, _initial_rx) = broadcast::channel(16);
        let store = store_with([5]);
        let mut client = BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store);
        let mut connected = client.connect().await.unwrap();

        sender.send(checkpoint(6)).unwrap();
        // Both senders must go -- the original and the clone owned by the
        // client -- for the channel to close after the buffer is drained.
        drop(sender);
        drop(client);

        // Seeded tip, buffered live checkpoint, then end-of-stream.
        assert_eq!(next_seq(&mut connected).await, 5);
        assert_eq!(next_seq(&mut connected).await, 6);
        assert!(connected.stream.next().await.is_none());
    }
}
226}