sui_rpc_api/grpc/v2/
subscription_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::pin::Pin;
5
6use crate::RpcService;
7use sui_rpc::field::FieldMaskTree;
8use sui_rpc::merge::Merge;
9use sui_rpc::proto::sui::rpc::v2::Checkpoint;
10use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
11use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsResponse;
12use sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionService;
13
14#[tonic::async_trait]
15impl SubscriptionService for RpcService {
16    /// Server streaming response type for the SubscribeCheckpoints method.
17    type SubscribeCheckpointsStream = Pin<
18        Box<
19            dyn tokio_stream::Stream<Item = Result<SubscribeCheckpointsResponse, tonic::Status>>
20                + Send,
21        >,
22    >;
23
24    async fn subscribe_checkpoints(
25        &self,
26        request: tonic::Request<SubscribeCheckpointsRequest>,
27    ) -> Result<tonic::Response<Self::SubscribeCheckpointsStream>, tonic::Status> {
28        let subscription_service_handle = self
29            .subscription_service_handle
30            .as_ref()
31            .ok_or_else(|| tonic::Status::unimplemented("subscription service not enabled"))?;
32        let read_mask = request.into_inner().read_mask.unwrap_or_default();
33        let read_mask = FieldMaskTree::from(read_mask);
34
35        let Some(mut receiver) = subscription_service_handle.register_subscription().await else {
36            return Err(tonic::Status::unavailable(
37                "too many existing subscriptions",
38            ));
39        };
40
41        let store = self.reader.clone();
42        let response = Box::pin(async_stream::stream! {
43            while let Some(checkpoint) = receiver.recv().await {
44                let cursor = checkpoint.summary.sequence_number;
45
46                let mut checkpoint_message = Checkpoint::merge_from(
47                    checkpoint.as_ref(),
48                    &read_mask
49                );
50
51                if read_mask.contains("transactions.balance_changes") {
52                    for (txn, txn_digest) in checkpoint_message.transactions_mut().iter_mut().zip(
53                        checkpoint
54                            .transactions
55                            .iter()
56                            .map(|t| t.transaction.digest()),
57                    ) {
58                        if let Some(info) = store.get_transaction_info(&txn_digest)
59                        {
60                            *txn.balance_changes_mut() = info.balance_changes
61                                .into_iter()
62                                .map(sui_rpc::proto::sui::rpc::v2::BalanceChange::from)
63                                .collect::<Vec<_>>();
64                        }
65                    }
66                }
67
68                let mut response = SubscribeCheckpointsResponse::default();
69                response.cursor = Some(cursor);
70                response.checkpoint = Some(checkpoint_message);
71
72                yield Ok(response);
73            }
74        });
75
76        Ok(tonic::Response::new(response))
77    }
78}