sui_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9    Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10    register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19    CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22    ExecutedData, HandleCertificateRequestV3, HandleCertificateResponseV2,
23    HandleCertificateResponseV3, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest,
24    SubmitTxResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
25    ValidatorHealthRequest, ValidatorHealthResponse, VerifiedObjectInfoResponse,
26    WaitForEffectsRequest, WaitForEffectsResponse,
27};
28use sui_types::messages_safe_client::PlainTransactionInfoResponse;
29use sui_types::object::Object;
30use sui_types::sui_system_state::SuiSystemState;
31use sui_types::{base_types::*, committee::*, fp_ensure};
32use sui_types::{
33    error::{SuiError, SuiErrorKind, SuiResult},
34    transaction::*,
35};
36use tap::TapFallible;
37use tracing::{debug, error, instrument};
38
39macro_rules! check_error {
40    ($address:expr, $cond:expr, $msg:expr) => {
41        $cond.tap_err(|err| {
42            if err.individual_error_indicates_epoch_change() {
43                debug!(?err, authority=?$address, "Not a real client error");
44            } else {
45                error!(?err, authority=?$address, $msg);
46            }
47        })
48    }
49}
50
51#[derive(Clone)]
52pub struct SafeClientMetricsBase {
53    total_requests_by_address_method: IntCounterVec,
54    total_responses_by_address_method: IntCounterVec,
55    latency: HistogramVec,
56}
57
58impl SafeClientMetricsBase {
59    pub fn new(registry: &Registry) -> Self {
60        Self {
61            total_requests_by_address_method: register_int_counter_vec_with_registry!(
62                "safe_client_total_requests_by_address_method",
63                "Total requests to validators group by address and method",
64                &["address", "method"],
65                registry,
66            )
67            .unwrap(),
68            total_responses_by_address_method: register_int_counter_vec_with_registry!(
69                "safe_client_total_responses_by_address_method",
70                "Total good (OK) responses from validators group by address and method",
71                &["address", "method"],
72                registry,
73            )
74            .unwrap(),
75            // Address label is removed to reduce high cardinality, can be added back if needed
76            latency: register_histogram_vec_with_registry!(
77                "safe_client_latency",
78                "RPC latency observed by safe client aggregator, group by method",
79                &["method"],
80                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
81                registry,
82            )
83            .unwrap(),
84        }
85    }
86}
87
88/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
89#[derive(Clone)]
90pub struct SafeClientMetrics {
91    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
92    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
93    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
94    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
95    handle_transaction_latency: Histogram,
96    handle_certificate_latency: Histogram,
97    handle_obj_info_latency: Histogram,
98    handle_tx_info_latency: Histogram,
99}
100
101impl SafeClientMetrics {
102    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
103        let validator_address = validator_address.to_string();
104
105        let total_requests_handle_transaction_info_request = metrics_base
106            .total_requests_by_address_method
107            .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
108        let total_ok_responses_handle_transaction_info_request = metrics_base
109            .total_responses_by_address_method
110            .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
111
112        let total_requests_handle_object_info_request = metrics_base
113            .total_requests_by_address_method
114            .with_label_values(&[&validator_address, "handle_object_info_request"]);
115        let total_ok_responses_handle_object_info_request = metrics_base
116            .total_responses_by_address_method
117            .with_label_values(&[&validator_address, "handle_object_info_request"]);
118
119        let handle_transaction_latency = metrics_base
120            .latency
121            .with_label_values(&["handle_transaction"]);
122        let handle_certificate_latency = metrics_base
123            .latency
124            .with_label_values(&["handle_certificate"]);
125        let handle_obj_info_latency = metrics_base
126            .latency
127            .with_label_values(&["handle_object_info_request"]);
128        let handle_tx_info_latency = metrics_base
129            .latency
130            .with_label_values(&["handle_transaction_info_request"]);
131
132        Self {
133            total_requests_handle_transaction_info_request,
134            total_ok_responses_handle_transaction_info_request,
135            total_requests_handle_object_info_request,
136            total_ok_responses_handle_object_info_request,
137            handle_transaction_latency,
138            handle_certificate_latency,
139            handle_obj_info_latency,
140            handle_tx_info_latency,
141        }
142    }
143
144    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
145        let registry = Registry::new();
146        let metrics_base = SafeClientMetricsBase::new(&registry);
147        Self::new(&metrics_base, validator_address)
148    }
149}
150
151/// See `SafeClientMetrics::new` for description of each metrics.
152/// The metrics are per validator client.
153#[derive(Clone)]
154pub struct SafeClient<C>
155where
156    C: Clone,
157{
158    authority_client: C,
159    committee_store: Arc<CommitteeStore>,
160    address: AuthorityPublicKeyBytes,
161    metrics: SafeClientMetrics,
162}
163
164impl<C: Clone> SafeClient<C> {
165    pub fn new(
166        authority_client: C,
167        committee_store: Arc<CommitteeStore>,
168        address: AuthorityPublicKeyBytes,
169        metrics: SafeClientMetrics,
170    ) -> Self {
171        Self {
172            authority_client,
173            committee_store,
174            address,
175            metrics,
176        }
177    }
178}
179
180impl<C: Clone> SafeClient<C> {
181    pub fn authority_client(&self) -> &C {
182        &self.authority_client
183    }
184
185    #[cfg(test)]
186    pub fn authority_client_mut(&mut self) -> &mut C {
187        &mut self.authority_client
188    }
189
190    fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
191        self.committee_store
192            .get_committee(epoch_id)?
193            .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
194    }
195
196    fn check_signed_effects_plain(
197        &self,
198        digest: &TransactionDigest,
199        signed_effects: SignedTransactionEffects,
200        expected_effects_digest: Option<&TransactionEffectsDigest>,
201    ) -> SuiResult<SignedTransactionEffects> {
202        // Check it has the right signer
203        fp_ensure!(
204            signed_effects.auth_sig().authority == self.address,
205            SuiErrorKind::ByzantineAuthoritySuspicion {
206                authority: self.address,
207                reason: format!(
208                    "Unexpected validator address in the signed effects signature: {:?}",
209                    signed_effects.auth_sig().authority
210                ),
211            }
212            .into()
213        );
214        // Checks it concerns the right tx
215        fp_ensure!(
216            signed_effects.data().transaction_digest() == digest,
217            SuiErrorKind::ByzantineAuthoritySuspicion {
218                authority: self.address,
219                reason: "Unexpected tx digest in the signed effects".to_string()
220            }
221            .into()
222        );
223        // check that the effects digest is correct.
224        if let Some(effects_digest) = expected_effects_digest {
225            fp_ensure!(
226                signed_effects.digest() == effects_digest,
227                SuiErrorKind::ByzantineAuthoritySuspicion {
228                    authority: self.address,
229                    reason: "Effects digest does not match with expected digest".to_string()
230                }
231                .into()
232            );
233        }
234        self.get_committee(&signed_effects.epoch())?;
235        Ok(signed_effects)
236    }
237
238    fn check_transaction_info(
239        &self,
240        digest: &TransactionDigest,
241        transaction: Transaction,
242        status: TransactionStatus,
243    ) -> SuiResult<PlainTransactionInfoResponse> {
244        fp_ensure!(
245            digest == transaction.digest(),
246            SuiErrorKind::ByzantineAuthoritySuspicion {
247                authority: self.address,
248                reason: "Signed transaction digest does not match with expected digest".to_string()
249            }
250            .into()
251        );
252        match status {
253            TransactionStatus::Signed(signed) => {
254                self.get_committee(&signed.epoch)?;
255                Ok(PlainTransactionInfoResponse::Signed(
256                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
257                ))
258            }
259            TransactionStatus::Executed(cert_opt, effects, events) => {
260                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
261                match cert_opt {
262                    Some(cert) => {
263                        let committee = self.get_committee(&cert.epoch)?;
264                        let ct = CertifiedTransaction::new_from_data_and_sig(
265                            transaction.into_data(),
266                            cert,
267                        );
268                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
269                            SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
270                                validator_name: self.address,
271                                error: e.to_string(),
272                            }
273                        })?;
274                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
275                            ct,
276                            signed_effects,
277                            events,
278                        ))
279                    }
280                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
281                        transaction,
282                        signed_effects,
283                        events,
284                    )),
285                }
286            }
287        }
288    }
289
290    fn check_object_response(
291        &self,
292        request: &ObjectInfoRequest,
293        response: ObjectInfoResponse,
294    ) -> SuiResult<VerifiedObjectInfoResponse> {
295        let ObjectInfoResponse {
296            object,
297            layout: _,
298            lock_for_debugging: _,
299        } = response;
300
301        fp_ensure!(
302            request.object_id == object.id(),
303            SuiErrorKind::ByzantineAuthoritySuspicion {
304                authority: self.address,
305                reason: "Object id mismatch in the response".to_string()
306            }
307            .into()
308        );
309
310        Ok(VerifiedObjectInfoResponse { object })
311    }
312
313    pub fn address(&self) -> &AuthorityPublicKeyBytes {
314        &self.address
315    }
316}
317
318impl<C> SafeClient<C>
319where
320    C: AuthorityAPI + Send + Sync + Clone + 'static,
321{
322    /// Submit a transaction for certification and execution.
323    pub async fn submit_transaction(
324        &self,
325        request: SubmitTxRequest,
326        client_addr: Option<SocketAddr>,
327    ) -> Result<SubmitTxResponse, SuiError> {
328        let _timer = self.metrics.handle_certificate_latency.start_timer();
329        self.authority_client
330            .submit_transaction(request, client_addr)
331            .await
332    }
333
334    /// Wait for effects of a transaction that has been submitted to the network
335    /// through the `submit_transaction` API.
336    pub async fn wait_for_effects(
337        &self,
338        request: WaitForEffectsRequest,
339        client_addr: Option<SocketAddr>,
340    ) -> Result<WaitForEffectsResponse, SuiError> {
341        let _timer = self.metrics.handle_certificate_latency.start_timer();
342        let wait_for_effects_resp = self
343            .authority_client
344            .wait_for_effects(request, client_addr)
345            .await?;
346
347        match &wait_for_effects_resp {
348            WaitForEffectsResponse::Executed {
349                effects_digest: _,
350                fast_path: _,
351                details: Some(details),
352            } => {
353                self.verify_executed_data((**details).clone())?;
354            }
355            _ => {
356                // No additional verification needed for other response types
357            }
358        };
359
360        Ok(wait_for_effects_resp)
361    }
362
363    /// Initiate a new transfer to a Sui or Primary account.
364    pub async fn handle_transaction(
365        &self,
366        transaction: Transaction,
367        client_addr: Option<SocketAddr>,
368    ) -> Result<PlainTransactionInfoResponse, SuiError> {
369        let _timer = self.metrics.handle_transaction_latency.start_timer();
370        let digest = *transaction.digest();
371        let response = self
372            .authority_client
373            .handle_transaction(transaction.clone(), client_addr)
374            .await?;
375        let response = check_error!(
376            self.address,
377            self.check_transaction_info(&digest, transaction, response.status),
378            "Client error in handle_transaction"
379        )?;
380        Ok(response)
381    }
382
383    fn verify_certificate_response_v2(
384        &self,
385        digest: &TransactionDigest,
386        response: HandleCertificateResponseV2,
387    ) -> SuiResult<HandleCertificateResponseV2> {
388        let signed_effects =
389            self.check_signed_effects_plain(digest, response.signed_effects, None)?;
390
391        Ok(HandleCertificateResponseV2 {
392            signed_effects,
393            events: response.events,
394            fastpath_input_objects: vec![], // unused field
395        })
396    }
397
398    /// Execute a certificate.
399    pub async fn handle_certificate_v2(
400        &self,
401        certificate: CertifiedTransaction,
402        client_addr: Option<SocketAddr>,
403    ) -> Result<HandleCertificateResponseV2, SuiError> {
404        let digest = *certificate.digest();
405        let _timer = self.metrics.handle_certificate_latency.start_timer();
406        let response = self
407            .authority_client
408            .handle_certificate_v2(certificate, client_addr)
409            .await?;
410
411        let verified = check_error!(
412            self.address,
413            self.verify_certificate_response_v2(&digest, response),
414            "Client error in handle_certificate"
415        )?;
416        Ok(verified)
417    }
418
419    fn verify_events(
420        &self,
421        events: &Option<TransactionEvents>,
422        events_digest: Option<&TransactionEventsDigest>,
423    ) -> SuiResult {
424        match (events, events_digest) {
425            (None, None) | (None, Some(_)) => Ok(()),
426            (Some(events), None) => {
427                if !events.data.is_empty() {
428                    Err(SuiErrorKind::ByzantineAuthoritySuspicion {
429                        authority: self.address,
430                        reason: "Returned events but no event digest present in effects"
431                            .to_string(),
432                    }
433                    .into())
434                } else {
435                    Ok(())
436                }
437            }
438            (Some(events), Some(events_digest)) => {
439                fp_ensure!(
440                    &events.digest() == events_digest,
441                    SuiErrorKind::ByzantineAuthoritySuspicion {
442                        authority: self.address,
443                        reason: "Returned events don't match events digest in effects".to_string(),
444                    }
445                    .into()
446                );
447                Ok(())
448            }
449        }
450    }
451
452    fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
453    where
454        I: IntoIterator<Item = (ObjectID, ObjectRef)>,
455    {
456        if let Some(objects) = objects {
457            let expected: HashMap<_, _> = expected_refs.into_iter().collect();
458
459            for object in objects {
460                let object_ref = object.compute_object_reference();
461                if expected
462                    .get(&object_ref.0)
463                    .is_none_or(|expect| &object_ref != expect)
464                {
465                    return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
466                        authority: self.address,
467                        reason: "Returned object that wasn't present in effects".to_string(),
468                    }
469                    .into());
470                }
471            }
472        }
473        Ok(())
474    }
475
476    fn verify_certificate_response_v3(
477        &self,
478        digest: &TransactionDigest,
479        HandleCertificateResponseV3 {
480            effects,
481            events,
482            input_objects,
483            output_objects,
484            auxiliary_data,
485        }: HandleCertificateResponseV3,
486    ) -> SuiResult<HandleCertificateResponseV3> {
487        let effects = self.check_signed_effects_plain(digest, effects, None)?;
488
489        // Check Events
490        self.verify_events(&events, effects.events_digest())?;
491
492        // Check Input Objects
493        self.verify_objects(
494            &input_objects,
495            effects
496                .old_object_metadata()
497                .into_iter()
498                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
499        )?;
500
501        // Check Output Objects
502        self.verify_objects(
503            &output_objects,
504            effects
505                .all_changed_objects()
506                .into_iter()
507                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
508        )?;
509
510        Ok(HandleCertificateResponseV3 {
511            effects,
512            events,
513            input_objects,
514            output_objects,
515            auxiliary_data,
516        })
517    }
518
519    fn verify_executed_data(
520        &self,
521        ExecutedData {
522            effects,
523            events,
524            input_objects,
525            output_objects,
526        }: ExecutedData,
527    ) -> SuiResult<()> {
528        // Check Events
529        self.verify_events(&events, effects.events_digest())?;
530
531        // Check Input Objects
532        self.verify_objects(
533            &Some(input_objects).filter(|v| !v.is_empty()),
534            effects
535                .old_object_metadata()
536                .into_iter()
537                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
538        )?;
539
540        // Check Output Objects
541        self.verify_objects(
542            &Some(output_objects).filter(|v| !v.is_empty()),
543            effects
544                .all_changed_objects()
545                .into_iter()
546                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
547        )?;
548
549        Ok(())
550    }
551
552    /// Execute a certificate.
553    pub async fn handle_certificate_v3(
554        &self,
555        request: HandleCertificateRequestV3,
556        client_addr: Option<SocketAddr>,
557    ) -> Result<HandleCertificateResponseV3, SuiError> {
558        let digest = *request.certificate.digest();
559        let _timer = self.metrics.handle_certificate_latency.start_timer();
560        let response = self
561            .authority_client
562            .handle_certificate_v3(request, client_addr)
563            .await?;
564
565        let verified = check_error!(
566            self.address,
567            self.verify_certificate_response_v3(&digest, response),
568            "Client error in handle_certificate"
569        )?;
570        Ok(verified)
571    }
572
573    pub async fn handle_object_info_request(
574        &self,
575        request: ObjectInfoRequest,
576    ) -> Result<VerifiedObjectInfoResponse, SuiError> {
577        self.metrics.total_requests_handle_object_info_request.inc();
578
579        let _timer = self.metrics.handle_obj_info_latency.start_timer();
580        let response = self
581            .authority_client
582            .handle_object_info_request(request.clone())
583            .await?;
584        let response = self
585            .check_object_response(&request, response)
586            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
587
588        self.metrics
589            .total_ok_responses_handle_object_info_request
590            .inc();
591        Ok(response)
592    }
593
594    /// Handle Transaction information requests for a given digest.
595    /// Only used for testing.
596    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
597    pub async fn handle_transaction_info_request(
598        &self,
599        request: TransactionInfoRequest,
600    ) -> Result<PlainTransactionInfoResponse, SuiError> {
601        self.metrics
602            .total_requests_handle_transaction_info_request
603            .inc();
604
605        let _timer = self.metrics.handle_tx_info_latency.start_timer();
606
607        let transaction_info = self
608            .authority_client
609            .handle_transaction_info_request(request.clone())
610            .await?;
611
612        let transaction = Transaction::new(transaction_info.transaction);
613        let transaction_info = self.check_transaction_info(
614            &request.transaction_digest,
615            transaction,
616            transaction_info.status,
617        ).tap_err(|err| {
618            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
619        })?;
620        self.metrics
621            .total_ok_responses_handle_transaction_info_request
622            .inc();
623        Ok(transaction_info)
624    }
625
626    fn verify_checkpoint_sequence(
627        &self,
628        expected_seq: Option<CheckpointSequenceNumber>,
629        checkpoint: &Option<CertifiedCheckpointSummary>,
630    ) -> SuiResult {
631        let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
632
633        if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
634            fp_ensure!(
635                e == o,
636                SuiError::from("Expected checkpoint number doesn't match with returned")
637            );
638        }
639        Ok(())
640    }
641
642    fn verify_contents_exist<T, O>(
643        &self,
644        request_content: bool,
645        checkpoint: &Option<T>,
646        contents: &Option<O>,
647    ) -> SuiResult {
648        match (request_content, checkpoint, contents) {
649            // If content is requested, checkpoint is not None, but we are not getting any content,
650            // it's an error.
651            // If content is not requested, or checkpoint is None, yet we are still getting content,
652            // it's an error.
653            (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
654                SuiError::from("Checkpoint contents inconsistent with request"),
655            ),
656            _ => Ok(()),
657        }
658    }
659
660    fn verify_checkpoint_response(
661        &self,
662        request: &CheckpointRequest,
663        response: &CheckpointResponse,
664    ) -> SuiResult {
665        // Verify response data was correct for request
666        let CheckpointResponse {
667            checkpoint,
668            contents,
669        } = &response;
670        // Checks that the sequence number is correct.
671        self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
672        self.verify_contents_exist(request.request_content, checkpoint, contents)?;
673        // Verify signature.
674        match checkpoint {
675            Some(c) => {
676                let epoch_id = c.epoch;
677                c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
678            }
679            None => Ok(()),
680        }
681    }
682
683    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
684    pub async fn handle_checkpoint(
685        &self,
686        request: CheckpointRequest,
687    ) -> Result<CheckpointResponse, SuiError> {
688        let resp = self
689            .authority_client
690            .handle_checkpoint(request.clone())
691            .await?;
692        self.verify_checkpoint_response(&request, &resp)
693            .tap_err(|err| {
694                error!(?err, authority=?self.address, "Client error in handle_checkpoint");
695            })?;
696        Ok(resp)
697    }
698
699    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
700    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
701        self.authority_client
702            .handle_system_state_object(SystemStateRequest { _unused: false })
703            .await
704    }
705
706    /// Handle validator health check requests (for latency measurement)
707    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
708    pub async fn validator_health(
709        &self,
710        request: ValidatorHealthRequest,
711    ) -> Result<ValidatorHealthResponse, SuiError> {
712        self.authority_client.validator_health(request).await
713    }
714}