sui_core/validator_client_monitor/monitor.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority_aggregator::AuthorityAggregator;
use crate::authority_client::AuthorityAPI;
use crate::validator_client_monitor::stats::ClientObservedStats;
use crate::validator_client_monitor::{
    OperationFeedback, OperationType, metrics::ValidatorClientMetrics,
};
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use std::collections::HashMap;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use strum::IntoEnumIterator;
use sui_config::validator_client_monitor_config::ValidatorClientMonitorConfig;
use sui_types::committee::Committee;
use sui_types::messages_grpc::TxType;
use sui_types::{base_types::AuthorityName, messages_grpc::ValidatorHealthRequest};
use tokio::{
    task::JoinSet,
    time::{interval, timeout},
};
use tracing::{debug, info, warn};

/// Monitors validator interactions from the client's perspective.
///
/// This component:
/// - Collects client-side metrics from TransactionDriver operations
/// - Runs periodic health checks on all validators from the client
/// - Maintains client-observed statistics for reliability and latency
/// - Provides intelligent validator selection based on client-observed performance
/// - Handles epoch changes by cleaning up stale validator data
///
/// The monitor runs a background task for health checks and uses
/// moving averages to smooth client-side measurements.
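///
/// A minimal usage sketch (illustrative; `config`, `metrics`, `authority_aggregator`,
/// `committee`, and `tx_type` are assumed to be available at the call site):
/// ```ignore
/// let monitor = ValidatorClientMonitor::new(config, metrics, authority_aggregator);
/// // Rank validators for a transaction type, shuffling those within 10% of the best latency.
/// let preferred = monitor.select_shuffled_preferred_validators(&committee, tx_type, 0.1);
/// ```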
pub struct ValidatorClientMonitor<A: Clone> {
    config: ValidatorClientMonitorConfig,
    metrics: Arc<ValidatorClientMetrics>,
    client_stats: RwLock<ClientObservedStats>,
    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
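    /// Cached per-transaction-type latencies for each validator (reliability-adjusted),
    /// refreshed after every health check round.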
    cached_latencies: RwLock<HashMap<TxType, HashMap<AuthorityName, Duration>>>,
}

impl<A> ValidatorClientMonitor<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    pub fn new(
        config: ValidatorClientMonitorConfig,
        metrics: Arc<ValidatorClientMetrics>,
        authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    ) -> Arc<Self> {
        info!(
            "Validator client monitor starting with config: {:?}",
            config
        );

        let monitor = Arc::new(Self {
            config: config.clone(),
            metrics,
            client_stats: RwLock::new(ClientObservedStats::new(config)),
            authority_aggregator,
            cached_latencies: RwLock::new(HashMap::new()),
        });

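        // Spawn the background task that periodically health-checks all validators.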
        let monitor_clone = monitor.clone();
        tokio::spawn(async move {
            monitor_clone.run_health_checks().await;
        });

        monitor
    }

    #[cfg(test)]
    pub fn new_for_test(authority_aggregator: Arc<AuthorityAggregator<A>>) -> Arc<Self> {
        use prometheus::Registry;

        Self::new(
            ValidatorClientMonitorConfig::default(),
            Arc::new(ValidatorClientMetrics::new(&Registry::default())),
            Arc::new(ArcSwap::new(authority_aggregator)),
        )
    }

    /// Background task that runs periodic health checks on all validators.
    ///
    /// Sends health check requests to all validators in parallel and records
    /// the results (success/failure and latency). Timeouts are treated as
    /// failures without recording latency to avoid polluting latency statistics.
    async fn run_health_checks(self: Arc<Self>) {
        let mut interval = interval(self.config.health_check_interval);

        loop {
            interval.tick().await;

            let authority_agg = self.authority_aggregator.load();

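            // Drop stats for validators that are no longer in the committee
            // (e.g. after an epoch change).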
            let current_validators: Vec<_> = authority_agg.committee.names().cloned().collect();
            self.client_stats
                .write()
                .retain_validators(&current_validators);

            let mut tasks = JoinSet::new();

            for (name, safe_client) in authority_agg.authority_clients.iter() {
                let name = *name;
                let display_name = authority_agg.get_display_name(&name);
                let client = safe_client.clone();
                let timeout_duration = self.config.health_check_timeout;
                let monitor = self.clone();

                tasks.spawn(async move {
                    let start = Instant::now();
                    match timeout(
                        timeout_duration,
                        client.validator_health(ValidatorHealthRequest {}),
                    )
                    .await
                    {
                        // TODO: Actually use the response details.
                        Ok(Ok(_response)) => {
                            let latency = start.elapsed();
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name: display_name.clone(),
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Ok(latency),
                            });
                        }
                        Ok(Err(_)) => {
                            let _latency = start.elapsed();
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name: display_name.clone(),
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Err(()),
                            });
                        }
                        Err(_) => {
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name,
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Err(()),
                            });
                        }
                    }
                });
            }

            while let Some(result) = tasks.join_next().await {
                if let Err(e) = result {
                    warn!("Health check task failed: {}", e);
                }
            }

            self.update_cached_latencies(&authority_agg);
        }
    }
}

impl<A: Clone> ValidatorClientMonitor<A> {
    /// Calculate and cache latencies for all validators.
    ///
    /// Called periodically after each health check round to refresh the cached validator
    /// latencies. These are end-to-end latencies computed per validator, adjusted for the
    /// validator's observed reliability.
    fn update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
        let committee = &authority_agg.committee;
        let mut cached_latencies = self.cached_latencies.write();

        for tx_type in TxType::iter() {
            let latencies_map = self
                .client_stats
                .read()
                .get_all_validator_stats(committee, tx_type);

            for (validator, latency) in latencies_map.iter() {
                debug!(
                    "Validator {}, tx type {}: latency {}",
                    validator,
                    tx_type.as_str(),
                    latency.as_secs_f64()
                );
                let display_name = authority_agg.get_display_name(validator);
                self.metrics
                    .performance
                    .with_label_values(&[&display_name, tx_type.as_str()])
                    .set(latency.as_secs_f64());
            }

            cached_latencies.insert(tx_type, latencies_map);
        }
    }

    /// Record a client-observed interaction result with a validator.
    ///
    /// Records the outcome of an operation, including success/failure status and latency,
    /// from the client's perspective. Updates both Prometheus metrics and the internal
    /// client statistics. This is the primary interface for the TransactionDriver to
    /// report client-observed validator interactions.
    /// TODO: Consider adding a byzantine flag to the feedback.
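    ///
    /// A sketch of how a successful submit might be reported (the variables are illustrative,
    /// not taken from a real call site):
    /// ```ignore
    /// monitor.record_interaction_result(OperationFeedback {
    ///     authority_name,
    ///     display_name,
    ///     operation: OperationType::Submit,
    ///     ping_type: None,
    ///     result: Ok(latency),
    /// });
    /// ```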
    pub fn record_interaction_result(&self, feedback: OperationFeedback) {
        let operation_str = match feedback.operation {
            OperationType::Submit => "submit",
            OperationType::Effects => "effects",
            OperationType::HealthCheck => "health_check",
            OperationType::FastPath => "fast_path",
            OperationType::Consensus => "consensus",
        };
        let ping_label = if feedback.ping_type.is_some() {
            "true"
        } else {
            "false"
        };

        match feedback.result {
            Ok(latency) => {
                self.metrics
                    .observed_latency
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .observe(latency.as_secs_f64());
                self.metrics
                    .operation_success
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .inc();
            }
            Err(()) => {
                self.metrics
                    .operation_failure
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .inc();
            }
        }

        let mut client_stats = self.client_stats.write();
        client_stats.record_interaction_result(feedback);
    }

    /// Select validators based on client-observed performance for the given transaction type.
    ///
    /// The current committee is passed in to ensure this function has the latest committee
    /// information. The transaction type is passed in so that validators can be ranked by
    /// their latencies for that specific transaction type.
    ///
    /// Validators whose latencies are within `delta` of the lowest latency for the given
    /// transaction type are shuffled, to balance the load among the fastest validators.
    ///
    /// Returns a vector containing all validators, where
    /// 1. Fast validators within `delta` of the lowest latency are shuffled.
    /// 2. Remaining slow validators are sorted by latency in ascending order.
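    ///
    /// For example, with `delta` = 0.2 and observed latencies of 100ms, 110ms and 200ms, the
    /// cutoff is 100ms * 1.2 = 120ms, so the first two validators are shuffled ahead of the
    /// 200ms validator.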
    pub fn select_shuffled_preferred_validators(
        &self,
        committee: &Committee,
        tx_type: TxType,
        delta: f64,
    ) -> Vec<AuthorityName> {
        let mut rng = rand::thread_rng();

        let cached_latencies = self.cached_latencies.read();
        let Some(cached_latencies) = cached_latencies.get(&tx_type) else {
            let mut validators: Vec<_> = committee.names().cloned().collect();
            validators.shuffle(&mut rng);
            return validators;
        };

        // Since the cached latencies are updated periodically, they may have been computed
        // against an out-of-date committee.
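        // Validators missing from the cache default to Duration::ZERO, so they sort to the
        // front and still receive traffic.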
        let mut validator_with_latencies: Vec<_> = committee
            .names()
            .map(|v| {
                (
                    *v,
                    cached_latencies.get(v).cloned().unwrap_or(Duration::ZERO),
                )
            })
            .collect();
        if validator_with_latencies.is_empty() {
            return vec![];
        }
        // Shuffle the validators to balance the load among validators with the same latency.
        validator_with_latencies.shuffle(&mut rng);
        // Sort by latency in ascending order. We want to select the validators with the lowest latencies.
        validator_with_latencies.sort_by_key(|(_, latency)| *latency);

        // Shuffle the validators within delta of the lowest latency, for load balancing.
        let lowest_latency = validator_with_latencies[0].1;
        let threshold = lowest_latency.mul_f64(1.0 + delta);
        let k = validator_with_latencies
            .iter()
            .enumerate()
            .find(|(_, (_, latency))| *latency > threshold)
            .map(|(i, _)| i)
            .unwrap_or(validator_with_latencies.len());
        validator_with_latencies[..k].shuffle(&mut rng);
        self.metrics
            .shuffled_validators
            .with_label_values(&[tx_type.as_str()])
            .observe(k as f64);

        validator_with_latencies
            .into_iter()
            .map(|(v, _)| v)
            .collect()
    }

    #[cfg(test)]
    pub fn force_update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
        self.update_cached_latencies(authority_agg);
    }

    #[cfg(test)]
    pub fn get_client_stats_len(&self) -> usize {
        self.client_stats.read().validator_stats.len()
    }

    #[cfg(test)]
    pub fn has_validator_stats(&self, validator: &AuthorityName) -> bool {
        self.client_stats
            .read()
            .validator_stats
            .contains_key(validator)
    }
}