sui_core/validator_client_monitor/monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority_aggregator::AuthorityAggregator;
5use crate::authority_client::AuthorityAPI;
6use crate::validator_client_monitor::stats::ClientObservedStats;
7use crate::validator_client_monitor::{
8    OperationFeedback, OperationType, metrics::ValidatorClientMetrics,
9};
10use arc_swap::ArcSwap;
11use parking_lot::RwLock;
12use rand::seq::SliceRandom;
13use std::collections::HashMap;
14use std::{
15    sync::Arc,
16    time::{Duration, Instant},
17};
18use sui_config::validator_client_monitor_config::ValidatorClientMonitorConfig;
19use sui_types::committee::Committee;
20use sui_types::{base_types::AuthorityName, messages_grpc::ValidatorHealthRequest};
21use tokio::{
22    task::JoinSet,
23    time::{interval, timeout},
24};
25use tracing::{debug, info, warn};
26
27/// Monitors validator interactions from the client's perspective.
28///
29/// This component:
30/// - Collects client-side metrics from TransactionDriver operations
31/// - Runs periodic health checks on all validators from the client
32/// - Maintains client-observed statistics for reliability and latency
33/// - Provides intelligent validator selection based on client-observed performance
34/// - Handles epoch changes by cleaning up stale validator data
35///
36/// The monitor runs a background task for health checks and uses
37/// moving averages to smooth client-side measurements.
pub struct ValidatorClientMonitor<A: Clone> {
    /// Monitor configuration (e.g. health check interval and timeout).
    config: ValidatorClientMonitorConfig,
    /// Prometheus metrics tracking client-observed validator performance.
    metrics: Arc<ValidatorClientMetrics>,
    /// Client-observed per-validator statistics, updated from operation feedback.
    client_stats: RwLock<ClientObservedStats>,
    /// Swappable handle to the current authority aggregator; reloaded each
    /// health-check round so the monitor follows reconfiguration.
    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    /// Per-validator end-to-end latencies derived from `client_stats`,
    /// recomputed after each health-check round and read by validator selection.
    cached_latencies: RwLock<HashMap<AuthorityName, Duration>>,
}
45
46impl<A> ValidatorClientMonitor<A>
47where
48    A: AuthorityAPI + Send + Sync + 'static + Clone,
49{
50    pub fn new(
51        config: ValidatorClientMonitorConfig,
52        metrics: Arc<ValidatorClientMetrics>,
53        authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
54    ) -> Arc<Self> {
55        info!(
56            "Validator client monitor starting with config: {:?}",
57            config
58        );
59
60        let monitor = Arc::new(Self {
61            config: config.clone(),
62            metrics,
63            client_stats: RwLock::new(ClientObservedStats::new(config)),
64            authority_aggregator,
65            cached_latencies: RwLock::new(HashMap::new()),
66        });
67
68        let monitor_clone = monitor.clone();
69        tokio::spawn(async move {
70            monitor_clone.run_health_checks().await;
71        });
72
73        monitor
74    }
75
76    #[cfg(test)]
77    pub fn new_for_test(authority_aggregator: Arc<AuthorityAggregator<A>>) -> Arc<Self> {
78        use prometheus::Registry;
79
80        Self::new(
81            ValidatorClientMonitorConfig::default(),
82            Arc::new(ValidatorClientMetrics::new(&Registry::default())),
83            Arc::new(ArcSwap::new(authority_aggregator)),
84        )
85    }
86
87    /// Background task that runs periodic health checks on all validators.
88    ///
89    /// Sends health check requests to all validators in parallel and records
90    /// the results (success/failure and latency). Timeouts are treated as
91    /// failures without recording latency to avoid polluting latency statistics.
92    async fn run_health_checks(self: Arc<Self>) {
93        let mut interval = interval(self.config.health_check_interval);
94
95        loop {
96            interval.tick().await;
97
98            let authority_agg = self.authority_aggregator.load();
99
100            let current_validators: Vec<_> = authority_agg.committee.names().cloned().collect();
101            self.client_stats
102                .write()
103                .retain_validators(&current_validators);
104
105            let mut tasks = JoinSet::new();
106
107            for (name, safe_client) in authority_agg.authority_clients.iter() {
108                let name = *name;
109                let display_name = authority_agg.get_display_name(&name);
110                let client = safe_client.clone();
111                let timeout_duration = self.config.health_check_timeout;
112                let monitor = self.clone();
113
114                tasks.spawn(async move {
115                    let start = Instant::now();
116                    match timeout(
117                        timeout_duration,
118                        client.validator_health(ValidatorHealthRequest {}),
119                    )
120                    .await
121                    {
122                        // TODO: Actually use the response details.
123                        Ok(Ok(_response)) => {
124                            let latency = start.elapsed();
125                            monitor.record_interaction_result(OperationFeedback {
126                                authority_name: name,
127                                display_name: display_name.clone(),
128                                operation: OperationType::HealthCheck,
129                                ping_type: None,
130                                result: Ok(latency),
131                            });
132                        }
133                        Ok(Err(_)) => {
134                            let _latency = start.elapsed();
135                            monitor.record_interaction_result(OperationFeedback {
136                                authority_name: name,
137                                display_name: display_name.clone(),
138                                operation: OperationType::HealthCheck,
139                                ping_type: None,
140                                result: Err(()),
141                            });
142                        }
143                        Err(_) => {
144                            monitor.record_interaction_result(OperationFeedback {
145                                authority_name: name,
146                                display_name,
147                                operation: OperationType::HealthCheck,
148                                ping_type: None,
149                                result: Err(()),
150                            });
151                        }
152                    }
153                });
154            }
155
156            while let Some(result) = tasks.join_next().await {
157                if let Err(e) = result {
158                    warn!("Health check task failed: {}", e);
159                }
160            }
161
162            self.update_cached_latencies(&authority_agg);
163        }
164    }
165}
166
167impl<A: Clone> ValidatorClientMonitor<A> {
168    /// Calculate and cache latencies for all validators.
169    ///
170    /// This method is called periodically after health checks complete to update
171    /// the cached validator latencies. Those are the end to end latencies as calculated for each validator
172    /// taking into account the reliability of the validator.
173    fn update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
174        let committee = &authority_agg.committee;
175        let mut cached_latencies = self.cached_latencies.write();
176
177        let latencies_map = self.client_stats.read().get_all_validator_stats(committee);
178
179        for (validator, latency) in latencies_map.iter() {
180            debug!("Validator {}, latency {}", validator, latency.as_secs_f64());
181            let display_name = authority_agg.get_display_name(validator);
182            self.metrics
183                .performance
184                .with_label_values(&[display_name.as_str()])
185                .set(latency.as_secs_f64());
186            cached_latencies.insert(*validator, *latency);
187        }
188    }
189
190    /// Record client-observed interaction result with a validator.
191    ///
192    /// Records operation results including success/failure status and latency
193    /// from the client's perspective. Updates both Prometheus metrics and
194    /// internal client statistics. This is the primary interface for the
195    /// TransactionDriver to report client-observed validator interactions.
196    /// TODO: Consider adding a byzantine flag to the feedback.
197    pub fn record_interaction_result(&self, feedback: OperationFeedback) {
198        let operation_str = match feedback.operation {
199            OperationType::Submit => "submit",
200            OperationType::Effects => "effects",
201            OperationType::HealthCheck => "health_check",
202            OperationType::FastPath => "fast_path",
203            OperationType::Consensus => "consensus",
204        };
205        let ping_label = if feedback.ping_type.is_some() {
206            "true"
207        } else {
208            "false"
209        };
210
211        match feedback.result {
212            Ok(latency) => {
213                self.metrics
214                    .observed_latency
215                    .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
216                    .observe(latency.as_secs_f64());
217                self.metrics
218                    .operation_success
219                    .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
220                    .inc();
221            }
222            Err(()) => {
223                self.metrics
224                    .operation_failure
225                    .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
226                    .inc();
227            }
228        }
229
230        let mut client_stats = self.client_stats.write();
231        client_stats.record_interaction_result(feedback);
232    }
233
234    /// Select validators based on client-observed performance for the given transaction type.
235    ///
236    /// The current committee is passed in to ensure this function has the latest committee information.
237    ///
238    /// Also the tx type is passed in so that we can select validators based on their respective latencies
239    /// for the transaction type.
240    ///
241    /// Validators with latencies within `delta` of the lowest latency in the given transaction type
242    /// are shuffled, to balance the load among the fastest validators.
243    ///
244    /// Returns a vector containing all validators, where
245    /// 1. Fast validators within `delta` of the lowest latency are shuffled.
246    /// 2. Remaining slow validators are sorted by latency in ascending order.
247    pub fn select_shuffled_preferred_validators(
248        &self,
249        committee: &Committee,
250        delta: f64,
251    ) -> Vec<AuthorityName> {
252        let mut rng = rand::thread_rng();
253
254        let cached_latencies = self.cached_latencies.read();
255        if cached_latencies.is_empty() {
256            let mut validators: Vec<_> = committee.names().cloned().collect();
257            validators.shuffle(&mut rng);
258            return validators;
259        };
260
261        // Since the cached latencies are updated periodically, it is possible that it was ran on
262        // an out-of-date committee.
263        let mut validator_with_latencies: Vec<_> = committee
264            .names()
265            .map(|v| {
266                (
267                    *v,
268                    cached_latencies.get(v).cloned().unwrap_or(Duration::ZERO),
269                )
270            })
271            .collect();
272        if validator_with_latencies.is_empty() {
273            return vec![];
274        }
275        // Shuffle the validators to balance the load among validators with the same latency.
276        validator_with_latencies.shuffle(&mut rng);
277        // Sort by latency in ascending order. We want to select the validators with the lowest latencies.
278        validator_with_latencies.sort_by_key(|(_, latency)| *latency);
279
280        // Shuffle the validators within delta of the lowest latency, for load balancing.
281        let lowest_latency = validator_with_latencies[0].1;
282        let threshold = lowest_latency.mul_f64(1.0 + delta);
283        let k = validator_with_latencies
284            .iter()
285            .enumerate()
286            .find(|(_, (_, latency))| *latency > threshold)
287            .map(|(i, _)| i)
288            .unwrap_or(validator_with_latencies.len());
289        validator_with_latencies[..k].shuffle(&mut rng);
290        self.metrics.shuffled_validators.observe(k as f64);
291
292        validator_with_latencies
293            .into_iter()
294            .map(|(v, _)| v)
295            .collect()
296    }
297
298    #[cfg(test)]
299    pub fn force_update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
300        self.update_cached_latencies(authority_agg);
301    }
302
303    #[cfg(test)]
304    pub fn get_client_stats_len(&self) -> usize {
305        self.client_stats.read().validator_stats.len()
306    }
307
308    #[cfg(test)]
309    pub fn has_validator_stats(&self, validator: &AuthorityName) -> bool {
310        self.client_stats
311            .read()
312            .validator_stats
313            .contains_key(validator)
314    }
315}