sui_core/validator_client_monitor/monitor.rs

use crate::authority_aggregator::AuthorityAggregator;
use crate::authority_client::AuthorityAPI;
use crate::validator_client_monitor::stats::ClientObservedStats;
use crate::validator_client_monitor::{
    OperationFeedback, OperationType, metrics::ValidatorClientMetrics,
};
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use std::collections::HashMap;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use strum::IntoEnumIterator;
use sui_config::validator_client_monitor_config::ValidatorClientMonitorConfig;
use sui_types::committee::Committee;
use sui_types::messages_grpc::TxType;
use sui_types::{base_types::AuthorityName, messages_grpc::ValidatorHealthRequest};
use tokio::{
    task::JoinSet,
    time::{interval, timeout},
};
use tracing::{debug, info, warn};

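/// Client-side monitor of validator health and latency as observed by this client.
///
/// A background task periodically health-checks every validator in the current
/// committee; the results, together with feedback reported from transaction
/// submission, are aggregated in `ClientObservedStats` and cached per transaction
/// type so that callers can pick well-performing validators.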
pub struct ValidatorClientMonitor<A: Clone> {
    config: ValidatorClientMonitorConfig,
    metrics: Arc<ValidatorClientMetrics>,
    client_stats: RwLock<ClientObservedStats>,
    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    cached_latencies: RwLock<HashMap<TxType, HashMap<AuthorityName, Duration>>>,
}

impl<A> ValidatorClientMonitor<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    pub fn new(
        config: ValidatorClientMonitorConfig,
        metrics: Arc<ValidatorClientMetrics>,
        authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    ) -> Arc<Self> {
        info!(
            "Validator client monitor starting with config: {:?}",
            config
        );

        let monitor = Arc::new(Self {
            config: config.clone(),
            metrics,
            client_stats: RwLock::new(ClientObservedStats::new(config)),
            authority_aggregator,
            cached_latencies: RwLock::new(HashMap::new()),
        });

        let monitor_clone = monitor.clone();
        tokio::spawn(async move {
            monitor_clone.run_health_checks().await;
        });

        monitor
    }

    #[cfg(test)]
    pub fn new_for_test(authority_aggregator: Arc<AuthorityAggregator<A>>) -> Arc<Self> {
        use prometheus::Registry;

        Self::new(
            ValidatorClientMonitorConfig::default(),
            Arc::new(ValidatorClientMetrics::new(&Registry::default())),
            Arc::new(ArcSwap::new(authority_aggregator)),
        )
    }

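    /// Background loop that periodically sends a `ValidatorHealthRequest` to every
    /// validator in the current committee, records each observed latency or failure
    /// as `OperationType::HealthCheck` feedback, prunes stats for validators that
    /// have left the committee, and refreshes the cached latencies.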
    async fn run_health_checks(self: Arc<Self>) {
        let mut interval = interval(self.config.health_check_interval);

        loop {
            interval.tick().await;

            let authority_agg = self.authority_aggregator.load();

            let current_validators: Vec<_> = authority_agg.committee.names().cloned().collect();
            self.client_stats
                .write()
                .retain_validators(&current_validators);

            let mut tasks = JoinSet::new();

            for (name, safe_client) in authority_agg.authority_clients.iter() {
                let name = *name;
                let display_name = authority_agg.get_display_name(&name);
                let client = safe_client.clone();
                let timeout_duration = self.config.health_check_timeout;
                let monitor = self.clone();

                tasks.spawn(async move {
                    let start = Instant::now();
                    match timeout(
                        timeout_duration,
                        client.validator_health(ValidatorHealthRequest {}),
                    )
                    .await
                    {
                        Ok(Ok(_response)) => {
                            let latency = start.elapsed();
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name: display_name.clone(),
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Ok(latency),
                            });
                        }
                        Ok(Err(_)) => {
                            let _latency = start.elapsed();
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name: display_name.clone(),
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Err(()),
                            });
                        }
                        Err(_) => {
                            monitor.record_interaction_result(OperationFeedback {
                                authority_name: name,
                                display_name,
                                operation: OperationType::HealthCheck,
                                ping_type: None,
                                result: Err(()),
                            });
                        }
                    }
                });
            }

            while let Some(result) = tasks.join_next().await {
                if let Err(e) = result {
                    warn!("Health check task failed: {}", e);
                }
            }

            self.update_cached_latencies(&authority_agg);
        }
    }
}

impl<A: Clone> ValidatorClientMonitor<A> {
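    /// Recomputes the per-validator latency snapshot for every transaction type from
    /// the aggregated client stats, publishes the values to the `performance` metric,
    /// and stores them in `cached_latencies` for use during validator selection.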
    fn update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
        let committee = &authority_agg.committee;
        let mut cached_latencies = self.cached_latencies.write();

        for tx_type in TxType::iter() {
            let latencies_map = self
                .client_stats
                .read()
                .get_all_validator_stats(committee, tx_type);

            for (validator, latency) in latencies_map.iter() {
                debug!(
                    "Validator {}, tx type {}: latency {}",
                    validator,
                    tx_type.as_str(),
                    latency.as_secs_f64()
                );
                let display_name = authority_agg.get_display_name(validator);
                self.metrics
                    .performance
                    .with_label_values(&[&display_name, tx_type.as_str()])
                    .set(latency.as_secs_f64());
            }

            cached_latencies.insert(tx_type, latencies_map);
        }
    }

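    /// Records the outcome of a single client-to-validator interaction: updates the
    /// latency, success, and failure metrics for the operation, then feeds the result
    /// into the aggregated client stats.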
    pub fn record_interaction_result(&self, feedback: OperationFeedback) {
        let operation_str = match feedback.operation {
            OperationType::Submit => "submit",
            OperationType::Effects => "effects",
            OperationType::HealthCheck => "health_check",
            OperationType::FastPath => "fast_path",
            OperationType::Consensus => "consensus",
        };
        let ping_label = if feedback.ping_type.is_some() {
            "true"
        } else {
            "false"
        };

        match feedback.result {
            Ok(latency) => {
                self.metrics
                    .observed_latency
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .observe(latency.as_secs_f64());
                self.metrics
                    .operation_success
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .inc();
            }
            Err(()) => {
                self.metrics
                    .operation_failure
                    .with_label_values(&[&feedback.display_name, operation_str, ping_label])
                    .inc();
            }
        }

        let mut client_stats = self.client_stats.write();
        client_stats.record_interaction_result(feedback);
    }

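    /// Returns the committee's validators ordered by cached latency for `tx_type`.
    /// Validators whose latency is within a fractional margin `delta` of the lowest
    /// observed latency are shuffled among themselves, so load spreads across
    /// comparably fast validators. Falls back to a fully random ordering when no
    /// latencies have been cached yet for this transaction type.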
    pub fn select_shuffled_preferred_validators(
        &self,
        committee: &Committee,
        tx_type: TxType,
        delta: f64,
    ) -> Vec<AuthorityName> {
        let mut rng = rand::thread_rng();

        let cached_latencies = self.cached_latencies.read();
        let Some(cached_latencies) = cached_latencies.get(&tx_type) else {
            let mut validators: Vec<_> = committee.names().cloned().collect();
            validators.shuffle(&mut rng);
            return validators;
        };

        // Validators without a cached latency default to Duration::ZERO, which
        // ranks them ahead of validators with measured latencies.
        let mut validator_with_latencies: Vec<_> = committee
            .names()
            .map(|v| {
                (
                    *v,
                    cached_latencies.get(v).cloned().unwrap_or(Duration::ZERO),
                )
            })
            .collect();
        if validator_with_latencies.is_empty() {
            return vec![];
        }
        // Shuffle before the stable sort so that validators with equal latency
        // end up in a random relative order.
        validator_with_latencies.shuffle(&mut rng);
        validator_with_latencies.sort_by_key(|(_, latency)| *latency);

        // Shuffle the prefix of validators whose latency is within `delta` of the
        // lowest observed latency, spreading requests across comparable validators.
        let lowest_latency = validator_with_latencies[0].1;
        let threshold = lowest_latency.mul_f64(1.0 + delta);
        let k = validator_with_latencies
            .iter()
            .enumerate()
            .find(|(_, (_, latency))| *latency > threshold)
            .map(|(i, _)| i)
            .unwrap_or(validator_with_latencies.len());
        validator_with_latencies[..k].shuffle(&mut rng);
        self.metrics
            .shuffled_validators
            .with_label_values(&[tx_type.as_str()])
            .observe(k as f64);

        validator_with_latencies
            .into_iter()
            .map(|(v, _)| v)
            .collect()
    }

    #[cfg(test)]
    pub fn force_update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
        self.update_cached_latencies(authority_agg);
    }

    #[cfg(test)]
    pub fn get_client_stats_len(&self) -> usize {
        self.client_stats.read().validator_stats.len()
    }

    #[cfg(test)]
    pub fn has_validator_stats(&self, validator: &AuthorityName) -> bool {
        self.client_stats
            .read()
            .validator_stats
            .contains_key(validator)
    }
}