sui_core/
streamer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::subscription_handler::{EVENT_DISPATCH_BUFFER_SIZE, SubscriptionMetrics};
5use futures::Stream;
6use mysten_metrics::metered_channel::Sender;
7use mysten_metrics::spawn_monitored_task;
8use parking_lot::RwLock;
9use prometheus::Registry;
10use std::collections::BTreeMap;
11use std::fmt::Debug;
12use std::sync::Arc;
13use sui_json_rpc_types::Filter;
14use sui_types::base_types::ObjectID;
15use sui_types::error::{SuiError, SuiErrorKind};
16use tokio::sync::mpsc;
17use tokio_stream::wrappers::ReceiverStream;
18use tracing::{debug, warn};
19
20type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (tokio::sync::mpsc::Sender<T>, F)>>>;
21
22/// The Streamer splits a mpsc channel into multiple mpsc channels using the subscriber's `Filter<T>` object.
23/// Data will be sent to the subscribers in parallel and the subscription will be dropped if it received a send error.
24pub struct Streamer<T, S, F: Filter<T>> {
25    streamer_queue: Sender<T>,
26    subscribers: Subscribers<S, F>,
27    metrics: Arc<SubscriptionMetrics>,
28    metrics_label: &'static str,
29}
30
31impl<T, S, F> Streamer<T, S, F>
32where
33    S: From<T> + Clone + Debug + Send + Sync + 'static,
34    T: Clone + Send + Sync + 'static,
35    F: Filter<T> + Clone + Send + Sync + 'static + Clone,
36{
37    pub fn spawn(
38        buffer: usize,
39        metrics: Arc<SubscriptionMetrics>,
40        metrics_label: &'static str,
41    ) -> Self {
42        let channel_label = format!("streamer_{}", metrics_label);
43        let gauge = if let Some(metrics) = mysten_metrics::get_metrics() {
44            metrics
45                .channel_inflight
46                .with_label_values(&[&channel_label])
47        } else {
48            // We call init_metrics very early when starting a node. Therefore when this happens,
49            // it's probably in a test.
50            mysten_metrics::init_metrics(&Registry::default());
51            mysten_metrics::get_metrics()
52                .unwrap()
53                .channel_inflight
54                .with_label_values(&[&channel_label])
55        };
56
57        let (tx, rx) = mysten_metrics::metered_channel::channel(buffer, &gauge);
58        let streamer = Self {
59            streamer_queue: tx,
60            subscribers: Default::default(),
61            metrics: metrics.clone(),
62            metrics_label,
63        };
64        let mut rx = rx;
65        let subscribers = streamer.subscribers.clone();
66        spawn_monitored_task!(async move {
67            while let Some(data) = rx.recv().await {
68                Self::send_to_all_subscribers(
69                    subscribers.clone(),
70                    data,
71                    metrics.clone(),
72                    metrics_label,
73                )
74                .await;
75            }
76        });
77        streamer
78    }
79
80    async fn send_to_all_subscribers(
81        subscribers: Subscribers<S, F>,
82        data: T,
83        metrics: Arc<SubscriptionMetrics>,
84        metrics_label: &'static str,
85    ) {
86        let success_counter = metrics
87            .streaming_success
88            .with_label_values(&[metrics_label]);
89        let failure_counter = metrics
90            .streaming_failure
91            .with_label_values(&[metrics_label]);
92        let subscriber_count = metrics
93            .streaming_active_subscriber_number
94            .with_label_values(&[metrics_label]);
95
96        let to_remove = {
97            let mut to_remove = vec![];
98            let subscribers_snapshot = subscribers.read();
99            subscriber_count.set(subscribers_snapshot.len() as i64);
100
101            for (id, (subscriber, filter)) in subscribers_snapshot.iter() {
102                if !(filter.matches(&data)) {
103                    continue;
104                }
105                let data = data.clone();
106                match subscriber.try_send(data.into()) {
107                    Ok(_) => {
108                        debug!(subscription_id = id, "Streaming data to subscriber.");
109                        success_counter.inc();
110                    }
111                    Err(e) => {
112                        warn!(
113                            subscription_id = id,
114                            "Error when streaming data, removing subscriber. Error: {e}"
115                        );
116                        // It does not matter what the error is - channel full or closed, we remove the subscriber.
117                        // In the case of a full channel, this nudges the subscriber to catch up separately and not
118                        // miss any data.
119                        to_remove.push(id.clone());
120                        failure_counter.inc();
121                    }
122                }
123            }
124            to_remove
125        };
126        if !to_remove.is_empty() {
127            let mut subscribers = subscribers.write();
128            for sub in to_remove {
129                subscribers.remove(&sub);
130            }
131        }
132    }
133
134    /// Subscribe to the data stream filtered by the filter object.
135    pub fn subscribe(&self, filter: F) -> impl Stream<Item = S> + use<T, S, F> {
136        let (tx, rx) = mpsc::channel::<S>(EVENT_DISPATCH_BUFFER_SIZE);
137        self.subscribers
138            .write()
139            .insert(ObjectID::random().to_string(), (tx, filter));
140        ReceiverStream::new(rx)
141    }
142
143    pub fn try_send(&self, data: T) -> Result<(), SuiError> {
144        self.streamer_queue.try_send(data).map_err(|e| {
145            self.metrics
146                .dropped_submissions
147                .with_label_values(&[self.metrics_label])
148                .inc();
149
150            SuiErrorKind::FailedToDispatchSubscription {
151                error: e.to_string(),
152            }
153            .into()
154        })
155    }
156}