sui_rpc_api/grpc/v2/
subscription_service.rs1use std::pin::Pin;
5
6use crate::RpcService;
7use sui_rpc::field::FieldMaskTree;
8use sui_rpc::merge::Merge;
9use sui_rpc::proto::sui::rpc::v2::Checkpoint;
10use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
11use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsResponse;
12use sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionService;
13
14#[tonic::async_trait]
15impl SubscriptionService for RpcService {
16 type SubscribeCheckpointsStream = Pin<
18 Box<
19 dyn tokio_stream::Stream<Item = Result<SubscribeCheckpointsResponse, tonic::Status>>
20 + Send,
21 >,
22 >;
23
24 async fn subscribe_checkpoints(
25 &self,
26 request: tonic::Request<SubscribeCheckpointsRequest>,
27 ) -> Result<tonic::Response<Self::SubscribeCheckpointsStream>, tonic::Status> {
28 let subscription_service_handle = self
29 .subscription_service_handle
30 .as_ref()
31 .ok_or_else(|| tonic::Status::unimplemented("subscription service not enabled"))?;
32 let read_mask = request.into_inner().read_mask.unwrap_or_default();
33 let read_mask = FieldMaskTree::from(read_mask);
34
35 let Some(mut receiver) = subscription_service_handle.register_subscription().await else {
36 return Err(tonic::Status::unavailable(
37 "too many existing subscriptions",
38 ));
39 };
40
41 let store = self.reader.clone();
42 let response = Box::pin(async_stream::stream! {
43 while let Some(checkpoint) = receiver.recv().await {
44 let cursor = checkpoint.summary.sequence_number;
45
46 let mut checkpoint_message = Checkpoint::merge_from(
47 checkpoint.as_ref(),
48 &read_mask
49 );
50
51 if read_mask.contains("transactions.balance_changes") {
52 for (txn, txn_digest) in checkpoint_message.transactions_mut().iter_mut().zip(
53 checkpoint
54 .transactions
55 .iter()
56 .map(|t| t.transaction.digest()),
57 ) {
58 if let Some(info) = store.get_transaction_info(&txn_digest)
59 {
60 *txn.balance_changes_mut() = info.balance_changes
61 .into_iter()
62 .map(sui_rpc::proto::sui::rpc::v2::BalanceChange::from)
63 .collect::<Vec<_>>();
64 }
65 }
66 }
67
68 let mut response = SubscribeCheckpointsResponse::default();
69 response.cursor = Some(cursor);
70 response.checkpoint = Some(checkpoint_message);
71
72 yield Ok(response);
73 }
74 });
75
76 Ok(tonic::Response::new(response))
77 }
78}