sui_rpc_api/grpc/v2/
subscription_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::RpcService;
5use mysten_common::ZipDebugEqIteratorExt;
6use sui_rpc::field::FieldMaskTree;
7use sui_rpc::merge::Merge;
8use sui_rpc::proto::sui::rpc::v2::Checkpoint;
9use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
10use sui_rpc::proto::sui::rpc::v2::SubscribeCheckpointsResponse;
11use sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionService;
12use sui_types::balance_change::derive_balance_changes_2;
13use tonic::codegen::BoxStream;
14
15#[tonic::async_trait]
16impl SubscriptionService for RpcService {
17    async fn subscribe_checkpoints(
18        &self,
19        request: tonic::Request<SubscribeCheckpointsRequest>,
20    ) -> Result<tonic::Response<BoxStream<SubscribeCheckpointsResponse>>, tonic::Status> {
21        let subscription_service_handle = self
22            .subscription_service_handle
23            .as_ref()
24            .ok_or_else(|| tonic::Status::unimplemented("subscription service not enabled"))?;
25        let read_mask = request.into_inner().read_mask.unwrap_or_default();
26        let read_mask = FieldMaskTree::from(read_mask);
27
28        let Some(mut receiver) = subscription_service_handle.register_subscription().await else {
29            return Err(tonic::Status::unavailable(
30                "too many existing subscriptions",
31            ));
32        };
33
34        let response = Box::pin(async_stream::stream! {
35            while let Some(checkpoint) = receiver.recv().await {
36                let cursor = checkpoint.summary.sequence_number;
37
38                let mut checkpoint_message = Checkpoint::merge_from(
39                    checkpoint.as_ref(),
40                    &read_mask
41                );
42
43                if read_mask.contains("transactions.balance_changes") {
44                    for (txn, effects) in checkpoint_message.transactions_mut().iter_mut().zip_debug_eq(
45                        checkpoint
46                            .transactions
47                            .iter()
48                            .map(|t| &t.effects),
49                    ) {
50                        *txn.balance_changes_mut() = derive_balance_changes_2(effects, &checkpoint.object_set)
51                            .into_iter()
52                            .map(Into::into)
53                            .collect();
54                    }
55                }
56
57                let mut response = SubscribeCheckpointsResponse::default();
58                response.cursor = Some(cursor);
59                response.checkpoint = Some(checkpoint_message);
60
61                yield Ok(response);
62            }
63        });
64
65        Ok(tonic::Response::new(response))
66    }
67}