1use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9 Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10 register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19 CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22 ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23 SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24 ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25 WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32 error::{SuiError, SuiErrorKind, SuiResult},
33 transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40 total_requests_by_address_method: IntCounterVec,
41 total_responses_by_address_method: IntCounterVec,
42 latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46 pub fn new(registry: &Registry) -> Self {
47 Self {
48 total_requests_by_address_method: register_int_counter_vec_with_registry!(
49 "safe_client_total_requests_by_address_method",
50 "Total requests to validators group by address and method",
51 &["address", "method"],
52 registry,
53 )
54 .unwrap(),
55 total_responses_by_address_method: register_int_counter_vec_with_registry!(
56 "safe_client_total_responses_by_address_method",
57 "Total good (OK) responses from validators group by address and method",
58 &["address", "method"],
59 registry,
60 )
61 .unwrap(),
62 latency: register_histogram_vec_with_registry!(
64 "safe_client_latency",
65 "RPC latency observed by safe client aggregator, group by method",
66 &["method"],
67 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68 registry,
69 )
70 .unwrap(),
71 }
72 }
73}
74
75#[derive(Clone)]
77pub struct SafeClientMetrics {
78 total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79 total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80 total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81 total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82 handle_certificate_latency: Histogram,
83 handle_obj_info_latency: Histogram,
84 handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89 let validator_address = validator_address.to_string();
90
91 let total_requests_handle_transaction_info_request = metrics_base
92 .total_requests_by_address_method
93 .with_label_values(&[
94 validator_address.as_str(),
95 "handle_transaction_info_request",
96 ]);
97 let total_ok_responses_handle_transaction_info_request = metrics_base
98 .total_responses_by_address_method
99 .with_label_values(&[
100 validator_address.as_str(),
101 "handle_transaction_info_request",
102 ]);
103
104 let total_requests_handle_object_info_request = metrics_base
105 .total_requests_by_address_method
106 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
107 let total_ok_responses_handle_object_info_request = metrics_base
108 .total_responses_by_address_method
109 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
110
111 let handle_certificate_latency = metrics_base
112 .latency
113 .with_label_values(&["handle_certificate"]);
114 let handle_obj_info_latency = metrics_base
115 .latency
116 .with_label_values(&["handle_object_info_request"]);
117 let handle_tx_info_latency = metrics_base
118 .latency
119 .with_label_values(&["handle_transaction_info_request"]);
120
121 Self {
122 total_requests_handle_transaction_info_request,
123 total_ok_responses_handle_transaction_info_request,
124 total_requests_handle_object_info_request,
125 total_ok_responses_handle_object_info_request,
126 handle_certificate_latency,
127 handle_obj_info_latency,
128 handle_tx_info_latency,
129 }
130 }
131
132 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
133 let registry = Registry::new();
134 let metrics_base = SafeClientMetricsBase::new(®istry);
135 Self::new(&metrics_base, validator_address)
136 }
137}
138
139#[derive(Clone)]
142pub struct SafeClient<C>
143where
144 C: Clone,
145{
146 authority_client: C,
147 committee_store: Arc<CommitteeStore>,
148 address: AuthorityPublicKeyBytes,
149 metrics: SafeClientMetrics,
150}
151
152impl<C: Clone> SafeClient<C> {
153 pub fn new(
154 authority_client: C,
155 committee_store: Arc<CommitteeStore>,
156 address: AuthorityPublicKeyBytes,
157 metrics: SafeClientMetrics,
158 ) -> Self {
159 Self {
160 authority_client,
161 committee_store,
162 address,
163 metrics,
164 }
165 }
166}
167
168impl<C: Clone> SafeClient<C> {
169 pub fn authority_client(&self) -> &C {
170 &self.authority_client
171 }
172
/// Returns a mutable reference to the wrapped validator client.
///
/// Only compiled for tests.
#[cfg(test)]
pub fn authority_client_mut(&mut self) -> &mut C {
    let Self { authority_client, .. } = self;
    authority_client
}
177
178 fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
179 self.committee_store
180 .get_committee(epoch_id)?
181 .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
182 }
183
184 fn check_signed_effects_plain(
185 &self,
186 digest: &TransactionDigest,
187 signed_effects: SignedTransactionEffects,
188 expected_effects_digest: Option<&TransactionEffectsDigest>,
189 ) -> SuiResult<SignedTransactionEffects> {
190 fp_ensure!(
192 signed_effects.auth_sig().authority == self.address,
193 SuiErrorKind::ByzantineAuthoritySuspicion {
194 authority: self.address,
195 reason: format!(
196 "Unexpected validator address in the signed effects signature: {:?}",
197 signed_effects.auth_sig().authority
198 ),
199 }
200 .into()
201 );
202 fp_ensure!(
204 signed_effects.data().transaction_digest() == digest,
205 SuiErrorKind::ByzantineAuthoritySuspicion {
206 authority: self.address,
207 reason: "Unexpected tx digest in the signed effects".to_string()
208 }
209 .into()
210 );
211 if let Some(effects_digest) = expected_effects_digest {
213 fp_ensure!(
214 signed_effects.digest() == effects_digest,
215 SuiErrorKind::ByzantineAuthoritySuspicion {
216 authority: self.address,
217 reason: "Effects digest does not match with expected digest".to_string()
218 }
219 .into()
220 );
221 }
222 self.get_committee(&signed_effects.epoch())?;
223 Ok(signed_effects)
224 }
225
226 fn check_transaction_info(
227 &self,
228 digest: &TransactionDigest,
229 transaction: Transaction,
230 status: TransactionStatus,
231 ) -> SuiResult<PlainTransactionInfoResponse> {
232 fp_ensure!(
233 digest == transaction.digest(),
234 SuiErrorKind::ByzantineAuthoritySuspicion {
235 authority: self.address,
236 reason: "Signed transaction digest does not match with expected digest".to_string()
237 }
238 .into()
239 );
240 match status {
241 TransactionStatus::Signed(signed) => {
242 self.get_committee(&signed.epoch)?;
243 Ok(PlainTransactionInfoResponse::Signed(
244 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
245 ))
246 }
247 TransactionStatus::Executed(_cert_opt, effects, events) => {
248 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
251 Ok(PlainTransactionInfoResponse::Executed(
252 transaction,
253 signed_effects,
254 events,
255 ))
256 }
257 }
258 }
259
260 fn check_object_response(
261 &self,
262 request: &ObjectInfoRequest,
263 response: ObjectInfoResponse,
264 ) -> SuiResult<VerifiedObjectInfoResponse> {
265 let ObjectInfoResponse {
266 object,
267 layout: _,
268 lock_for_debugging: _,
269 } = response;
270
271 fp_ensure!(
272 request.object_id == object.id(),
273 SuiErrorKind::ByzantineAuthoritySuspicion {
274 authority: self.address,
275 reason: "Object id mismatch in the response".to_string()
276 }
277 .into()
278 );
279
280 Ok(VerifiedObjectInfoResponse { object })
281 }
282
283 pub fn address(&self) -> &AuthorityPublicKeyBytes {
284 &self.address
285 }
286}
287
288impl<C> SafeClient<C>
289where
290 C: AuthorityAPI + Send + Sync + Clone + 'static,
291{
292 pub async fn submit_transaction(
294 &self,
295 request: SubmitTxRequest,
296 client_addr: Option<SocketAddr>,
297 ) -> Result<SubmitTxResponse, SuiError> {
298 let _timer = self.metrics.handle_certificate_latency.start_timer();
299 self.authority_client
300 .submit_transaction(request, client_addr)
301 .await
302 }
303
304 pub async fn wait_for_effects(
307 &self,
308 request: WaitForEffectsRequest,
309 client_addr: Option<SocketAddr>,
310 ) -> Result<WaitForEffectsResponse, SuiError> {
311 let _timer = self.metrics.handle_certificate_latency.start_timer();
312 let wait_for_effects_resp = self
313 .authority_client
314 .wait_for_effects(request, client_addr)
315 .await?;
316
317 match &wait_for_effects_resp {
318 WaitForEffectsResponse::Executed {
319 effects_digest: _,
320 details: Some(details),
321 } => {
322 self.verify_executed_data((**details).clone())?;
323 }
324 _ => {
325 }
327 };
328
329 Ok(wait_for_effects_resp)
330 }
331
332 fn verify_events(
333 &self,
334 events: &Option<TransactionEvents>,
335 events_digest: Option<&TransactionEventsDigest>,
336 ) -> SuiResult {
337 match (events, events_digest) {
338 (None, None) | (None, Some(_)) => Ok(()),
339 (Some(events), None) => {
340 if !events.data.is_empty() {
341 Err(SuiErrorKind::ByzantineAuthoritySuspicion {
342 authority: self.address,
343 reason: "Returned events but no event digest present in effects"
344 .to_string(),
345 }
346 .into())
347 } else {
348 Ok(())
349 }
350 }
351 (Some(events), Some(events_digest)) => {
352 fp_ensure!(
353 &events.digest() == events_digest,
354 SuiErrorKind::ByzantineAuthoritySuspicion {
355 authority: self.address,
356 reason: "Returned events don't match events digest in effects".to_string(),
357 }
358 .into()
359 );
360 Ok(())
361 }
362 }
363 }
364
365 fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
366 where
367 I: IntoIterator<Item = (ObjectID, ObjectRef)>,
368 {
369 if let Some(objects) = objects {
370 let expected: HashMap<_, _> = expected_refs.into_iter().collect();
371
372 for object in objects {
373 let object_ref = object.compute_object_reference();
374 if expected
375 .get(&object_ref.0)
376 .is_none_or(|expect| &object_ref != expect)
377 {
378 return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
379 authority: self.address,
380 reason: "Returned object that wasn't present in effects".to_string(),
381 }
382 .into());
383 }
384 }
385 }
386 Ok(())
387 }
388
389 fn verify_executed_data(
390 &self,
391 ExecutedData {
392 effects,
393 events,
394 input_objects,
395 output_objects,
396 }: ExecutedData,
397 ) -> SuiResult<()> {
398 self.verify_events(&events, effects.events_digest())?;
400
401 self.verify_objects(
403 &Some(input_objects).filter(|v| !v.is_empty()),
404 effects
405 .old_object_metadata()
406 .into_iter()
407 .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
408 )?;
409
410 self.verify_objects(
412 &Some(output_objects).filter(|v| !v.is_empty()),
413 effects
414 .all_changed_objects()
415 .into_iter()
416 .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
417 )?;
418
419 Ok(())
420 }
421
422 pub async fn handle_object_info_request(
423 &self,
424 request: ObjectInfoRequest,
425 ) -> Result<VerifiedObjectInfoResponse, SuiError> {
426 self.metrics.total_requests_handle_object_info_request.inc();
427
428 let _timer = self.metrics.handle_obj_info_latency.start_timer();
429 let response = self
430 .authority_client
431 .handle_object_info_request(request.clone())
432 .await?;
433 let response = self
434 .check_object_response(&request, response)
435 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
436
437 self.metrics
438 .total_ok_responses_handle_object_info_request
439 .inc();
440 Ok(response)
441 }
442
443 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
446 pub async fn handle_transaction_info_request(
447 &self,
448 request: TransactionInfoRequest,
449 ) -> Result<PlainTransactionInfoResponse, SuiError> {
450 self.metrics
451 .total_requests_handle_transaction_info_request
452 .inc();
453
454 let _timer = self.metrics.handle_tx_info_latency.start_timer();
455
456 let transaction_info = self
457 .authority_client
458 .handle_transaction_info_request(request.clone())
459 .await?;
460
461 let transaction = Transaction::new(transaction_info.transaction);
462 let transaction_info = self.check_transaction_info(
463 &request.transaction_digest,
464 transaction,
465 transaction_info.status,
466 ).tap_err(|err| {
467 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
468 })?;
469 self.metrics
470 .total_ok_responses_handle_transaction_info_request
471 .inc();
472 Ok(transaction_info)
473 }
474
475 fn verify_checkpoint_sequence(
476 &self,
477 expected_seq: Option<CheckpointSequenceNumber>,
478 checkpoint: &Option<CertifiedCheckpointSummary>,
479 ) -> SuiResult {
480 let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
481
482 if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
483 fp_ensure!(
484 e == o,
485 SuiError::from("Expected checkpoint number doesn't match with returned")
486 );
487 }
488 Ok(())
489 }
490
491 fn verify_contents_exist<T, O>(
492 &self,
493 request_content: bool,
494 checkpoint: &Option<T>,
495 contents: &Option<O>,
496 ) -> SuiResult {
497 match (request_content, checkpoint, contents) {
498 (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
503 SuiError::from("Checkpoint contents inconsistent with request"),
504 ),
505 _ => Ok(()),
506 }
507 }
508
509 fn verify_checkpoint_response(
510 &self,
511 request: &CheckpointRequest,
512 response: &CheckpointResponse,
513 ) -> SuiResult {
514 let CheckpointResponse {
516 checkpoint,
517 contents,
518 } = &response;
519 self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
521 self.verify_contents_exist(request.request_content, checkpoint, contents)?;
522 match checkpoint {
524 Some(c) => {
525 let epoch_id = c.epoch;
526 c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
527 }
528 None => Ok(()),
529 }
530 }
531
532 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
533 pub async fn handle_checkpoint(
534 &self,
535 request: CheckpointRequest,
536 ) -> Result<CheckpointResponse, SuiError> {
537 let resp = self
538 .authority_client
539 .handle_checkpoint(request.clone())
540 .await?;
541 self.verify_checkpoint_response(&request, &resp)
542 .tap_err(|err| {
543 error!(?err, authority=?self.address, "Client error in handle_checkpoint");
544 })?;
545 Ok(resp)
546 }
547
548 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
549 pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
550 self.authority_client
551 .handle_system_state_object(SystemStateRequest { _unused: false })
552 .await
553 }
554
555 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
557 pub async fn validator_health(
558 &self,
559 request: ValidatorHealthRequest,
560 ) -> Result<ValidatorHealthResponse, SuiError> {
561 self.authority_client.validator_health(request).await
562 }
563}