sui_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9    Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10    register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19    CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22    ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23    SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24    ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25    WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32    error::{SuiError, SuiErrorKind, SuiResult},
33    transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40    total_requests_by_address_method: IntCounterVec,
41    total_responses_by_address_method: IntCounterVec,
42    latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46    pub fn new(registry: &Registry) -> Self {
47        Self {
48            total_requests_by_address_method: register_int_counter_vec_with_registry!(
49                "safe_client_total_requests_by_address_method",
50                "Total requests to validators group by address and method",
51                &["address", "method"],
52                registry,
53            )
54            .unwrap(),
55            total_responses_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_responses_by_address_method",
57                "Total good (OK) responses from validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            // Address label is removed to reduce high cardinality, can be added back if needed
63            latency: register_histogram_vec_with_registry!(
64                "safe_client_latency",
65                "RPC latency observed by safe client aggregator, group by method",
66                &["method"],
67                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68                registry,
69            )
70            .unwrap(),
71        }
72    }
73}
74
75/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
76#[derive(Clone)]
77pub struct SafeClientMetrics {
78    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82    handle_certificate_latency: Histogram,
83    handle_obj_info_latency: Histogram,
84    handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89        let validator_address = validator_address.to_string();
90
91        let total_requests_handle_transaction_info_request = metrics_base
92            .total_requests_by_address_method
93            .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
94        let total_ok_responses_handle_transaction_info_request = metrics_base
95            .total_responses_by_address_method
96            .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
97
98        let total_requests_handle_object_info_request = metrics_base
99            .total_requests_by_address_method
100            .with_label_values(&[&validator_address, "handle_object_info_request"]);
101        let total_ok_responses_handle_object_info_request = metrics_base
102            .total_responses_by_address_method
103            .with_label_values(&[&validator_address, "handle_object_info_request"]);
104
105        let handle_certificate_latency = metrics_base
106            .latency
107            .with_label_values(&["handle_certificate"]);
108        let handle_obj_info_latency = metrics_base
109            .latency
110            .with_label_values(&["handle_object_info_request"]);
111        let handle_tx_info_latency = metrics_base
112            .latency
113            .with_label_values(&["handle_transaction_info_request"]);
114
115        Self {
116            total_requests_handle_transaction_info_request,
117            total_ok_responses_handle_transaction_info_request,
118            total_requests_handle_object_info_request,
119            total_ok_responses_handle_object_info_request,
120            handle_certificate_latency,
121            handle_obj_info_latency,
122            handle_tx_info_latency,
123        }
124    }
125
126    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
127        let registry = Registry::new();
128        let metrics_base = SafeClientMetricsBase::new(&registry);
129        Self::new(&metrics_base, validator_address)
130    }
131}
132
133/// See `SafeClientMetrics::new` for description of each metrics.
134/// The metrics are per validator client.
135#[derive(Clone)]
136pub struct SafeClient<C>
137where
138    C: Clone,
139{
140    authority_client: C,
141    committee_store: Arc<CommitteeStore>,
142    address: AuthorityPublicKeyBytes,
143    metrics: SafeClientMetrics,
144}
145
146impl<C: Clone> SafeClient<C> {
147    pub fn new(
148        authority_client: C,
149        committee_store: Arc<CommitteeStore>,
150        address: AuthorityPublicKeyBytes,
151        metrics: SafeClientMetrics,
152    ) -> Self {
153        Self {
154            authority_client,
155            committee_store,
156            address,
157            metrics,
158        }
159    }
160}
161
162impl<C: Clone> SafeClient<C> {
163    pub fn authority_client(&self) -> &C {
164        &self.authority_client
165    }
166
167    #[cfg(test)]
168    pub fn authority_client_mut(&mut self) -> &mut C {
169        &mut self.authority_client
170    }
171
172    fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
173        self.committee_store
174            .get_committee(epoch_id)?
175            .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
176    }
177
178    fn check_signed_effects_plain(
179        &self,
180        digest: &TransactionDigest,
181        signed_effects: SignedTransactionEffects,
182        expected_effects_digest: Option<&TransactionEffectsDigest>,
183    ) -> SuiResult<SignedTransactionEffects> {
184        // Check it has the right signer
185        fp_ensure!(
186            signed_effects.auth_sig().authority == self.address,
187            SuiErrorKind::ByzantineAuthoritySuspicion {
188                authority: self.address,
189                reason: format!(
190                    "Unexpected validator address in the signed effects signature: {:?}",
191                    signed_effects.auth_sig().authority
192                ),
193            }
194            .into()
195        );
196        // Checks it concerns the right tx
197        fp_ensure!(
198            signed_effects.data().transaction_digest() == digest,
199            SuiErrorKind::ByzantineAuthoritySuspicion {
200                authority: self.address,
201                reason: "Unexpected tx digest in the signed effects".to_string()
202            }
203            .into()
204        );
205        // check that the effects digest is correct.
206        if let Some(effects_digest) = expected_effects_digest {
207            fp_ensure!(
208                signed_effects.digest() == effects_digest,
209                SuiErrorKind::ByzantineAuthoritySuspicion {
210                    authority: self.address,
211                    reason: "Effects digest does not match with expected digest".to_string()
212                }
213                .into()
214            );
215        }
216        self.get_committee(&signed_effects.epoch())?;
217        Ok(signed_effects)
218    }
219
220    fn check_transaction_info(
221        &self,
222        digest: &TransactionDigest,
223        transaction: Transaction,
224        status: TransactionStatus,
225    ) -> SuiResult<PlainTransactionInfoResponse> {
226        fp_ensure!(
227            digest == transaction.digest(),
228            SuiErrorKind::ByzantineAuthoritySuspicion {
229                authority: self.address,
230                reason: "Signed transaction digest does not match with expected digest".to_string()
231            }
232            .into()
233        );
234        match status {
235            TransactionStatus::Signed(signed) => {
236                self.get_committee(&signed.epoch)?;
237                Ok(PlainTransactionInfoResponse::Signed(
238                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
239                ))
240            }
241            TransactionStatus::Executed(cert_opt, effects, events) => {
242                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
243                match cert_opt {
244                    Some(cert) => {
245                        let committee = self.get_committee(&cert.epoch)?;
246                        let ct = CertifiedTransaction::new_from_data_and_sig(
247                            transaction.into_data(),
248                            cert,
249                        );
250                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
251                            SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
252                                validator_name: self.address,
253                                error: e.to_string(),
254                            }
255                        })?;
256                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
257                            ct,
258                            signed_effects,
259                            events,
260                        ))
261                    }
262                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
263                        transaction,
264                        signed_effects,
265                        events,
266                    )),
267                }
268            }
269        }
270    }
271
272    fn check_object_response(
273        &self,
274        request: &ObjectInfoRequest,
275        response: ObjectInfoResponse,
276    ) -> SuiResult<VerifiedObjectInfoResponse> {
277        let ObjectInfoResponse {
278            object,
279            layout: _,
280            lock_for_debugging: _,
281        } = response;
282
283        fp_ensure!(
284            request.object_id == object.id(),
285            SuiErrorKind::ByzantineAuthoritySuspicion {
286                authority: self.address,
287                reason: "Object id mismatch in the response".to_string()
288            }
289            .into()
290        );
291
292        Ok(VerifiedObjectInfoResponse { object })
293    }
294
295    pub fn address(&self) -> &AuthorityPublicKeyBytes {
296        &self.address
297    }
298}
299
300impl<C> SafeClient<C>
301where
302    C: AuthorityAPI + Send + Sync + Clone + 'static,
303{
304    /// Submit a transaction for certification and execution.
305    pub async fn submit_transaction(
306        &self,
307        request: SubmitTxRequest,
308        client_addr: Option<SocketAddr>,
309    ) -> Result<SubmitTxResponse, SuiError> {
310        let _timer = self.metrics.handle_certificate_latency.start_timer();
311        self.authority_client
312            .submit_transaction(request, client_addr)
313            .await
314    }
315
316    /// Wait for effects of a transaction that has been submitted to the network
317    /// through the `submit_transaction` API.
318    pub async fn wait_for_effects(
319        &self,
320        request: WaitForEffectsRequest,
321        client_addr: Option<SocketAddr>,
322    ) -> Result<WaitForEffectsResponse, SuiError> {
323        let _timer = self.metrics.handle_certificate_latency.start_timer();
324        let wait_for_effects_resp = self
325            .authority_client
326            .wait_for_effects(request, client_addr)
327            .await?;
328
329        match &wait_for_effects_resp {
330            WaitForEffectsResponse::Executed {
331                effects_digest: _,
332                fast_path: _,
333                details: Some(details),
334            } => {
335                self.verify_executed_data((**details).clone())?;
336            }
337            _ => {
338                // No additional verification needed for other response types
339            }
340        };
341
342        Ok(wait_for_effects_resp)
343    }
344
345    fn verify_events(
346        &self,
347        events: &Option<TransactionEvents>,
348        events_digest: Option<&TransactionEventsDigest>,
349    ) -> SuiResult {
350        match (events, events_digest) {
351            (None, None) | (None, Some(_)) => Ok(()),
352            (Some(events), None) => {
353                if !events.data.is_empty() {
354                    Err(SuiErrorKind::ByzantineAuthoritySuspicion {
355                        authority: self.address,
356                        reason: "Returned events but no event digest present in effects"
357                            .to_string(),
358                    }
359                    .into())
360                } else {
361                    Ok(())
362                }
363            }
364            (Some(events), Some(events_digest)) => {
365                fp_ensure!(
366                    &events.digest() == events_digest,
367                    SuiErrorKind::ByzantineAuthoritySuspicion {
368                        authority: self.address,
369                        reason: "Returned events don't match events digest in effects".to_string(),
370                    }
371                    .into()
372                );
373                Ok(())
374            }
375        }
376    }
377
378    fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
379    where
380        I: IntoIterator<Item = (ObjectID, ObjectRef)>,
381    {
382        if let Some(objects) = objects {
383            let expected: HashMap<_, _> = expected_refs.into_iter().collect();
384
385            for object in objects {
386                let object_ref = object.compute_object_reference();
387                if expected
388                    .get(&object_ref.0)
389                    .is_none_or(|expect| &object_ref != expect)
390                {
391                    return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
392                        authority: self.address,
393                        reason: "Returned object that wasn't present in effects".to_string(),
394                    }
395                    .into());
396                }
397            }
398        }
399        Ok(())
400    }
401
402    fn verify_executed_data(
403        &self,
404        ExecutedData {
405            effects,
406            events,
407            input_objects,
408            output_objects,
409        }: ExecutedData,
410    ) -> SuiResult<()> {
411        // Check Events
412        self.verify_events(&events, effects.events_digest())?;
413
414        // Check Input Objects
415        self.verify_objects(
416            &Some(input_objects).filter(|v| !v.is_empty()),
417            effects
418                .old_object_metadata()
419                .into_iter()
420                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
421        )?;
422
423        // Check Output Objects
424        self.verify_objects(
425            &Some(output_objects).filter(|v| !v.is_empty()),
426            effects
427                .all_changed_objects()
428                .into_iter()
429                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
430        )?;
431
432        Ok(())
433    }
434
435    pub async fn handle_object_info_request(
436        &self,
437        request: ObjectInfoRequest,
438    ) -> Result<VerifiedObjectInfoResponse, SuiError> {
439        self.metrics.total_requests_handle_object_info_request.inc();
440
441        let _timer = self.metrics.handle_obj_info_latency.start_timer();
442        let response = self
443            .authority_client
444            .handle_object_info_request(request.clone())
445            .await?;
446        let response = self
447            .check_object_response(&request, response)
448            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
449
450        self.metrics
451            .total_ok_responses_handle_object_info_request
452            .inc();
453        Ok(response)
454    }
455
456    /// Handle Transaction information requests for a given digest.
457    /// Only used for testing.
458    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
459    pub async fn handle_transaction_info_request(
460        &self,
461        request: TransactionInfoRequest,
462    ) -> Result<PlainTransactionInfoResponse, SuiError> {
463        self.metrics
464            .total_requests_handle_transaction_info_request
465            .inc();
466
467        let _timer = self.metrics.handle_tx_info_latency.start_timer();
468
469        let transaction_info = self
470            .authority_client
471            .handle_transaction_info_request(request.clone())
472            .await?;
473
474        let transaction = Transaction::new(transaction_info.transaction);
475        let transaction_info = self.check_transaction_info(
476            &request.transaction_digest,
477            transaction,
478            transaction_info.status,
479        ).tap_err(|err| {
480            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
481        })?;
482        self.metrics
483            .total_ok_responses_handle_transaction_info_request
484            .inc();
485        Ok(transaction_info)
486    }
487
488    fn verify_checkpoint_sequence(
489        &self,
490        expected_seq: Option<CheckpointSequenceNumber>,
491        checkpoint: &Option<CertifiedCheckpointSummary>,
492    ) -> SuiResult {
493        let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
494
495        if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
496            fp_ensure!(
497                e == o,
498                SuiError::from("Expected checkpoint number doesn't match with returned")
499            );
500        }
501        Ok(())
502    }
503
504    fn verify_contents_exist<T, O>(
505        &self,
506        request_content: bool,
507        checkpoint: &Option<T>,
508        contents: &Option<O>,
509    ) -> SuiResult {
510        match (request_content, checkpoint, contents) {
511            // If content is requested, checkpoint is not None, but we are not getting any content,
512            // it's an error.
513            // If content is not requested, or checkpoint is None, yet we are still getting content,
514            // it's an error.
515            (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
516                SuiError::from("Checkpoint contents inconsistent with request"),
517            ),
518            _ => Ok(()),
519        }
520    }
521
522    fn verify_checkpoint_response(
523        &self,
524        request: &CheckpointRequest,
525        response: &CheckpointResponse,
526    ) -> SuiResult {
527        // Verify response data was correct for request
528        let CheckpointResponse {
529            checkpoint,
530            contents,
531        } = &response;
532        // Checks that the sequence number is correct.
533        self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
534        self.verify_contents_exist(request.request_content, checkpoint, contents)?;
535        // Verify signature.
536        match checkpoint {
537            Some(c) => {
538                let epoch_id = c.epoch;
539                c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
540            }
541            None => Ok(()),
542        }
543    }
544
545    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
546    pub async fn handle_checkpoint(
547        &self,
548        request: CheckpointRequest,
549    ) -> Result<CheckpointResponse, SuiError> {
550        let resp = self
551            .authority_client
552            .handle_checkpoint(request.clone())
553            .await?;
554        self.verify_checkpoint_response(&request, &resp)
555            .tap_err(|err| {
556                error!(?err, authority=?self.address, "Client error in handle_checkpoint");
557            })?;
558        Ok(resp)
559    }
560
561    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
562    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
563        self.authority_client
564            .handle_system_state_object(SystemStateRequest { _unused: false })
565            .await
566    }
567
568    /// Handle validator health check requests (for latency measurement)
569    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
570    pub async fn validator_health(
571        &self,
572        request: ValidatorHealthRequest,
573    ) -> Result<ValidatorHealthResponse, SuiError> {
574        self.authority_client.validator_health(request).await
575    }
576}