sui_core/
subscription_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use prometheus::{
7    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
8    IntGaugeVec, Registry,
9};
10use tokio_stream::Stream;
11use tracing::{error, instrument, trace};
12
13use crate::streamer::Streamer;
14use sui_json_rpc_types::{
15    EffectsWithInput, EventFilter, SuiTransactionBlockEffects, SuiTransactionBlockEvents,
16    TransactionFilter,
17};
18use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockEffectsAPI};
19use sui_types::error::SuiResult;
20use sui_types::transaction::TransactionData;
21
22#[cfg(test)]
23#[path = "unit_tests/subscription_handler_tests.rs"]
24mod subscription_handler_tests;
25
26pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;
27
28pub struct SubscriptionMetrics {
29    pub streaming_success: IntCounterVec,
30    pub streaming_failure: IntCounterVec,
31    pub streaming_active_subscriber_number: IntGaugeVec,
32    pub dropped_submissions: IntCounterVec,
33}
34
35impl SubscriptionMetrics {
36    pub fn new(registry: &Registry) -> Self {
37        Self {
38            streaming_success: register_int_counter_vec_with_registry!(
39                "streaming_success",
40                "Total number of items that are streamed successfully",
41                &["type"],
42                registry,
43            )
44            .unwrap(),
45            streaming_failure: register_int_counter_vec_with_registry!(
46                "streaming_failure",
47                "Total number of items that fail to be streamed",
48                &["type"],
49                registry,
50            )
51            .unwrap(),
52            streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
53                "streaming_active_subscriber_number",
54                "Current number of active subscribers",
55                &["type"],
56                registry,
57            )
58            .unwrap(),
59            dropped_submissions: register_int_counter_vec_with_registry!(
60                "streaming_dropped_submissions",
61                "Total number of submissions that are dropped",
62                &["type"],
63                registry,
64            )
65            .unwrap(),
66        }
67    }
68}
69
70pub struct SubscriptionHandler {
71    event_streamer: Streamer<SuiEvent, SuiEvent, EventFilter>,
72    transaction_streamer: Streamer<EffectsWithInput, SuiTransactionBlockEffects, TransactionFilter>,
73}
74
75impl SubscriptionHandler {
76    pub fn new(registry: &Registry) -> Self {
77        let metrics = Arc::new(SubscriptionMetrics::new(registry));
78        Self {
79            event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
80            transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
81        }
82    }
83}
84
85impl SubscriptionHandler {
86    #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
87    pub fn process_tx(
88        &self,
89        input: &TransactionData,
90        effects: &SuiTransactionBlockEffects,
91        events: &SuiTransactionBlockEvents,
92    ) -> SuiResult {
93        trace!(
94            num_events = events.data.len(),
95            tx_digest =? effects.transaction_digest(),
96            "Processing tx/event subscription"
97        );
98
99        if let Err(e) = self.transaction_streamer.try_send(EffectsWithInput {
100            input: input.clone(),
101            effects: effects.clone(),
102        }) {
103            error!(error =? e, "Failed to send transaction to dispatch");
104        }
105
106        // serially dispatch event processing to honor events' orders.
107        for event in events.data.clone() {
108            if let Err(e) = self.event_streamer.try_send(event) {
109                error!(error =? e, "Failed to send event to dispatch");
110            }
111        }
112        Ok(())
113    }
114
115    pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = SuiEvent> {
116        self.event_streamer.subscribe(filter)
117    }
118
119    pub fn subscribe_transactions(
120        &self,
121        filter: TransactionFilter,
122    ) -> impl Stream<Item = SuiTransactionBlockEffects> {
123        self.transaction_streamer.subscribe(filter)
124    }
125}