sui_rpc_api/
subscription.rs1use crate::metrics::SubscriptionMetrics;
5use std::sync::Arc;
6use sui_types::full_checkpoint_content::Checkpoint;
7use tokio::sync::mpsc;
8use tokio::sync::oneshot;
9use tracing::info;
10use tracing::trace;
11
12const CHECKPOINT_MAILBOX_SIZE: usize = 1024;
13const MAILBOX_SIZE: usize = 128;
14const SUBSCRIPTION_CHANNEL_SIZE: usize = 256;
15const MAX_SUBSCRIBERS: usize = 1024;
16
17struct SubscriptionRequest {
18 sender: oneshot::Sender<mpsc::Receiver<Arc<Checkpoint>>>,
19}
20
21#[derive(Clone)]
22pub struct SubscriptionServiceHandle {
23 sender: mpsc::Sender<SubscriptionRequest>,
24}
25
26impl SubscriptionServiceHandle {
27 pub async fn register_subscription(&self) -> Option<mpsc::Receiver<Arc<Checkpoint>>> {
28 let (sender, reciever) = oneshot::channel();
29 let request = SubscriptionRequest { sender };
30 self.sender.send(request).await.ok()?;
31
32 reciever.await.ok()
33 }
34}
35
36pub struct SubscriptionService {
37 checkpoint_mailbox: mpsc::Receiver<Checkpoint>,
41 mailbox: mpsc::Receiver<SubscriptionRequest>,
42 subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
43
44 metrics: SubscriptionMetrics,
45}
46
47impl SubscriptionService {
48 pub fn build(
49 registry: &prometheus::Registry,
50 ) -> (mpsc::Sender<Checkpoint>, SubscriptionServiceHandle) {
51 let metrics = SubscriptionMetrics::new(registry);
52 let (checkpoint_sender, checkpoint_mailbox) = mpsc::channel(CHECKPOINT_MAILBOX_SIZE);
53 let (subscription_request_sender, mailbox) = mpsc::channel(MAILBOX_SIZE);
54
55 tokio::spawn(
56 Self {
57 checkpoint_mailbox,
58 mailbox,
59 subscribers: Vec::new(),
60 metrics,
61 }
62 .start(),
63 );
64
65 (
66 checkpoint_sender,
67 SubscriptionServiceHandle {
68 sender: subscription_request_sender,
69 },
70 )
71 }
72
73 async fn start(mut self) {
74 loop {
76 tokio::select! {
77 maybe_checkpoint = self.checkpoint_mailbox.recv() => {
78 if let Some(checkpoint) = maybe_checkpoint {
81 self.handle_checkpoint(checkpoint);
82 } else {
83 break;
84 }
85 },
86 maybe_message = self.mailbox.recv() => {
87 if let Some(message) = maybe_message {
90 self.handle_message(message);
91 } else {
92 break;
93 }
94 },
95 }
96 }
97
98 info!("RPC Subscription Services ended");
99 }
100
101 fn handle_checkpoint(&mut self, checkpoint: Checkpoint) {
102 {
104 let last_sequence_number = self.metrics.last_recieved_checkpoint.get();
105 let sequence_number = *checkpoint.summary.sequence_number() as i64;
106
107 if last_sequence_number != 0 && (last_sequence_number + 1) != sequence_number {
108 panic!(
109 "recieved checkpoint out-of-order. expected checkpoint {}, recieved {}",
110 last_sequence_number + 1,
111 sequence_number
112 );
113 }
114
115 self.metrics.last_recieved_checkpoint.set(sequence_number);
117 }
118
119 let checkpoint = Arc::new(checkpoint);
120
121 self.subscribers.retain(|subscriber| {
124 match subscriber.try_send(Arc::clone(&checkpoint)) {
125 Ok(()) => {
126 trace!("succesfully enqueued checkpont for subscriber");
127 true }
129 Err(e) => {
130 trace!("unable to enqueue checkpoint for subscriber: {e}");
132 self.metrics.inflight_subscribers.dec();
133 false }
135 }
136 });
137 }
138
139 fn handle_message(&mut self, request: SubscriptionRequest) {
140 if self.subscribers.len() >= MAX_SUBSCRIBERS {
142 trace!(
143 "failed to register new subscriber: hit maximum number of subscribers {}",
144 MAX_SUBSCRIBERS
145 );
146 return;
147 }
148
149 let (sender, reciever) = mpsc::channel(SUBSCRIPTION_CHANNEL_SIZE);
150 match request.sender.send(reciever) {
151 Ok(()) => {
152 trace!("succesfully registered new subscriber");
153 self.metrics.inflight_subscribers.inc();
154 self.subscribers.push(sender);
155 }
156 Err(e) => {
157 trace!("failed to register new subscriber: {e:?}");
158 }
159 }
160 }
161}