// sui_rpc_api/subscription.rs
use crate::metrics::SubscriptionMetrics;
use crate::proto::rpc::v2beta::Checkpoint;
use std::sync::Arc;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::error;
use tracing::info;
use tracing::trace;
/// Capacity of the channel that carries incoming `CheckpointData` into the
/// subscription service (created in `SubscriptionService::build`).
const CHECKPOINT_MAILBOX_SIZE: usize = 1024;
/// Capacity of the channel that carries `SubscriptionRequest`s from
/// `SubscriptionServiceHandle`s into the subscription service.
const MAILBOX_SIZE: usize = 128;
/// Capacity of each per-subscriber checkpoint channel. A subscriber whose
/// channel cannot accept a new checkpoint is dropped by the service.
const SUBSCRIPTION_CHANNEL_SIZE: usize = 256;
/// Upper bound on the number of simultaneously registered subscribers; new
/// registrations are refused once this many are tracked.
const MAX_SUBSCRIBERS: usize = 1024;
/// Message sent from a `SubscriptionServiceHandle` to the `SubscriptionService`
/// asking for a new checkpoint subscription.
struct SubscriptionRequest {
/// One-shot reply channel on which the service hands back the receiving end
/// of the newly created per-subscriber checkpoint channel.
sender: oneshot::Sender<mpsc::Receiver<Arc<Checkpoint>>>,
}
/// Cloneable handle used to register new checkpoint subscriptions with a
/// running `SubscriptionService`.
#[derive(Clone)]
pub struct SubscriptionServiceHandle {
/// Sending half of the service's request mailbox.
sender: mpsc::Sender<SubscriptionRequest>,
}
impl SubscriptionServiceHandle {
    /// Asks the subscription service for a new stream of checkpoints.
    ///
    /// Returns the receiving end of a freshly created per-subscriber channel,
    /// or `None` when the request could not be delivered to the service or the
    /// service dropped the reply channel without answering (for example when
    /// it refuses the registration).
    pub async fn register_subscription(&self) -> Option<mpsc::Receiver<Arc<Checkpoint>>> {
        let (reply_tx, reply_rx) = oneshot::channel();

        // Deliver the request; a send error means the service's mailbox is closed.
        self.sender
            .send(SubscriptionRequest { sender: reply_tx })
            .await
            .ok()?;

        // Wait for the service to hand back the checkpoint receiver.
        reply_rx.await.ok()
    }
}
/// Background service that fans incoming checkpoints out to every registered
/// subscriber and handles new subscription requests.
pub struct SubscriptionService {
/// Incoming checkpoints to be converted and broadcast to subscribers.
checkpoint_mailbox: mpsc::Receiver<CheckpointData>,
/// Incoming subscription requests from `SubscriptionServiceHandle`s.
mailbox: mpsc::Receiver<SubscriptionRequest>,
/// Sending halves of the per-subscriber checkpoint channels.
subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
/// Metrics for the service; also holds the last received checkpoint
/// sequence number used for the ordering check.
metrics: SubscriptionMetrics,
}
impl SubscriptionService {
/// Creates the subscription service, spawns it as a Tokio task, and returns
/// the two ends callers need to interact with it.
///
/// # Returns
///
/// A tuple of:
/// * the sender on which new `CheckpointData` should be pushed, and
/// * a [`SubscriptionServiceHandle`] for registering subscribers.
///
/// Metrics are created from `registry` via `SubscriptionMetrics::new`.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime context.
pub fn build(
    registry: &prometheus::Registry,
) -> (mpsc::Sender<CheckpointData>, SubscriptionServiceHandle) {
    let metrics = SubscriptionMetrics::new(registry);
    let (checkpoint_tx, checkpoint_rx) = mpsc::channel(CHECKPOINT_MAILBOX_SIZE);
    let (request_tx, request_rx) = mpsc::channel(MAILBOX_SIZE);

    let service = Self {
        checkpoint_mailbox: checkpoint_rx,
        mailbox: request_rx,
        subscribers: Vec::new(),
        metrics,
    };
    // The service runs detached; it stops once either mailbox is closed.
    tokio::spawn(service.start());

    let handle = SubscriptionServiceHandle { sender: request_tx };
    (checkpoint_tx, handle)
}
/// Main event loop of the service.
///
/// Waits on both mailboxes at once, dispatching checkpoints to
/// `handle_checkpoint` and subscription requests to `handle_message`. The
/// loop ends as soon as either mailbox's `recv()` yields `None` (that channel
/// has been closed), after which a final info-level message is logged.
async fn start(mut self) {
    loop {
        tokio::select! {
            incoming_checkpoint = self.checkpoint_mailbox.recv() => match incoming_checkpoint {
                Some(checkpoint) => self.handle_checkpoint(checkpoint),
                None => break,
            },
            incoming_request = self.mailbox.recv() => match incoming_request {
                Some(request) => self.handle_message(request),
                None => break,
            },
        }
    }

    info!("RPC Subscription Services ended");
}
/// Validates ordering of an incoming checkpoint, converts it to its proto
/// form, and broadcasts it to all current subscribers.
///
/// Subscribers whose channel rejects the checkpoint (`try_send` fails) are
/// removed and the in-flight subscriber gauge is decremented for each.
///
/// If proto conversion fails the error is logged and nothing is broadcast;
/// the last-received metric has already been advanced at that point.
///
/// # Panics
///
/// Panics when the checkpoint's sequence number is not exactly one greater
/// than the last recorded one. The check is skipped while the recorded value
/// is `0`.
// NOTE(review): because `0` doubles as the "nothing recorded" marker, the
// ordering check is also skipped for the checkpoint that follows sequence
// number 0 — confirm this is acceptable.
fn handle_checkpoint(&mut self, checkpoint: CheckpointData) {
    let previous = self.metrics.last_recieved_checkpoint.get();
    let sequence_number = *checkpoint.checkpoint_summary.sequence_number() as i64;
    let is_first = previous == 0;
    if !is_first && previous + 1 != sequence_number {
        panic!(
            "recieved checkpoint out-of-order. expected checkpoint {}, recieved {}",
            previous + 1,
            sequence_number
        );
    }
    self.metrics.last_recieved_checkpoint.set(sequence_number);

    // Convert once and share the result across all subscribers via `Arc`.
    let wildcard = crate::field_mask::FieldMaskTree::new_wildcard();
    let converted =
        crate::grpc::v2beta::ledger_service::get_checkpoint::checkpoint_data_to_checkpoint_proto(
            checkpoint, &wildcard,
        )
        .map(Arc::new);
    let shared = match converted {
        Ok(proto) => proto,
        Err(e) => {
            error!("unable to convert checkpoint to proto: {e:?}");
            return;
        }
    };

    let metrics = &self.metrics;
    self.subscribers.retain(|subscriber| {
        if let Err(e) = subscriber.try_send(Arc::clone(&shared)) {
            // Either the subscriber's channel is full or it has gone away;
            // in both cases the subscriber is dropped.
            trace!("unable to enqueue checkpoint for subscriber: {e}");
            metrics.inflight_subscribers.dec();
            return false;
        }
        trace!("succesfully enqueued checkpont for subscriber");
        true
    });
}
/// Handles a single subscription request.
///
/// When the subscriber limit has been reached the request is dropped without
/// a reply, which the requester observes as a failed registration. Otherwise
/// a new per-subscriber channel is created, its receiving end is sent back
/// over the request's reply channel, and the sending end is tracked for
/// future broadcasts.
fn handle_message(&mut self, request: SubscriptionRequest) {
    let at_capacity = self.subscribers.len() >= MAX_SUBSCRIBERS;
    if at_capacity {
        trace!(
            "failed to register new subscriber: hit maximum number of subscribers {}",
            MAX_SUBSCRIBERS
        );
        return;
    }

    let (checkpoint_tx, checkpoint_rx) = mpsc::channel(SUBSCRIPTION_CHANNEL_SIZE);

    // A failed reply means the requester is no longer waiting; do not track
    // the new channel in that case.
    if let Err(e) = request.sender.send(checkpoint_rx) {
        trace!("failed to register new subscriber: {e:?}");
        return;
    }

    trace!("succesfully registered new subscriber");
    self.metrics.inflight_subscribers.inc();
    self.subscribers.push(checkpoint_tx);
}
}