sui_rpc_api/
subscription.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::metrics::SubscriptionMetrics;
5use std::sync::Arc;
6use sui_types::full_checkpoint_content::Checkpoint;
7use tokio::sync::mpsc;
8use tokio::sync::oneshot;
9use tracing::info;
10use tracing::trace;
11
/// Capacity of the channel over which checkpoints are delivered to the service.
const CHECKPOINT_MAILBOX_SIZE: usize = 1024;
/// Capacity of the channel carrying subscription requests to the service.
const MAILBOX_SIZE: usize = 128;
/// Capacity of each per-subscriber checkpoint channel. A subscriber whose channel cannot accept
/// a new checkpoint is dropped.
const SUBSCRIPTION_CHANNEL_SIZE: usize = 256;
/// Maximum number of subscribers that may be registered at any one time.
const MAX_SUBSCRIBERS: usize = 1024;
16
/// Request sent from a `SubscriptionServiceHandle` to the service task asking for a new
/// checkpoint subscription.
struct SubscriptionRequest {
    /// One-shot channel on which the service replies with the receiving half of the new
    /// subscriber's checkpoint channel.
    sender: oneshot::Sender<mpsc::Receiver<Arc<Checkpoint>>>,
}
20
/// Cloneable handle used to register new checkpoint subscriptions with the
/// `SubscriptionService` task.
#[derive(Clone)]
pub struct SubscriptionServiceHandle {
    /// Sending half of the service's subscription-request mailbox.
    sender: mpsc::Sender<SubscriptionRequest>,
}
25
26impl SubscriptionServiceHandle {
27    pub async fn register_subscription(&self) -> Option<mpsc::Receiver<Arc<Checkpoint>>> {
28        let (sender, reciever) = oneshot::channel();
29        let request = SubscriptionRequest { sender };
30        self.sender.send(request).await.ok()?;
31
32        reciever.await.ok()
33    }
34}
35
36pub struct SubscriptionService {
37    // Mailbox for recieving `Checkpoint` from the Checkpoint Executor
38    //
39    // Expectation is that checkpoints are recieved in-order
40    checkpoint_mailbox: mpsc::Receiver<Checkpoint>,
41    mailbox: mpsc::Receiver<SubscriptionRequest>,
42    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
43
44    metrics: SubscriptionMetrics,
45}
46
47impl SubscriptionService {
48    pub fn build(
49        registry: &prometheus::Registry,
50    ) -> (mpsc::Sender<Checkpoint>, SubscriptionServiceHandle) {
51        let metrics = SubscriptionMetrics::new(registry);
52        let (checkpoint_sender, checkpoint_mailbox) = mpsc::channel(CHECKPOINT_MAILBOX_SIZE);
53        let (subscription_request_sender, mailbox) = mpsc::channel(MAILBOX_SIZE);
54
55        tokio::spawn(
56            Self {
57                checkpoint_mailbox,
58                mailbox,
59                subscribers: Vec::new(),
60                metrics,
61            }
62            .start(),
63        );
64
65        (
66            checkpoint_sender,
67            SubscriptionServiceHandle {
68                sender: subscription_request_sender,
69            },
70        )
71    }
72
73    async fn start(mut self) {
74        // Start main loop.
75        loop {
76            tokio::select! {
77                maybe_checkpoint = self.checkpoint_mailbox.recv() => {
78                    // Once all handles to our checkpoint_mailbox have been dropped this
79                    // will yield `None` and we can terminate the event loop
80                    if let Some(checkpoint) = maybe_checkpoint {
81                        self.handle_checkpoint(checkpoint);
82                    } else {
83                        break;
84                    }
85                },
86                maybe_message = self.mailbox.recv() => {
87                    // Once all handles to our mailbox have been dropped this
88                    // will yield `None` and we can terminate the event loop
89                    if let Some(message) = maybe_message {
90                        self.handle_message(message);
91                    } else {
92                        break;
93                    }
94                },
95            }
96        }
97
98        info!("RPC Subscription Services ended");
99    }
100
101    fn handle_checkpoint(&mut self, checkpoint: Checkpoint) {
102        // Check that we recieved checkpoints in-order
103        {
104            let last_sequence_number = self.metrics.last_recieved_checkpoint.get();
105            let sequence_number = *checkpoint.summary.sequence_number() as i64;
106
107            if last_sequence_number != 0 && (last_sequence_number + 1) != sequence_number {
108                panic!(
109                    "recieved checkpoint out-of-order. expected checkpoint {}, recieved {}",
110                    last_sequence_number + 1,
111                    sequence_number
112                );
113            }
114
115            // Update the metric marking the latest checkpoint we've seen
116            self.metrics.last_recieved_checkpoint.set(sequence_number);
117        }
118
119        let checkpoint = Arc::new(checkpoint);
120
121        // Try to send the latest checkpoint to all subscribers. If a subscriber's channel is full
122        // then they are likely too slow so we drop them.
123        self.subscribers.retain(|subscriber| {
124            match subscriber.try_send(Arc::clone(&checkpoint)) {
125                Ok(()) => {
126                    trace!("succesfully enqueued checkpont for subscriber");
127                    true // Retain this subscriber
128                }
129                Err(e) => {
130                    // It does not matter what the error is - channel full or closed, we drop the subscriber.
131                    trace!("unable to enqueue checkpoint for subscriber: {e}");
132                    self.metrics.inflight_subscribers.dec();
133                    false // Drop this subscriber
134                }
135            }
136        });
137    }
138
139    fn handle_message(&mut self, request: SubscriptionRequest) {
140        // Check if we've reached the limit to the number of subscribers we can have at one time.
141        if self.subscribers.len() >= MAX_SUBSCRIBERS {
142            trace!(
143                "failed to register new subscriber: hit maximum number of subscribers {}",
144                MAX_SUBSCRIBERS
145            );
146            return;
147        }
148
149        let (sender, reciever) = mpsc::channel(SUBSCRIPTION_CHANNEL_SIZE);
150        match request.sender.send(reciever) {
151            Ok(()) => {
152                trace!("succesfully registered new subscriber");
153                self.metrics.inflight_subscribers.inc();
154                self.subscribers.push(sender);
155            }
156            Err(e) => {
157                trace!("failed to register new subscriber: {e:?}");
158            }
159        }
160    }
161}