sui_core/
rpc_store_streaming_client.rs1use std::sync::Arc;
21
22use anyhow::anyhow;
23use async_trait::async_trait;
24use futures::StreamExt;
25use futures::stream;
26use sui_indexer_alt_framework::ingestion::error::Error;
27use sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStream;
28use sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStreamingClient;
29use sui_types::digests::ChainIdentifier;
30use sui_types::full_checkpoint_content::Checkpoint;
31use sui_types::storage::ReadStore;
32use tokio::sync::broadcast;
33
34pub struct BroadcastStreamingClient<R> {
53 sender: broadcast::Sender<Arc<Checkpoint>>,
54 chain_id: ChainIdentifier,
55 store: R,
56}
57
58impl<R> BroadcastStreamingClient<R> {
59 pub fn new(
60 sender: broadcast::Sender<Arc<Checkpoint>>,
61 chain_id: ChainIdentifier,
62 store: R,
63 ) -> Self {
64 Self {
65 sender,
66 chain_id,
67 store,
68 }
69 }
70}
71
72impl<R: ReadStore> BroadcastStreamingClient<R> {
73 fn current_tip(&self) -> Result<Checkpoint, Error> {
75 let seq = self
76 .store
77 .get_latest_checkpoint_sequence_number()
78 .map_err(|e| Error::StreamingError(e.into()))?;
79 let summary = self
80 .store
81 .get_checkpoint_by_sequence_number(seq)
82 .ok_or_else(|| Error::StreamingError(anyhow!("checkpoint {seq} summary missing")))?;
83 let contents = self
84 .store
85 .get_checkpoint_contents_by_digest(&summary.content_digest)
86 .ok_or_else(|| Error::StreamingError(anyhow!("checkpoint {seq} contents missing")))?;
87 self.store
88 .get_checkpoint_data(summary, contents)
89 .map_err(|e| Error::StreamingError(e.into()))
90 }
91}
92
93#[async_trait]
94impl<R: ReadStore + Send + Sync + 'static> CheckpointStreamingClient
95 for BroadcastStreamingClient<R>
96{
97 async fn connect(&mut self) -> Result<CheckpointStream, Error> {
98 let receiver = self.sender.subscribe();
102
103 let tip = self.current_tip()?;
106
107 let live = stream::unfold(receiver, |mut receiver| async move {
111 match receiver.recv().await {
112 Ok(checkpoint) => Some((Ok((*checkpoint).clone()), receiver)),
113 Err(broadcast::error::RecvError::Lagged(skipped)) => Some((
114 Err(Error::StreamingError(anyhow!(
115 "broadcast stream lagged by {skipped} checkpoints"
116 ))),
117 receiver,
118 )),
119 Err(broadcast::error::RecvError::Closed) => None,
120 }
121 });
122
123 let stream = stream::once(async move { Ok(tip) }).chain(live).boxed();
127
128 Ok(CheckpointStream {
129 stream: tokio_stream::StreamExt::peekable(stream),
130 chain_id: self.chain_id,
131 })
132 }
133
134 async fn latest_checkpoint_number(&mut self) -> Result<u64, Error> {
135 self.store
138 .get_latest_checkpoint_sequence_number()
139 .map_err(|e| Error::StreamingError(e.into()))
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rpc_store_test_utils::checkpoint;
    use crate::rpc_store_test_utils::store_with;
    use crate::rpc_store_test_utils::test_chain_id;

    /// Pulls the next item off the stream and returns its sequence number,
    /// panicking if the stream has ended or the item is an error.
    async fn next_sequence_number(connected: &mut CheckpointStream) -> u64 {
        let item = connected.stream.next().await.unwrap().unwrap();
        *item.summary.sequence_number()
    }

    /// The store's tip comes first, followed by broadcast checkpoints in the
    /// order they were sent.
    #[tokio::test]
    async fn seeds_tip_then_streams_published_checkpoints_in_order() {
        let (sender, _guard_rx) = broadcast::channel(16);
        let mut client =
            BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store_with([5]));

        let mut connected = client.connect().await.unwrap();
        assert_eq!(connected.chain_id, test_chain_id());

        for seq in [6, 7] {
            sender.send(checkpoint(seq)).unwrap();
        }

        let mut observed = Vec::new();
        for _ in 0..3 {
            observed.push(next_sequence_number(&mut connected).await);
        }
        assert_eq!(observed, [5, 6, 7]);
    }

    /// `latest_checkpoint_number` reports the highest sequence number held by
    /// the store.
    #[tokio::test]
    async fn latest_checkpoint_number_reads_the_local_tip() {
        let (sender, _guard_rx) = broadcast::channel(16);
        let mut client =
            BroadcastStreamingClient::new(sender, test_chain_id(), store_with([0, 4, 9]));

        let latest = client.latest_checkpoint_number().await.unwrap();
        assert_eq!(latest, 9);
    }

    /// Sending four checkpoints through a capacity-2 channel makes the
    /// receiver lag: the stream reports an error and then resumes at the
    /// oldest checkpoint still retained.
    #[tokio::test]
    async fn lagging_receiver_yields_a_stream_error() {
        let (sender, _guard_rx) = broadcast::channel(2);
        let mut client =
            BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store_with([9]));
        let mut connected = client.connect().await.unwrap();

        for seq in 0..4 {
            sender.send(checkpoint(seq)).unwrap();
        }

        // The seeded tip is unaffected by the lag.
        assert_eq!(next_sequence_number(&mut connected).await, 9);

        let lagged = connected.stream.next().await;
        assert!(matches!(lagged, Some(Err(Error::StreamingError(_)))));

        assert_eq!(next_sequence_number(&mut connected).await, 2);
    }

    /// Once every sender is gone the stream drains what was already sent and
    /// then ends.
    #[tokio::test]
    async fn stream_ends_when_all_senders_dropped() {
        let (sender, _guard_rx) = broadcast::channel(16);
        let mut client =
            BroadcastStreamingClient::new(sender.clone(), test_chain_id(), store_with([5]));
        let mut connected = client.connect().await.unwrap();

        sender.send(checkpoint(6)).unwrap();
        // The client owns a clone of the sender, so both must be dropped for
        // the channel to close.
        drop(sender);
        drop(client);

        assert_eq!(next_sequence_number(&mut connected).await, 5);
        assert_eq!(next_sequence_number(&mut connected).await, 6);
        assert!(connected.stream.next().await.is_none());
    }
}