sui_core/validator_client_monitor/
monitor.rs1use crate::authority_aggregator::AuthorityAggregator;
5use crate::authority_client::AuthorityAPI;
6use crate::validator_client_monitor::stats::ClientObservedStats;
7use crate::validator_client_monitor::{
8 OperationFeedback, OperationType, metrics::ValidatorClientMetrics,
9};
10use arc_swap::ArcSwap;
11use parking_lot::RwLock;
12use rand::seq::SliceRandom;
13use std::collections::HashMap;
14use std::{
15 sync::Arc,
16 time::{Duration, Instant},
17};
18use sui_config::validator_client_monitor_config::ValidatorClientMonitorConfig;
19use sui_types::committee::Committee;
20use sui_types::{base_types::AuthorityName, messages_grpc::ValidatorHealthRequest};
21use tokio::{
22 task::JoinSet,
23 time::{interval, timeout},
24};
25use tracing::{debug, info, warn};
26
27pub struct ValidatorClientMonitor<A: Clone> {
39 config: ValidatorClientMonitorConfig,
40 metrics: Arc<ValidatorClientMetrics>,
41 client_stats: RwLock<ClientObservedStats>,
42 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
43 cached_latencies: RwLock<HashMap<AuthorityName, Duration>>,
44}
45
46impl<A> ValidatorClientMonitor<A>
47where
48 A: AuthorityAPI + Send + Sync + 'static + Clone,
49{
50 pub fn new(
51 config: ValidatorClientMonitorConfig,
52 metrics: Arc<ValidatorClientMetrics>,
53 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
54 ) -> Arc<Self> {
55 info!(
56 "Validator client monitor starting with config: {:?}",
57 config
58 );
59
60 let monitor = Arc::new(Self {
61 config: config.clone(),
62 metrics,
63 client_stats: RwLock::new(ClientObservedStats::new(config)),
64 authority_aggregator,
65 cached_latencies: RwLock::new(HashMap::new()),
66 });
67
68 let monitor_clone = monitor.clone();
69 tokio::spawn(async move {
70 monitor_clone.run_health_checks().await;
71 });
72
73 monitor
74 }
75
76 #[cfg(test)]
77 pub fn new_for_test(authority_aggregator: Arc<AuthorityAggregator<A>>) -> Arc<Self> {
78 use prometheus::Registry;
79
80 Self::new(
81 ValidatorClientMonitorConfig::default(),
82 Arc::new(ValidatorClientMetrics::new(&Registry::default())),
83 Arc::new(ArcSwap::new(authority_aggregator)),
84 )
85 }
86
87 async fn run_health_checks(self: Arc<Self>) {
93 let mut interval = interval(self.config.health_check_interval);
94
95 loop {
96 interval.tick().await;
97
98 let authority_agg = self.authority_aggregator.load();
99
100 let current_validators: Vec<_> = authority_agg.committee.names().cloned().collect();
101 self.client_stats
102 .write()
103 .retain_validators(¤t_validators);
104
105 let mut tasks = JoinSet::new();
106
107 for (name, safe_client) in authority_agg.authority_clients.iter() {
108 let name = *name;
109 let display_name = authority_agg.get_display_name(&name);
110 let client = safe_client.clone();
111 let timeout_duration = self.config.health_check_timeout;
112 let monitor = self.clone();
113
114 tasks.spawn(async move {
115 let start = Instant::now();
116 match timeout(
117 timeout_duration,
118 client.validator_health(ValidatorHealthRequest {}),
119 )
120 .await
121 {
122 Ok(Ok(_response)) => {
124 let latency = start.elapsed();
125 monitor.record_interaction_result(OperationFeedback {
126 authority_name: name,
127 display_name: display_name.clone(),
128 operation: OperationType::HealthCheck,
129 ping_type: None,
130 result: Ok(latency),
131 });
132 }
133 Ok(Err(_)) => {
134 let _latency = start.elapsed();
135 monitor.record_interaction_result(OperationFeedback {
136 authority_name: name,
137 display_name: display_name.clone(),
138 operation: OperationType::HealthCheck,
139 ping_type: None,
140 result: Err(()),
141 });
142 }
143 Err(_) => {
144 monitor.record_interaction_result(OperationFeedback {
145 authority_name: name,
146 display_name,
147 operation: OperationType::HealthCheck,
148 ping_type: None,
149 result: Err(()),
150 });
151 }
152 }
153 });
154 }
155
156 while let Some(result) = tasks.join_next().await {
157 if let Err(e) = result {
158 warn!("Health check task failed: {}", e);
159 }
160 }
161
162 self.update_cached_latencies(&authority_agg);
163 }
164 }
165}
166
167impl<A: Clone> ValidatorClientMonitor<A> {
168 fn update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
174 let committee = &authority_agg.committee;
175 let mut cached_latencies = self.cached_latencies.write();
176
177 let latencies_map = self.client_stats.read().get_all_validator_stats(committee);
178
179 for (validator, latency) in latencies_map.iter() {
180 debug!("Validator {}, latency {}", validator, latency.as_secs_f64());
181 let display_name = authority_agg.get_display_name(validator);
182 self.metrics
183 .performance
184 .with_label_values(&[display_name.as_str()])
185 .set(latency.as_secs_f64());
186 cached_latencies.insert(*validator, *latency);
187 }
188 }
189
190 pub fn record_interaction_result(&self, feedback: OperationFeedback) {
198 let operation_str = match feedback.operation {
199 OperationType::Submit => "submit",
200 OperationType::Effects => "effects",
201 OperationType::HealthCheck => "health_check",
202 OperationType::FastPath => "fast_path",
203 OperationType::Consensus => "consensus",
204 };
205 let ping_label = if feedback.ping_type.is_some() {
206 "true"
207 } else {
208 "false"
209 };
210
211 match feedback.result {
212 Ok(latency) => {
213 self.metrics
214 .observed_latency
215 .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
216 .observe(latency.as_secs_f64());
217 self.metrics
218 .operation_success
219 .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
220 .inc();
221 }
222 Err(()) => {
223 self.metrics
224 .operation_failure
225 .with_label_values(&[feedback.display_name.as_str(), operation_str, ping_label])
226 .inc();
227 }
228 }
229
230 let mut client_stats = self.client_stats.write();
231 client_stats.record_interaction_result(feedback);
232 }
233
234 pub fn select_shuffled_preferred_validators(
248 &self,
249 committee: &Committee,
250 delta: f64,
251 ) -> Vec<AuthorityName> {
252 let mut rng = rand::thread_rng();
253
254 let cached_latencies = self.cached_latencies.read();
255 if cached_latencies.is_empty() {
256 let mut validators: Vec<_> = committee.names().cloned().collect();
257 validators.shuffle(&mut rng);
258 return validators;
259 };
260
261 let mut validator_with_latencies: Vec<_> = committee
264 .names()
265 .map(|v| {
266 (
267 *v,
268 cached_latencies.get(v).cloned().unwrap_or(Duration::ZERO),
269 )
270 })
271 .collect();
272 if validator_with_latencies.is_empty() {
273 return vec![];
274 }
275 validator_with_latencies.shuffle(&mut rng);
277 validator_with_latencies.sort_by_key(|(_, latency)| *latency);
279
280 let lowest_latency = validator_with_latencies[0].1;
282 let threshold = lowest_latency.mul_f64(1.0 + delta);
283 let k = validator_with_latencies
284 .iter()
285 .enumerate()
286 .find(|(_, (_, latency))| *latency > threshold)
287 .map(|(i, _)| i)
288 .unwrap_or(validator_with_latencies.len());
289 validator_with_latencies[..k].shuffle(&mut rng);
290 self.metrics.shuffled_validators.observe(k as f64);
291
292 validator_with_latencies
293 .into_iter()
294 .map(|(v, _)| v)
295 .collect()
296 }
297
298 #[cfg(test)]
299 pub fn force_update_cached_latencies(&self, authority_agg: &AuthorityAggregator<A>) {
300 self.update_cached_latencies(authority_agg);
301 }
302
303 #[cfg(test)]
304 pub fn get_client_stats_len(&self) -> usize {
305 self.client_stats.read().validator_stats.len()
306 }
307
308 #[cfg(test)]
309 pub fn has_validator_stats(&self, validator: &AuthorityName) -> bool {
310 self.client_stats
311 .read()
312 .validator_stats
313 .contains_key(validator)
314 }
315}