1use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9 Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10 register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19 CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22 ExecutedData, HandleCertificateRequestV3, HandleCertificateResponseV2,
23 HandleCertificateResponseV3, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest,
24 SubmitTxResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
25 ValidatorHealthRequest, ValidatorHealthResponse, VerifiedObjectInfoResponse,
26 WaitForEffectsRequest, WaitForEffectsResponse,
27};
28use sui_types::messages_safe_client::PlainTransactionInfoResponse;
29use sui_types::object::Object;
30use sui_types::sui_system_state::SuiSystemState;
31use sui_types::{base_types::*, committee::*, fp_ensure};
32use sui_types::{
33 error::{SuiError, SuiErrorKind, SuiResult},
34 transaction::*,
35};
36use tap::TapFallible;
37use tracing::{debug, error, instrument};
38
/// Evaluates `$cond` (a `Result`) and, when it is an error, logs that error together with the
/// authority `$address` before yielding the result unchanged.
///
/// Errors for which `individual_error_indicates_epoch_change()` returns true are logged at
/// `debug` level with a fixed message; every other error is logged at `error` level with
/// `$msg`.
macro_rules! check_error {
    ($address:expr, $cond:expr, $msg:expr) => {{
        let outcome = $cond;
        if let Err(err) = &outcome {
            if err.individual_error_indicates_epoch_change() {
                debug!(?err, authority=?$address, "Not a real client error");
            } else {
                error!(?err, authority=?$address, $msg);
            }
        }
        outcome
    }};
}
50
/// Prometheus metric families from which per-validator [`SafeClientMetrics`] are derived.
#[derive(Clone)]
pub struct SafeClientMetricsBase {
    /// Request counter family, labelled by `address` and `method`.
    total_requests_by_address_method: IntCounterVec,
    /// Counter family for good (OK) responses, labelled by `address` and `method`.
    total_responses_by_address_method: IntCounterVec,
    /// RPC latency histogram family, labelled by `method`.
    latency: HistogramVec,
}
57
58impl SafeClientMetricsBase {
59 pub fn new(registry: &Registry) -> Self {
60 Self {
61 total_requests_by_address_method: register_int_counter_vec_with_registry!(
62 "safe_client_total_requests_by_address_method",
63 "Total requests to validators group by address and method",
64 &["address", "method"],
65 registry,
66 )
67 .unwrap(),
68 total_responses_by_address_method: register_int_counter_vec_with_registry!(
69 "safe_client_total_responses_by_address_method",
70 "Total good (OK) responses from validators group by address and method",
71 &["address", "method"],
72 registry,
73 )
74 .unwrap(),
75 latency: register_histogram_vec_with_registry!(
77 "safe_client_latency",
78 "RPC latency observed by safe client aggregator, group by method",
79 &["method"],
80 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
81 registry,
82 )
83 .unwrap(),
84 }
85 }
86}
87
/// Counters and latency histograms used by a `SafeClient` for one validator.
#[derive(Clone)]
pub struct SafeClientMetrics {
    /// Incremented for every `handle_transaction_info_request` call.
    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented when a `handle_transaction_info_request` response passes verification.
    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented for every `handle_object_info_request` call.
    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented when a `handle_object_info_request` response passes verification.
    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Latency histogram with method label `handle_transaction`.
    handle_transaction_latency: Histogram,
    /// Latency histogram with method label `handle_certificate`.
    handle_certificate_latency: Histogram,
    /// Latency histogram with method label `handle_object_info_request`.
    handle_obj_info_latency: Histogram,
    /// Latency histogram with method label `handle_transaction_info_request`.
    handle_tx_info_latency: Histogram,
}
100
101impl SafeClientMetrics {
102 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
103 let validator_address = validator_address.to_string();
104
105 let total_requests_handle_transaction_info_request = metrics_base
106 .total_requests_by_address_method
107 .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
108 let total_ok_responses_handle_transaction_info_request = metrics_base
109 .total_responses_by_address_method
110 .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
111
112 let total_requests_handle_object_info_request = metrics_base
113 .total_requests_by_address_method
114 .with_label_values(&[&validator_address, "handle_object_info_request"]);
115 let total_ok_responses_handle_object_info_request = metrics_base
116 .total_responses_by_address_method
117 .with_label_values(&[&validator_address, "handle_object_info_request"]);
118
119 let handle_transaction_latency = metrics_base
120 .latency
121 .with_label_values(&["handle_transaction"]);
122 let handle_certificate_latency = metrics_base
123 .latency
124 .with_label_values(&["handle_certificate"]);
125 let handle_obj_info_latency = metrics_base
126 .latency
127 .with_label_values(&["handle_object_info_request"]);
128 let handle_tx_info_latency = metrics_base
129 .latency
130 .with_label_values(&["handle_transaction_info_request"]);
131
132 Self {
133 total_requests_handle_transaction_info_request,
134 total_ok_responses_handle_transaction_info_request,
135 total_requests_handle_object_info_request,
136 total_ok_responses_handle_object_info_request,
137 handle_transaction_latency,
138 handle_certificate_latency,
139 handle_obj_info_latency,
140 handle_tx_info_latency,
141 }
142 }
143
144 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
145 let registry = Registry::new();
146 let metrics_base = SafeClientMetricsBase::new(®istry);
147 Self::new(&metrics_base, validator_address)
148 }
149}
150
/// Wrapper around an authority client `C` that checks the responses received from the
/// validator identified by `address` before handing them to the caller.
#[derive(Clone)]
pub struct SafeClient<C>
where
    C: Clone,
{
    /// Underlying client that requests are forwarded to.
    authority_client: C,
    /// Source of the per-epoch committees looked up during verification.
    committee_store: Arc<CommitteeStore>,
    /// Public key of the authority whose responses are being checked.
    address: AuthorityPublicKeyBytes,
    /// Counters and latency histograms updated by the request methods.
    metrics: SafeClientMetrics,
}
163
impl<C: Clone> SafeClient<C> {
    /// Creates a `SafeClient` from its parts; the arguments are stored as given.
    pub fn new(
        authority_client: C,
        committee_store: Arc<CommitteeStore>,
        address: AuthorityPublicKeyBytes,
        metrics: SafeClientMetrics,
    ) -> Self {
        Self {
            authority_client,
            committee_store,
            address,
            metrics,
        }
    }
}
179
180impl<C: Clone> SafeClient<C> {
    /// Returns a shared reference to the wrapped authority client.
    pub fn authority_client(&self) -> &C {
        &self.authority_client
    }
184
    /// Returns a mutable reference to the wrapped authority client (test builds only).
    #[cfg(test)]
    pub fn authority_client_mut(&mut self) -> &mut C {
        &mut self.authority_client
    }
189
190 fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
191 self.committee_store
192 .get_committee(epoch_id)?
193 .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
194 }
195
196 fn check_signed_effects_plain(
197 &self,
198 digest: &TransactionDigest,
199 signed_effects: SignedTransactionEffects,
200 expected_effects_digest: Option<&TransactionEffectsDigest>,
201 ) -> SuiResult<SignedTransactionEffects> {
202 fp_ensure!(
204 signed_effects.auth_sig().authority == self.address,
205 SuiErrorKind::ByzantineAuthoritySuspicion {
206 authority: self.address,
207 reason: format!(
208 "Unexpected validator address in the signed effects signature: {:?}",
209 signed_effects.auth_sig().authority
210 ),
211 }
212 .into()
213 );
214 fp_ensure!(
216 signed_effects.data().transaction_digest() == digest,
217 SuiErrorKind::ByzantineAuthoritySuspicion {
218 authority: self.address,
219 reason: "Unexpected tx digest in the signed effects".to_string()
220 }
221 .into()
222 );
223 if let Some(effects_digest) = expected_effects_digest {
225 fp_ensure!(
226 signed_effects.digest() == effects_digest,
227 SuiErrorKind::ByzantineAuthoritySuspicion {
228 authority: self.address,
229 reason: "Effects digest does not match with expected digest".to_string()
230 }
231 .into()
232 );
233 }
234 self.get_committee(&signed_effects.epoch())?;
235 Ok(signed_effects)
236 }
237
238 fn check_transaction_info(
239 &self,
240 digest: &TransactionDigest,
241 transaction: Transaction,
242 status: TransactionStatus,
243 ) -> SuiResult<PlainTransactionInfoResponse> {
244 fp_ensure!(
245 digest == transaction.digest(),
246 SuiErrorKind::ByzantineAuthoritySuspicion {
247 authority: self.address,
248 reason: "Signed transaction digest does not match with expected digest".to_string()
249 }
250 .into()
251 );
252 match status {
253 TransactionStatus::Signed(signed) => {
254 self.get_committee(&signed.epoch)?;
255 Ok(PlainTransactionInfoResponse::Signed(
256 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
257 ))
258 }
259 TransactionStatus::Executed(cert_opt, effects, events) => {
260 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
261 match cert_opt {
262 Some(cert) => {
263 let committee = self.get_committee(&cert.epoch)?;
264 let ct = CertifiedTransaction::new_from_data_and_sig(
265 transaction.into_data(),
266 cert,
267 );
268 ct.verify_committee_sigs_only(&committee).map_err(|e| {
269 SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
270 validator_name: self.address,
271 error: e.to_string(),
272 }
273 })?;
274 Ok(PlainTransactionInfoResponse::ExecutedWithCert(
275 ct,
276 signed_effects,
277 events,
278 ))
279 }
280 None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
281 transaction,
282 signed_effects,
283 events,
284 )),
285 }
286 }
287 }
288 }
289
290 fn check_object_response(
291 &self,
292 request: &ObjectInfoRequest,
293 response: ObjectInfoResponse,
294 ) -> SuiResult<VerifiedObjectInfoResponse> {
295 let ObjectInfoResponse {
296 object,
297 layout: _,
298 lock_for_debugging: _,
299 } = response;
300
301 fp_ensure!(
302 request.object_id == object.id(),
303 SuiErrorKind::ByzantineAuthoritySuspicion {
304 authority: self.address,
305 reason: "Object id mismatch in the response".to_string()
306 }
307 .into()
308 );
309
310 Ok(VerifiedObjectInfoResponse { object })
311 }
312
    /// Returns the public key of the authority this client talks to.
    pub fn address(&self) -> &AuthorityPublicKeyBytes {
        &self.address
    }
316}
317
318impl<C> SafeClient<C>
319where
320 C: AuthorityAPI + Send + Sync + Clone + 'static,
321{
    /// Forwards `request` and `client_addr` to the wrapped client's `submit_transaction` and
    /// returns its result unchanged; no verification is performed here.
    ///
    /// The call is timed with the `handle_certificate_latency` histogram.
    pub async fn submit_transaction(
        &self,
        request: SubmitTxRequest,
        client_addr: Option<SocketAddr>,
    ) -> Result<SubmitTxResponse, SuiError> {
        // Dropping the timer at the end of the function records the elapsed time.
        let _timer = self.metrics.handle_certificate_latency.start_timer();
        self.authority_client
            .submit_transaction(request, client_addr)
            .await
    }
333
334 pub async fn wait_for_effects(
337 &self,
338 request: WaitForEffectsRequest,
339 client_addr: Option<SocketAddr>,
340 ) -> Result<WaitForEffectsResponse, SuiError> {
341 let _timer = self.metrics.handle_certificate_latency.start_timer();
342 let wait_for_effects_resp = self
343 .authority_client
344 .wait_for_effects(request, client_addr)
345 .await?;
346
347 match &wait_for_effects_resp {
348 WaitForEffectsResponse::Executed {
349 effects_digest: _,
350 fast_path: _,
351 details: Some(details),
352 } => {
353 self.verify_executed_data((**details).clone())?;
354 }
355 _ => {
356 }
358 };
359
360 Ok(wait_for_effects_resp)
361 }
362
363 pub async fn handle_transaction(
365 &self,
366 transaction: Transaction,
367 client_addr: Option<SocketAddr>,
368 ) -> Result<PlainTransactionInfoResponse, SuiError> {
369 let _timer = self.metrics.handle_transaction_latency.start_timer();
370 let digest = *transaction.digest();
371 let response = self
372 .authority_client
373 .handle_transaction(transaction.clone(), client_addr)
374 .await?;
375 let response = check_error!(
376 self.address,
377 self.check_transaction_info(&digest, transaction, response.status),
378 "Client error in handle_transaction"
379 )?;
380 Ok(response)
381 }
382
383 fn verify_certificate_response_v2(
384 &self,
385 digest: &TransactionDigest,
386 response: HandleCertificateResponseV2,
387 ) -> SuiResult<HandleCertificateResponseV2> {
388 let signed_effects =
389 self.check_signed_effects_plain(digest, response.signed_effects, None)?;
390
391 Ok(HandleCertificateResponseV2 {
392 signed_effects,
393 events: response.events,
394 fastpath_input_objects: vec![], })
396 }
397
398 pub async fn handle_certificate_v2(
400 &self,
401 certificate: CertifiedTransaction,
402 client_addr: Option<SocketAddr>,
403 ) -> Result<HandleCertificateResponseV2, SuiError> {
404 let digest = *certificate.digest();
405 let _timer = self.metrics.handle_certificate_latency.start_timer();
406 let response = self
407 .authority_client
408 .handle_certificate_v2(certificate, client_addr)
409 .await?;
410
411 let verified = check_error!(
412 self.address,
413 self.verify_certificate_response_v2(&digest, response),
414 "Client error in handle_certificate"
415 )?;
416 Ok(verified)
417 }
418
419 fn verify_events(
420 &self,
421 events: &Option<TransactionEvents>,
422 events_digest: Option<&TransactionEventsDigest>,
423 ) -> SuiResult {
424 match (events, events_digest) {
425 (None, None) | (None, Some(_)) => Ok(()),
426 (Some(events), None) => {
427 if !events.data.is_empty() {
428 Err(SuiErrorKind::ByzantineAuthoritySuspicion {
429 authority: self.address,
430 reason: "Returned events but no event digest present in effects"
431 .to_string(),
432 }
433 .into())
434 } else {
435 Ok(())
436 }
437 }
438 (Some(events), Some(events_digest)) => {
439 fp_ensure!(
440 &events.digest() == events_digest,
441 SuiErrorKind::ByzantineAuthoritySuspicion {
442 authority: self.address,
443 reason: "Returned events don't match events digest in effects".to_string(),
444 }
445 .into()
446 );
447 Ok(())
448 }
449 }
450 }
451
452 fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
453 where
454 I: IntoIterator<Item = (ObjectID, ObjectRef)>,
455 {
456 if let Some(objects) = objects {
457 let expected: HashMap<_, _> = expected_refs.into_iter().collect();
458
459 for object in objects {
460 let object_ref = object.compute_object_reference();
461 if expected
462 .get(&object_ref.0)
463 .is_none_or(|expect| &object_ref != expect)
464 {
465 return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
466 authority: self.address,
467 reason: "Returned object that wasn't present in effects".to_string(),
468 }
469 .into());
470 }
471 }
472 }
473 Ok(())
474 }
475
476 fn verify_certificate_response_v3(
477 &self,
478 digest: &TransactionDigest,
479 HandleCertificateResponseV3 {
480 effects,
481 events,
482 input_objects,
483 output_objects,
484 auxiliary_data,
485 }: HandleCertificateResponseV3,
486 ) -> SuiResult<HandleCertificateResponseV3> {
487 let effects = self.check_signed_effects_plain(digest, effects, None)?;
488
489 self.verify_events(&events, effects.events_digest())?;
491
492 self.verify_objects(
494 &input_objects,
495 effects
496 .old_object_metadata()
497 .into_iter()
498 .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
499 )?;
500
501 self.verify_objects(
503 &output_objects,
504 effects
505 .all_changed_objects()
506 .into_iter()
507 .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
508 )?;
509
510 Ok(HandleCertificateResponseV3 {
511 effects,
512 events,
513 input_objects,
514 output_objects,
515 auxiliary_data,
516 })
517 }
518
519 fn verify_executed_data(
520 &self,
521 ExecutedData {
522 effects,
523 events,
524 input_objects,
525 output_objects,
526 }: ExecutedData,
527 ) -> SuiResult<()> {
528 self.verify_events(&events, effects.events_digest())?;
530
531 self.verify_objects(
533 &Some(input_objects).filter(|v| !v.is_empty()),
534 effects
535 .old_object_metadata()
536 .into_iter()
537 .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
538 )?;
539
540 self.verify_objects(
542 &Some(output_objects).filter(|v| !v.is_empty()),
543 effects
544 .all_changed_objects()
545 .into_iter()
546 .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
547 )?;
548
549 Ok(())
550 }
551
552 pub async fn handle_certificate_v3(
554 &self,
555 request: HandleCertificateRequestV3,
556 client_addr: Option<SocketAddr>,
557 ) -> Result<HandleCertificateResponseV3, SuiError> {
558 let digest = *request.certificate.digest();
559 let _timer = self.metrics.handle_certificate_latency.start_timer();
560 let response = self
561 .authority_client
562 .handle_certificate_v3(request, client_addr)
563 .await?;
564
565 let verified = check_error!(
566 self.address,
567 self.verify_certificate_response_v3(&digest, response),
568 "Client error in handle_certificate"
569 )?;
570 Ok(verified)
571 }
572
573 pub async fn handle_object_info_request(
574 &self,
575 request: ObjectInfoRequest,
576 ) -> Result<VerifiedObjectInfoResponse, SuiError> {
577 self.metrics.total_requests_handle_object_info_request.inc();
578
579 let _timer = self.metrics.handle_obj_info_latency.start_timer();
580 let response = self
581 .authority_client
582 .handle_object_info_request(request.clone())
583 .await?;
584 let response = self
585 .check_object_response(&request, response)
586 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
587
588 self.metrics
589 .total_ok_responses_handle_object_info_request
590 .inc();
591 Ok(response)
592 }
593
594 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
597 pub async fn handle_transaction_info_request(
598 &self,
599 request: TransactionInfoRequest,
600 ) -> Result<PlainTransactionInfoResponse, SuiError> {
601 self.metrics
602 .total_requests_handle_transaction_info_request
603 .inc();
604
605 let _timer = self.metrics.handle_tx_info_latency.start_timer();
606
607 let transaction_info = self
608 .authority_client
609 .handle_transaction_info_request(request.clone())
610 .await?;
611
612 let transaction = Transaction::new(transaction_info.transaction);
613 let transaction_info = self.check_transaction_info(
614 &request.transaction_digest,
615 transaction,
616 transaction_info.status,
617 ).tap_err(|err| {
618 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
619 })?;
620 self.metrics
621 .total_ok_responses_handle_transaction_info_request
622 .inc();
623 Ok(transaction_info)
624 }
625
626 fn verify_checkpoint_sequence(
627 &self,
628 expected_seq: Option<CheckpointSequenceNumber>,
629 checkpoint: &Option<CertifiedCheckpointSummary>,
630 ) -> SuiResult {
631 let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
632
633 if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
634 fp_ensure!(
635 e == o,
636 SuiError::from("Expected checkpoint number doesn't match with returned")
637 );
638 }
639 Ok(())
640 }
641
642 fn verify_contents_exist<T, O>(
643 &self,
644 request_content: bool,
645 checkpoint: &Option<T>,
646 contents: &Option<O>,
647 ) -> SuiResult {
648 match (request_content, checkpoint, contents) {
649 (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
654 SuiError::from("Checkpoint contents inconsistent with request"),
655 ),
656 _ => Ok(()),
657 }
658 }
659
660 fn verify_checkpoint_response(
661 &self,
662 request: &CheckpointRequest,
663 response: &CheckpointResponse,
664 ) -> SuiResult {
665 let CheckpointResponse {
667 checkpoint,
668 contents,
669 } = &response;
670 self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
672 self.verify_contents_exist(request.request_content, checkpoint, contents)?;
673 match checkpoint {
675 Some(c) => {
676 let epoch_id = c.epoch;
677 c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
678 }
679 None => Ok(()),
680 }
681 }
682
683 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
684 pub async fn handle_checkpoint(
685 &self,
686 request: CheckpointRequest,
687 ) -> Result<CheckpointResponse, SuiError> {
688 let resp = self
689 .authority_client
690 .handle_checkpoint(request.clone())
691 .await?;
692 self.verify_checkpoint_response(&request, &resp)
693 .tap_err(|err| {
694 error!(?err, authority=?self.address, "Client error in handle_checkpoint");
695 })?;
696 Ok(resp)
697 }
698
    /// Forwards a system-state request to the wrapped client and returns its result
    /// unchanged; no verification is performed here.
    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
        self.authority_client
            .handle_system_state_object(SystemStateRequest { _unused: false })
            .await
    }
705
    /// Forwards `request` to the wrapped client's `validator_health` and returns its result
    /// unchanged; no verification is performed here.
    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
    pub async fn validator_health(
        &self,
        request: ValidatorHealthRequest,
    ) -> Result<ValidatorHealthResponse, SuiError> {
        self.authority_client.validator_health(request).await
    }
714}