sui_core/
subscription_handler.rs1use std::sync::Arc;
5
6use prometheus::{
7 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
8 IntGaugeVec, Registry,
9};
10use tokio_stream::Stream;
11use tracing::{error, instrument, trace};
12
13use crate::streamer::Streamer;
14use sui_json_rpc_types::{
15 EffectsWithInput, EventFilter, SuiTransactionBlockEffects, SuiTransactionBlockEvents,
16 TransactionFilter,
17};
18use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockEffectsAPI};
19use sui_types::error::SuiResult;
20use sui_types::transaction::TransactionData;
21
22#[cfg(test)]
23#[path = "unit_tests/subscription_handler_tests.rs"]
24mod subscription_handler_tests;
25
/// Buffer size handed to `Streamer::spawn` for both the event streamer and the
/// transaction streamer created in `SubscriptionHandler::new`.
pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1_000;
27
/// Prometheus metric handles for the subscription streaming code.
///
/// Every metric is a vector keyed by a single `"type"` label (see
/// `SubscriptionMetrics::new`). Nothing in this file updates the metrics directly;
/// the handles are passed to `Streamer::spawn`, which presumably records them
/// (confirm in `crate::streamer`).
pub struct SubscriptionMetrics {
    /// Counter registered as `streaming_success`: total number of items that are
    /// streamed successfully.
    pub streaming_success: IntCounterVec,
    /// Counter registered as `streaming_failure`: total number of items that fail
    /// to be streamed.
    pub streaming_failure: IntCounterVec,
    /// Gauge registered as `streaming_active_subscriber_number`: current number of
    /// active subscribers.
    pub streaming_active_subscriber_number: IntGaugeVec,
    /// Counter registered as `streaming_dropped_submissions`: total number of
    /// submissions that are dropped.
    pub dropped_submissions: IntCounterVec,
}
34
35impl SubscriptionMetrics {
36 pub fn new(registry: &Registry) -> Self {
37 Self {
38 streaming_success: register_int_counter_vec_with_registry!(
39 "streaming_success",
40 "Total number of items that are streamed successfully",
41 &["type"],
42 registry,
43 )
44 .unwrap(),
45 streaming_failure: register_int_counter_vec_with_registry!(
46 "streaming_failure",
47 "Total number of items that fail to be streamed",
48 &["type"],
49 registry,
50 )
51 .unwrap(),
52 streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
53 "streaming_active_subscriber_number",
54 "Current number of active subscribers",
55 &["type"],
56 registry,
57 )
58 .unwrap(),
59 dropped_submissions: register_int_counter_vec_with_registry!(
60 "streaming_dropped_submissions",
61 "Total number of submissions that are dropped",
62 &["type"],
63 registry,
64 )
65 .unwrap(),
66 }
67 }
68}
69
/// Entry point for transaction and event subscriptions.
///
/// Holds one `Streamer` per subscription kind: `process_tx` feeds items in, and the
/// `subscribe_*` methods hand out streams to consumers.
pub struct SubscriptionHandler {
    /// Streamer that is fed `SuiEvent`s and yields `SuiEvent`s to subscribers,
    /// each subscription being parameterized by an `EventFilter`.
    event_streamer: Streamer<SuiEvent, SuiEvent, EventFilter>,
    /// Streamer that is fed `EffectsWithInput` values and yields
    /// `SuiTransactionBlockEffects` to subscribers, each subscription being
    /// parameterized by a `TransactionFilter`.
    transaction_streamer: Streamer<EffectsWithInput, SuiTransactionBlockEffects, TransactionFilter>,
}
74
75impl SubscriptionHandler {
76 pub fn new(registry: &Registry) -> Self {
77 let metrics = Arc::new(SubscriptionMetrics::new(registry));
78 Self {
79 event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
80 transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
81 }
82 }
83}
84
85impl SubscriptionHandler {
86 #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
87 pub fn process_tx(
88 &self,
89 input: &TransactionData,
90 effects: &SuiTransactionBlockEffects,
91 events: &SuiTransactionBlockEvents,
92 ) -> SuiResult {
93 trace!(
94 num_events = events.data.len(),
95 tx_digest =? effects.transaction_digest(),
96 "Processing tx/event subscription"
97 );
98
99 if let Err(e) = self.transaction_streamer.try_send(EffectsWithInput {
100 input: input.clone(),
101 effects: effects.clone(),
102 }) {
103 error!(error =? e, "Failed to send transaction to dispatch");
104 }
105
106 for event in events.data.clone() {
108 if let Err(e) = self.event_streamer.try_send(event) {
109 error!(error =? e, "Failed to send event to dispatch");
110 }
111 }
112 Ok(())
113 }
114
115 pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = SuiEvent> {
116 self.event_streamer.subscribe(filter)
117 }
118
119 pub fn subscribe_transactions(
120 &self,
121 filter: TransactionFilter,
122 ) -> impl Stream<Item = SuiTransactionBlockEffects> {
123 self.transaction_streamer.subscribe(filter)
124 }
125}