sui_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9    Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10    register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19    CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22    ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23    SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24    ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25    WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32    error::{SuiError, SuiErrorKind, SuiResult},
33    transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40    total_requests_by_address_method: IntCounterVec,
41    total_responses_by_address_method: IntCounterVec,
42    latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46    pub fn new(registry: &Registry) -> Self {
47        Self {
48            total_requests_by_address_method: register_int_counter_vec_with_registry!(
49                "safe_client_total_requests_by_address_method",
50                "Total requests to validators group by address and method",
51                &["address", "method"],
52                registry,
53            )
54            .unwrap(),
55            total_responses_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_responses_by_address_method",
57                "Total good (OK) responses from validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            // Address label is removed to reduce high cardinality, can be added back if needed
63            latency: register_histogram_vec_with_registry!(
64                "safe_client_latency",
65                "RPC latency observed by safe client aggregator, group by method",
66                &["method"],
67                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68                registry,
69            )
70            .unwrap(),
71        }
72    }
73}
74
75/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
76#[derive(Clone)]
77pub struct SafeClientMetrics {
78    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82    handle_certificate_latency: Histogram,
83    handle_obj_info_latency: Histogram,
84    handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89        let validator_address = validator_address.to_string();
90
91        let total_requests_handle_transaction_info_request = metrics_base
92            .total_requests_by_address_method
93            .with_label_values(&[
94                validator_address.as_str(),
95                "handle_transaction_info_request",
96            ]);
97        let total_ok_responses_handle_transaction_info_request = metrics_base
98            .total_responses_by_address_method
99            .with_label_values(&[
100                validator_address.as_str(),
101                "handle_transaction_info_request",
102            ]);
103
104        let total_requests_handle_object_info_request = metrics_base
105            .total_requests_by_address_method
106            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
107        let total_ok_responses_handle_object_info_request = metrics_base
108            .total_responses_by_address_method
109            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
110
111        let handle_certificate_latency = metrics_base
112            .latency
113            .with_label_values(&["handle_certificate"]);
114        let handle_obj_info_latency = metrics_base
115            .latency
116            .with_label_values(&["handle_object_info_request"]);
117        let handle_tx_info_latency = metrics_base
118            .latency
119            .with_label_values(&["handle_transaction_info_request"]);
120
121        Self {
122            total_requests_handle_transaction_info_request,
123            total_ok_responses_handle_transaction_info_request,
124            total_requests_handle_object_info_request,
125            total_ok_responses_handle_object_info_request,
126            handle_certificate_latency,
127            handle_obj_info_latency,
128            handle_tx_info_latency,
129        }
130    }
131
132    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
133        let registry = Registry::new();
134        let metrics_base = SafeClientMetricsBase::new(&registry);
135        Self::new(&metrics_base, validator_address)
136    }
137}
138
139/// See `SafeClientMetrics::new` for description of each metrics.
140/// The metrics are per validator client.
141#[derive(Clone)]
142pub struct SafeClient<C>
143where
144    C: Clone,
145{
146    authority_client: C,
147    committee_store: Arc<CommitteeStore>,
148    address: AuthorityPublicKeyBytes,
149    metrics: SafeClientMetrics,
150}
151
152impl<C: Clone> SafeClient<C> {
153    pub fn new(
154        authority_client: C,
155        committee_store: Arc<CommitteeStore>,
156        address: AuthorityPublicKeyBytes,
157        metrics: SafeClientMetrics,
158    ) -> Self {
159        Self {
160            authority_client,
161            committee_store,
162            address,
163            metrics,
164        }
165    }
166}
167
168impl<C: Clone> SafeClient<C> {
169    pub fn authority_client(&self) -> &C {
170        &self.authority_client
171    }
172
173    #[cfg(test)]
174    pub fn authority_client_mut(&mut self) -> &mut C {
175        &mut self.authority_client
176    }
177
178    fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
179        self.committee_store
180            .get_committee(epoch_id)?
181            .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
182    }
183
184    fn check_signed_effects_plain(
185        &self,
186        digest: &TransactionDigest,
187        signed_effects: SignedTransactionEffects,
188        expected_effects_digest: Option<&TransactionEffectsDigest>,
189    ) -> SuiResult<SignedTransactionEffects> {
190        // Check it has the right signer
191        fp_ensure!(
192            signed_effects.auth_sig().authority == self.address,
193            SuiErrorKind::ByzantineAuthoritySuspicion {
194                authority: self.address,
195                reason: format!(
196                    "Unexpected validator address in the signed effects signature: {:?}",
197                    signed_effects.auth_sig().authority
198                ),
199            }
200            .into()
201        );
202        // Checks it concerns the right tx
203        fp_ensure!(
204            signed_effects.data().transaction_digest() == digest,
205            SuiErrorKind::ByzantineAuthoritySuspicion {
206                authority: self.address,
207                reason: "Unexpected tx digest in the signed effects".to_string()
208            }
209            .into()
210        );
211        // check that the effects digest is correct.
212        if let Some(effects_digest) = expected_effects_digest {
213            fp_ensure!(
214                signed_effects.digest() == effects_digest,
215                SuiErrorKind::ByzantineAuthoritySuspicion {
216                    authority: self.address,
217                    reason: "Effects digest does not match with expected digest".to_string()
218                }
219                .into()
220            );
221        }
222        self.get_committee(&signed_effects.epoch())?;
223        Ok(signed_effects)
224    }
225
226    fn check_transaction_info(
227        &self,
228        digest: &TransactionDigest,
229        transaction: Transaction,
230        status: TransactionStatus,
231    ) -> SuiResult<PlainTransactionInfoResponse> {
232        fp_ensure!(
233            digest == transaction.digest(),
234            SuiErrorKind::ByzantineAuthoritySuspicion {
235                authority: self.address,
236                reason: "Signed transaction digest does not match with expected digest".to_string()
237            }
238            .into()
239        );
240        match status {
241            TransactionStatus::Signed(signed) => {
242                self.get_committee(&signed.epoch)?;
243                Ok(PlainTransactionInfoResponse::Signed(
244                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
245                ))
246            }
247            TransactionStatus::Executed(cert_opt, effects, events) => {
248                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
249                match cert_opt {
250                    Some(cert) => {
251                        let committee = self.get_committee(&cert.epoch)?;
252                        let ct = CertifiedTransaction::new_from_data_and_sig(
253                            transaction.into_data(),
254                            cert,
255                        );
256                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
257                            SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
258                                validator_name: self.address,
259                                error: e.to_string(),
260                            }
261                        })?;
262                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
263                            ct,
264                            signed_effects,
265                            events,
266                        ))
267                    }
268                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
269                        transaction,
270                        signed_effects,
271                        events,
272                    )),
273                }
274            }
275        }
276    }
277
278    fn check_object_response(
279        &self,
280        request: &ObjectInfoRequest,
281        response: ObjectInfoResponse,
282    ) -> SuiResult<VerifiedObjectInfoResponse> {
283        let ObjectInfoResponse {
284            object,
285            layout: _,
286            lock_for_debugging: _,
287        } = response;
288
289        fp_ensure!(
290            request.object_id == object.id(),
291            SuiErrorKind::ByzantineAuthoritySuspicion {
292                authority: self.address,
293                reason: "Object id mismatch in the response".to_string()
294            }
295            .into()
296        );
297
298        Ok(VerifiedObjectInfoResponse { object })
299    }
300
301    pub fn address(&self) -> &AuthorityPublicKeyBytes {
302        &self.address
303    }
304}
305
306impl<C> SafeClient<C>
307where
308    C: AuthorityAPI + Send + Sync + Clone + 'static,
309{
310    /// Submit a transaction for certification and execution.
311    pub async fn submit_transaction(
312        &self,
313        request: SubmitTxRequest,
314        client_addr: Option<SocketAddr>,
315    ) -> Result<SubmitTxResponse, SuiError> {
316        let _timer = self.metrics.handle_certificate_latency.start_timer();
317        self.authority_client
318            .submit_transaction(request, client_addr)
319            .await
320    }
321
322    /// Wait for effects of a transaction that has been submitted to the network
323    /// through the `submit_transaction` API.
324    pub async fn wait_for_effects(
325        &self,
326        request: WaitForEffectsRequest,
327        client_addr: Option<SocketAddr>,
328    ) -> Result<WaitForEffectsResponse, SuiError> {
329        let _timer = self.metrics.handle_certificate_latency.start_timer();
330        let wait_for_effects_resp = self
331            .authority_client
332            .wait_for_effects(request, client_addr)
333            .await?;
334
335        match &wait_for_effects_resp {
336            WaitForEffectsResponse::Executed {
337                effects_digest: _,
338                fast_path: _,
339                details: Some(details),
340            } => {
341                self.verify_executed_data((**details).clone())?;
342            }
343            _ => {
344                // No additional verification needed for other response types
345            }
346        };
347
348        Ok(wait_for_effects_resp)
349    }
350
351    fn verify_events(
352        &self,
353        events: &Option<TransactionEvents>,
354        events_digest: Option<&TransactionEventsDigest>,
355    ) -> SuiResult {
356        match (events, events_digest) {
357            (None, None) | (None, Some(_)) => Ok(()),
358            (Some(events), None) => {
359                if !events.data.is_empty() {
360                    Err(SuiErrorKind::ByzantineAuthoritySuspicion {
361                        authority: self.address,
362                        reason: "Returned events but no event digest present in effects"
363                            .to_string(),
364                    }
365                    .into())
366                } else {
367                    Ok(())
368                }
369            }
370            (Some(events), Some(events_digest)) => {
371                fp_ensure!(
372                    &events.digest() == events_digest,
373                    SuiErrorKind::ByzantineAuthoritySuspicion {
374                        authority: self.address,
375                        reason: "Returned events don't match events digest in effects".to_string(),
376                    }
377                    .into()
378                );
379                Ok(())
380            }
381        }
382    }
383
384    fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
385    where
386        I: IntoIterator<Item = (ObjectID, ObjectRef)>,
387    {
388        if let Some(objects) = objects {
389            let expected: HashMap<_, _> = expected_refs.into_iter().collect();
390
391            for object in objects {
392                let object_ref = object.compute_object_reference();
393                if expected
394                    .get(&object_ref.0)
395                    .is_none_or(|expect| &object_ref != expect)
396                {
397                    return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
398                        authority: self.address,
399                        reason: "Returned object that wasn't present in effects".to_string(),
400                    }
401                    .into());
402                }
403            }
404        }
405        Ok(())
406    }
407
408    fn verify_executed_data(
409        &self,
410        ExecutedData {
411            effects,
412            events,
413            input_objects,
414            output_objects,
415        }: ExecutedData,
416    ) -> SuiResult<()> {
417        // Check Events
418        self.verify_events(&events, effects.events_digest())?;
419
420        // Check Input Objects
421        self.verify_objects(
422            &Some(input_objects).filter(|v| !v.is_empty()),
423            effects
424                .old_object_metadata()
425                .into_iter()
426                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
427        )?;
428
429        // Check Output Objects
430        self.verify_objects(
431            &Some(output_objects).filter(|v| !v.is_empty()),
432            effects
433                .all_changed_objects()
434                .into_iter()
435                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
436        )?;
437
438        Ok(())
439    }
440
441    pub async fn handle_object_info_request(
442        &self,
443        request: ObjectInfoRequest,
444    ) -> Result<VerifiedObjectInfoResponse, SuiError> {
445        self.metrics.total_requests_handle_object_info_request.inc();
446
447        let _timer = self.metrics.handle_obj_info_latency.start_timer();
448        let response = self
449            .authority_client
450            .handle_object_info_request(request.clone())
451            .await?;
452        let response = self
453            .check_object_response(&request, response)
454            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
455
456        self.metrics
457            .total_ok_responses_handle_object_info_request
458            .inc();
459        Ok(response)
460    }
461
462    /// Handle Transaction information requests for a given digest.
463    /// Only used for testing.
464    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
465    pub async fn handle_transaction_info_request(
466        &self,
467        request: TransactionInfoRequest,
468    ) -> Result<PlainTransactionInfoResponse, SuiError> {
469        self.metrics
470            .total_requests_handle_transaction_info_request
471            .inc();
472
473        let _timer = self.metrics.handle_tx_info_latency.start_timer();
474
475        let transaction_info = self
476            .authority_client
477            .handle_transaction_info_request(request.clone())
478            .await?;
479
480        let transaction = Transaction::new(transaction_info.transaction);
481        let transaction_info = self.check_transaction_info(
482            &request.transaction_digest,
483            transaction,
484            transaction_info.status,
485        ).tap_err(|err| {
486            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
487        })?;
488        self.metrics
489            .total_ok_responses_handle_transaction_info_request
490            .inc();
491        Ok(transaction_info)
492    }
493
494    fn verify_checkpoint_sequence(
495        &self,
496        expected_seq: Option<CheckpointSequenceNumber>,
497        checkpoint: &Option<CertifiedCheckpointSummary>,
498    ) -> SuiResult {
499        let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
500
501        if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
502            fp_ensure!(
503                e == o,
504                SuiError::from("Expected checkpoint number doesn't match with returned")
505            );
506        }
507        Ok(())
508    }
509
510    fn verify_contents_exist<T, O>(
511        &self,
512        request_content: bool,
513        checkpoint: &Option<T>,
514        contents: &Option<O>,
515    ) -> SuiResult {
516        match (request_content, checkpoint, contents) {
517            // If content is requested, checkpoint is not None, but we are not getting any content,
518            // it's an error.
519            // If content is not requested, or checkpoint is None, yet we are still getting content,
520            // it's an error.
521            (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
522                SuiError::from("Checkpoint contents inconsistent with request"),
523            ),
524            _ => Ok(()),
525        }
526    }
527
528    fn verify_checkpoint_response(
529        &self,
530        request: &CheckpointRequest,
531        response: &CheckpointResponse,
532    ) -> SuiResult {
533        // Verify response data was correct for request
534        let CheckpointResponse {
535            checkpoint,
536            contents,
537        } = &response;
538        // Checks that the sequence number is correct.
539        self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
540        self.verify_contents_exist(request.request_content, checkpoint, contents)?;
541        // Verify signature.
542        match checkpoint {
543            Some(c) => {
544                let epoch_id = c.epoch;
545                c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
546            }
547            None => Ok(()),
548        }
549    }
550
551    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
552    pub async fn handle_checkpoint(
553        &self,
554        request: CheckpointRequest,
555    ) -> Result<CheckpointResponse, SuiError> {
556        let resp = self
557            .authority_client
558            .handle_checkpoint(request.clone())
559            .await?;
560        self.verify_checkpoint_response(&request, &resp)
561            .tap_err(|err| {
562                error!(?err, authority=?self.address, "Client error in handle_checkpoint");
563            })?;
564        Ok(resp)
565    }
566
567    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
568    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
569        self.authority_client
570            .handle_system_state_object(SystemStateRequest { _unused: false })
571            .await
572    }
573
574    /// Handle validator health check requests (for latency measurement)
575    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
576    pub async fn validator_health(
577        &self,
578        request: ValidatorHealthRequest,
579    ) -> Result<ValidatorHealthResponse, SuiError> {
580        self.authority_client.validator_health(request).await
581    }
582}