sui_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9    Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10    register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19    CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22    ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23    SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24    ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25    WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32    error::{SuiError, SuiErrorKind, SuiResult},
33    transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40    total_requests_by_address_method: IntCounterVec,
41    total_responses_by_address_method: IntCounterVec,
42    latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46    pub fn new(registry: &Registry) -> Self {
47        Self {
48            total_requests_by_address_method: register_int_counter_vec_with_registry!(
49                "safe_client_total_requests_by_address_method",
50                "Total requests to validators group by address and method",
51                &["address", "method"],
52                registry,
53            )
54            .unwrap(),
55            total_responses_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_responses_by_address_method",
57                "Total good (OK) responses from validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            // Address label is removed to reduce high cardinality, can be added back if needed
63            latency: register_histogram_vec_with_registry!(
64                "safe_client_latency",
65                "RPC latency observed by safe client aggregator, group by method",
66                &["method"],
67                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68                registry,
69            )
70            .unwrap(),
71        }
72    }
73}
74
75/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
76#[derive(Clone)]
77pub struct SafeClientMetrics {
78    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82    handle_certificate_latency: Histogram,
83    handle_obj_info_latency: Histogram,
84    handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89        let validator_address = validator_address.to_string();
90
91        let total_requests_handle_transaction_info_request = metrics_base
92            .total_requests_by_address_method
93            .with_label_values(&[
94                validator_address.as_str(),
95                "handle_transaction_info_request",
96            ]);
97        let total_ok_responses_handle_transaction_info_request = metrics_base
98            .total_responses_by_address_method
99            .with_label_values(&[
100                validator_address.as_str(),
101                "handle_transaction_info_request",
102            ]);
103
104        let total_requests_handle_object_info_request = metrics_base
105            .total_requests_by_address_method
106            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
107        let total_ok_responses_handle_object_info_request = metrics_base
108            .total_responses_by_address_method
109            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
110
111        let handle_certificate_latency = metrics_base
112            .latency
113            .with_label_values(&["handle_certificate"]);
114        let handle_obj_info_latency = metrics_base
115            .latency
116            .with_label_values(&["handle_object_info_request"]);
117        let handle_tx_info_latency = metrics_base
118            .latency
119            .with_label_values(&["handle_transaction_info_request"]);
120
121        Self {
122            total_requests_handle_transaction_info_request,
123            total_ok_responses_handle_transaction_info_request,
124            total_requests_handle_object_info_request,
125            total_ok_responses_handle_object_info_request,
126            handle_certificate_latency,
127            handle_obj_info_latency,
128            handle_tx_info_latency,
129        }
130    }
131
132    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
133        let registry = Registry::new();
134        let metrics_base = SafeClientMetricsBase::new(&registry);
135        Self::new(&metrics_base, validator_address)
136    }
137}
138
139/// See `SafeClientMetrics::new` for description of each metrics.
140/// The metrics are per validator client.
141#[derive(Clone)]
142pub struct SafeClient<C>
143where
144    C: Clone,
145{
146    authority_client: C,
147    committee_store: Arc<CommitteeStore>,
148    address: AuthorityPublicKeyBytes,
149    metrics: SafeClientMetrics,
150}
151
152impl<C: Clone> SafeClient<C> {
153    pub fn new(
154        authority_client: C,
155        committee_store: Arc<CommitteeStore>,
156        address: AuthorityPublicKeyBytes,
157        metrics: SafeClientMetrics,
158    ) -> Self {
159        Self {
160            authority_client,
161            committee_store,
162            address,
163            metrics,
164        }
165    }
166}
167
168impl<C: Clone> SafeClient<C> {
169    pub fn authority_client(&self) -> &C {
170        &self.authority_client
171    }
172
173    #[cfg(test)]
174    pub fn authority_client_mut(&mut self) -> &mut C {
175        &mut self.authority_client
176    }
177
178    fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
179        self.committee_store
180            .get_committee(epoch_id)?
181            .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
182    }
183
184    fn check_signed_effects_plain(
185        &self,
186        digest: &TransactionDigest,
187        signed_effects: SignedTransactionEffects,
188        expected_effects_digest: Option<&TransactionEffectsDigest>,
189    ) -> SuiResult<SignedTransactionEffects> {
190        // Check it has the right signer
191        fp_ensure!(
192            signed_effects.auth_sig().authority == self.address,
193            SuiErrorKind::ByzantineAuthoritySuspicion {
194                authority: self.address,
195                reason: format!(
196                    "Unexpected validator address in the signed effects signature: {:?}",
197                    signed_effects.auth_sig().authority
198                ),
199            }
200            .into()
201        );
202        // Checks it concerns the right tx
203        fp_ensure!(
204            signed_effects.data().transaction_digest() == digest,
205            SuiErrorKind::ByzantineAuthoritySuspicion {
206                authority: self.address,
207                reason: "Unexpected tx digest in the signed effects".to_string()
208            }
209            .into()
210        );
211        // check that the effects digest is correct.
212        if let Some(effects_digest) = expected_effects_digest {
213            fp_ensure!(
214                signed_effects.digest() == effects_digest,
215                SuiErrorKind::ByzantineAuthoritySuspicion {
216                    authority: self.address,
217                    reason: "Effects digest does not match with expected digest".to_string()
218                }
219                .into()
220            );
221        }
222        self.get_committee(&signed_effects.epoch())?;
223        Ok(signed_effects)
224    }
225
226    fn check_transaction_info(
227        &self,
228        digest: &TransactionDigest,
229        transaction: Transaction,
230        status: TransactionStatus,
231    ) -> SuiResult<PlainTransactionInfoResponse> {
232        fp_ensure!(
233            digest == transaction.digest(),
234            SuiErrorKind::ByzantineAuthoritySuspicion {
235                authority: self.address,
236                reason: "Signed transaction digest does not match with expected digest".to_string()
237            }
238            .into()
239        );
240        match status {
241            TransactionStatus::Signed(signed) => {
242                self.get_committee(&signed.epoch)?;
243                Ok(PlainTransactionInfoResponse::Signed(
244                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
245                ))
246            }
247            TransactionStatus::Executed(cert_opt, effects, events) => {
248                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
249                match cert_opt {
250                    Some(cert) => {
251                        let committee = self.get_committee(&cert.epoch)?;
252                        let ct = CertifiedTransaction::new_from_data_and_sig(
253                            transaction.into_data(),
254                            cert,
255                        );
256                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
257                            SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
258                                validator_name: self.address,
259                                error: e.to_string(),
260                            }
261                        })?;
262                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
263                            ct,
264                            signed_effects,
265                            events,
266                        ))
267                    }
268                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
269                        transaction,
270                        signed_effects,
271                        events,
272                    )),
273                }
274            }
275        }
276    }
277
278    fn check_object_response(
279        &self,
280        request: &ObjectInfoRequest,
281        response: ObjectInfoResponse,
282    ) -> SuiResult<VerifiedObjectInfoResponse> {
283        let ObjectInfoResponse {
284            object,
285            layout: _,
286            lock_for_debugging: _,
287        } = response;
288
289        fp_ensure!(
290            request.object_id == object.id(),
291            SuiErrorKind::ByzantineAuthoritySuspicion {
292                authority: self.address,
293                reason: "Object id mismatch in the response".to_string()
294            }
295            .into()
296        );
297
298        Ok(VerifiedObjectInfoResponse { object })
299    }
300
301    pub fn address(&self) -> &AuthorityPublicKeyBytes {
302        &self.address
303    }
304}
305
306impl<C> SafeClient<C>
307where
308    C: AuthorityAPI + Send + Sync + Clone + 'static,
309{
310    /// Submit a transaction for certification and execution.
311    pub async fn submit_transaction(
312        &self,
313        request: SubmitTxRequest,
314        client_addr: Option<SocketAddr>,
315    ) -> Result<SubmitTxResponse, SuiError> {
316        let _timer = self.metrics.handle_certificate_latency.start_timer();
317        self.authority_client
318            .submit_transaction(request, client_addr)
319            .await
320    }
321
322    /// Wait for effects of a transaction that has been submitted to the network
323    /// through the `submit_transaction` API.
324    pub async fn wait_for_effects(
325        &self,
326        request: WaitForEffectsRequest,
327        client_addr: Option<SocketAddr>,
328    ) -> Result<WaitForEffectsResponse, SuiError> {
329        let _timer = self.metrics.handle_certificate_latency.start_timer();
330        let wait_for_effects_resp = self
331            .authority_client
332            .wait_for_effects(request, client_addr)
333            .await?;
334
335        match &wait_for_effects_resp {
336            WaitForEffectsResponse::Executed {
337                effects_digest: _,
338                details: Some(details),
339            } => {
340                self.verify_executed_data((**details).clone())?;
341            }
342            _ => {
343                // No additional verification needed for other response types
344            }
345        };
346
347        Ok(wait_for_effects_resp)
348    }
349
350    fn verify_events(
351        &self,
352        events: &Option<TransactionEvents>,
353        events_digest: Option<&TransactionEventsDigest>,
354    ) -> SuiResult {
355        match (events, events_digest) {
356            (None, None) | (None, Some(_)) => Ok(()),
357            (Some(events), None) => {
358                if !events.data.is_empty() {
359                    Err(SuiErrorKind::ByzantineAuthoritySuspicion {
360                        authority: self.address,
361                        reason: "Returned events but no event digest present in effects"
362                            .to_string(),
363                    }
364                    .into())
365                } else {
366                    Ok(())
367                }
368            }
369            (Some(events), Some(events_digest)) => {
370                fp_ensure!(
371                    &events.digest() == events_digest,
372                    SuiErrorKind::ByzantineAuthoritySuspicion {
373                        authority: self.address,
374                        reason: "Returned events don't match events digest in effects".to_string(),
375                    }
376                    .into()
377                );
378                Ok(())
379            }
380        }
381    }
382
383    fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
384    where
385        I: IntoIterator<Item = (ObjectID, ObjectRef)>,
386    {
387        if let Some(objects) = objects {
388            let expected: HashMap<_, _> = expected_refs.into_iter().collect();
389
390            for object in objects {
391                let object_ref = object.compute_object_reference();
392                if expected
393                    .get(&object_ref.0)
394                    .is_none_or(|expect| &object_ref != expect)
395                {
396                    return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
397                        authority: self.address,
398                        reason: "Returned object that wasn't present in effects".to_string(),
399                    }
400                    .into());
401                }
402            }
403        }
404        Ok(())
405    }
406
407    fn verify_executed_data(
408        &self,
409        ExecutedData {
410            effects,
411            events,
412            input_objects,
413            output_objects,
414        }: ExecutedData,
415    ) -> SuiResult<()> {
416        // Check Events
417        self.verify_events(&events, effects.events_digest())?;
418
419        // Check Input Objects
420        self.verify_objects(
421            &Some(input_objects).filter(|v| !v.is_empty()),
422            effects
423                .old_object_metadata()
424                .into_iter()
425                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
426        )?;
427
428        // Check Output Objects
429        self.verify_objects(
430            &Some(output_objects).filter(|v| !v.is_empty()),
431            effects
432                .all_changed_objects()
433                .into_iter()
434                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
435        )?;
436
437        Ok(())
438    }
439
440    pub async fn handle_object_info_request(
441        &self,
442        request: ObjectInfoRequest,
443    ) -> Result<VerifiedObjectInfoResponse, SuiError> {
444        self.metrics.total_requests_handle_object_info_request.inc();
445
446        let _timer = self.metrics.handle_obj_info_latency.start_timer();
447        let response = self
448            .authority_client
449            .handle_object_info_request(request.clone())
450            .await?;
451        let response = self
452            .check_object_response(&request, response)
453            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
454
455        self.metrics
456            .total_ok_responses_handle_object_info_request
457            .inc();
458        Ok(response)
459    }
460
461    /// Handle Transaction information requests for a given digest.
462    /// Only used for testing.
463    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
464    pub async fn handle_transaction_info_request(
465        &self,
466        request: TransactionInfoRequest,
467    ) -> Result<PlainTransactionInfoResponse, SuiError> {
468        self.metrics
469            .total_requests_handle_transaction_info_request
470            .inc();
471
472        let _timer = self.metrics.handle_tx_info_latency.start_timer();
473
474        let transaction_info = self
475            .authority_client
476            .handle_transaction_info_request(request.clone())
477            .await?;
478
479        let transaction = Transaction::new(transaction_info.transaction);
480        let transaction_info = self.check_transaction_info(
481            &request.transaction_digest,
482            transaction,
483            transaction_info.status,
484        ).tap_err(|err| {
485            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
486        })?;
487        self.metrics
488            .total_ok_responses_handle_transaction_info_request
489            .inc();
490        Ok(transaction_info)
491    }
492
493    fn verify_checkpoint_sequence(
494        &self,
495        expected_seq: Option<CheckpointSequenceNumber>,
496        checkpoint: &Option<CertifiedCheckpointSummary>,
497    ) -> SuiResult {
498        let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
499
500        if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
501            fp_ensure!(
502                e == o,
503                SuiError::from("Expected checkpoint number doesn't match with returned")
504            );
505        }
506        Ok(())
507    }
508
509    fn verify_contents_exist<T, O>(
510        &self,
511        request_content: bool,
512        checkpoint: &Option<T>,
513        contents: &Option<O>,
514    ) -> SuiResult {
515        match (request_content, checkpoint, contents) {
516            // If content is requested, checkpoint is not None, but we are not getting any content,
517            // it's an error.
518            // If content is not requested, or checkpoint is None, yet we are still getting content,
519            // it's an error.
520            (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
521                SuiError::from("Checkpoint contents inconsistent with request"),
522            ),
523            _ => Ok(()),
524        }
525    }
526
527    fn verify_checkpoint_response(
528        &self,
529        request: &CheckpointRequest,
530        response: &CheckpointResponse,
531    ) -> SuiResult {
532        // Verify response data was correct for request
533        let CheckpointResponse {
534            checkpoint,
535            contents,
536        } = &response;
537        // Checks that the sequence number is correct.
538        self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
539        self.verify_contents_exist(request.request_content, checkpoint, contents)?;
540        // Verify signature.
541        match checkpoint {
542            Some(c) => {
543                let epoch_id = c.epoch;
544                c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
545            }
546            None => Ok(()),
547        }
548    }
549
550    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
551    pub async fn handle_checkpoint(
552        &self,
553        request: CheckpointRequest,
554    ) -> Result<CheckpointResponse, SuiError> {
555        let resp = self
556            .authority_client
557            .handle_checkpoint(request.clone())
558            .await?;
559        self.verify_checkpoint_response(&request, &resp)
560            .tap_err(|err| {
561                error!(?err, authority=?self.address, "Client error in handle_checkpoint");
562            })?;
563        Ok(resp)
564    }
565
566    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
567    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
568        self.authority_client
569            .handle_system_state_object(SystemStateRequest { _unused: false })
570            .await
571    }
572
573    /// Handle validator health check requests (for latency measurement)
574    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
575    pub async fn validator_health(
576        &self,
577        request: ValidatorHealthRequest,
578    ) -> Result<ValidatorHealthResponse, SuiError> {
579        self.authority_client.validator_health(request).await
580    }
581}