sui_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9    Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10    register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19    CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22    ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23    SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24    ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25    WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32    error::{SuiError, SuiErrorKind, SuiResult},
33    transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
/// Registry-backed metric families shared by every safe client.
///
/// Per-validator handles into these families are resolved by `SafeClientMetrics::new`.
#[derive(Clone)]
pub struct SafeClientMetricsBase {
    /// Requests sent to validators, labelled by `address` and `method`.
    total_requests_by_address_method: IntCounterVec,
    /// Good (OK) responses from validators, labelled by `address` and `method`.
    total_responses_by_address_method: IntCounterVec,
    /// RPC latency in seconds, labelled by `method` only.
    latency: HistogramVec,
}
44
45impl SafeClientMetricsBase {
46    pub fn new(registry: &Registry) -> Self {
47        Self {
48            total_requests_by_address_method: register_int_counter_vec_with_registry!(
49                "safe_client_total_requests_by_address_method",
50                "Total requests to validators group by address and method",
51                &["address", "method"],
52                registry,
53            )
54            .unwrap(),
55            total_responses_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_responses_by_address_method",
57                "Total good (OK) responses from validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            // Address label is removed to reduce high cardinality, can be added back if needed
63            latency: register_histogram_vec_with_registry!(
64                "safe_client_latency",
65                "RPC latency observed by safe client aggregator, group by method",
66                &["method"],
67                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68                registry,
69            )
70            .unwrap(),
71        }
72    }
73}
74
/// Per-validator metric handles resolved from a `SafeClientMetricsBase`.
///
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
/// Counters are bound to one validator address and method; latency histograms are
/// bound to a method only. All handles are resolved once, in `SafeClientMetrics::new`.
#[derive(Clone)]
pub struct SafeClientMetrics {
    /// Requests issued through `handle_transaction_info_request`.
    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Verified (OK) responses to `handle_transaction_info_request`.
    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Requests issued through `handle_object_info_request`.
    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Verified (OK) responses to `handle_object_info_request`.
    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Latency histogram for the `handle_certificate` method label.
    handle_certificate_latency: Histogram,
    /// Latency histogram for the `handle_object_info_request` method label.
    handle_obj_info_latency: Histogram,
    /// Latency histogram for the `handle_transaction_info_request` method label.
    handle_tx_info_latency: Histogram,
}
86
87impl SafeClientMetrics {
88    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89        let validator_address = validator_address.to_string();
90
91        let total_requests_handle_transaction_info_request = metrics_base
92            .total_requests_by_address_method
93            .with_label_values(&[
94                validator_address.as_str(),
95                "handle_transaction_info_request",
96            ]);
97        let total_ok_responses_handle_transaction_info_request = metrics_base
98            .total_responses_by_address_method
99            .with_label_values(&[
100                validator_address.as_str(),
101                "handle_transaction_info_request",
102            ]);
103
104        let total_requests_handle_object_info_request = metrics_base
105            .total_requests_by_address_method
106            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
107        let total_ok_responses_handle_object_info_request = metrics_base
108            .total_responses_by_address_method
109            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
110
111        let handle_certificate_latency = metrics_base
112            .latency
113            .with_label_values(&["handle_certificate"]);
114        let handle_obj_info_latency = metrics_base
115            .latency
116            .with_label_values(&["handle_object_info_request"]);
117        let handle_tx_info_latency = metrics_base
118            .latency
119            .with_label_values(&["handle_transaction_info_request"]);
120
121        Self {
122            total_requests_handle_transaction_info_request,
123            total_ok_responses_handle_transaction_info_request,
124            total_requests_handle_object_info_request,
125            total_ok_responses_handle_object_info_request,
126            handle_certificate_latency,
127            handle_obj_info_latency,
128            handle_tx_info_latency,
129        }
130    }
131
132    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
133        let registry = Registry::new();
134        let metrics_base = SafeClientMetricsBase::new(&registry);
135        Self::new(&metrics_base, validator_address)
136    }
137}
138
139/// See `SafeClientMetrics::new` for description of each metrics.
140/// The metrics are per validator client.
141#[derive(Clone)]
142pub struct SafeClient<C>
143where
144    C: Clone,
145{
146    authority_client: C,
147    committee_store: Arc<CommitteeStore>,
148    address: AuthorityPublicKeyBytes,
149    metrics: SafeClientMetrics,
150}
151
152impl<C: Clone> SafeClient<C> {
153    pub fn new(
154        authority_client: C,
155        committee_store: Arc<CommitteeStore>,
156        address: AuthorityPublicKeyBytes,
157        metrics: SafeClientMetrics,
158    ) -> Self {
159        Self {
160            authority_client,
161            committee_store,
162            address,
163            metrics,
164        }
165    }
166}
167
168impl<C: Clone> SafeClient<C> {
169    pub fn authority_client(&self) -> &C {
170        &self.authority_client
171    }
172
173    #[cfg(test)]
174    pub fn authority_client_mut(&mut self) -> &mut C {
175        &mut self.authority_client
176    }
177
178    fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
179        self.committee_store
180            .get_committee(epoch_id)?
181            .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
182    }
183
184    fn check_signed_effects_plain(
185        &self,
186        digest: &TransactionDigest,
187        signed_effects: SignedTransactionEffects,
188        expected_effects_digest: Option<&TransactionEffectsDigest>,
189    ) -> SuiResult<SignedTransactionEffects> {
190        // Check it has the right signer
191        fp_ensure!(
192            signed_effects.auth_sig().authority == self.address,
193            SuiErrorKind::ByzantineAuthoritySuspicion {
194                authority: self.address,
195                reason: format!(
196                    "Unexpected validator address in the signed effects signature: {:?}",
197                    signed_effects.auth_sig().authority
198                ),
199            }
200            .into()
201        );
202        // Checks it concerns the right tx
203        fp_ensure!(
204            signed_effects.data().transaction_digest() == digest,
205            SuiErrorKind::ByzantineAuthoritySuspicion {
206                authority: self.address,
207                reason: "Unexpected tx digest in the signed effects".to_string()
208            }
209            .into()
210        );
211        // check that the effects digest is correct.
212        if let Some(effects_digest) = expected_effects_digest {
213            fp_ensure!(
214                signed_effects.digest() == effects_digest,
215                SuiErrorKind::ByzantineAuthoritySuspicion {
216                    authority: self.address,
217                    reason: "Effects digest does not match with expected digest".to_string()
218                }
219                .into()
220            );
221        }
222        self.get_committee(&signed_effects.epoch())?;
223        Ok(signed_effects)
224    }
225
226    fn check_transaction_info(
227        &self,
228        digest: &TransactionDigest,
229        transaction: Transaction,
230        status: TransactionStatus,
231    ) -> SuiResult<PlainTransactionInfoResponse> {
232        fp_ensure!(
233            digest == transaction.digest(),
234            SuiErrorKind::ByzantineAuthoritySuspicion {
235                authority: self.address,
236                reason: "Signed transaction digest does not match with expected digest".to_string()
237            }
238            .into()
239        );
240        match status {
241            TransactionStatus::Signed(signed) => {
242                self.get_committee(&signed.epoch)?;
243                Ok(PlainTransactionInfoResponse::Signed(
244                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
245                ))
246            }
247            TransactionStatus::Executed(_cert_opt, effects, events) => {
248                // `cert_opt` is permanently None: validators no longer aggregate or persist
249                // per-transaction quorum signatures.
250                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
251                Ok(PlainTransactionInfoResponse::Executed(
252                    transaction,
253                    signed_effects,
254                    events,
255                ))
256            }
257        }
258    }
259
260    fn check_object_response(
261        &self,
262        request: &ObjectInfoRequest,
263        response: ObjectInfoResponse,
264    ) -> SuiResult<VerifiedObjectInfoResponse> {
265        let ObjectInfoResponse {
266            object,
267            layout: _,
268            lock_for_debugging: _,
269        } = response;
270
271        fp_ensure!(
272            request.object_id == object.id(),
273            SuiErrorKind::ByzantineAuthoritySuspicion {
274                authority: self.address,
275                reason: "Object id mismatch in the response".to_string()
276            }
277            .into()
278        );
279
280        Ok(VerifiedObjectInfoResponse { object })
281    }
282
283    pub fn address(&self) -> &AuthorityPublicKeyBytes {
284        &self.address
285    }
286}
287
288impl<C> SafeClient<C>
289where
290    C: AuthorityAPI + Send + Sync + Clone + 'static,
291{
292    /// Submit a transaction for certification and execution.
293    pub async fn submit_transaction(
294        &self,
295        request: SubmitTxRequest,
296        client_addr: Option<SocketAddr>,
297    ) -> Result<SubmitTxResponse, SuiError> {
298        let _timer = self.metrics.handle_certificate_latency.start_timer();
299        self.authority_client
300            .submit_transaction(request, client_addr)
301            .await
302    }
303
304    /// Wait for effects of a transaction that has been submitted to the network
305    /// through the `submit_transaction` API.
306    pub async fn wait_for_effects(
307        &self,
308        request: WaitForEffectsRequest,
309        client_addr: Option<SocketAddr>,
310    ) -> Result<WaitForEffectsResponse, SuiError> {
311        let _timer = self.metrics.handle_certificate_latency.start_timer();
312        let wait_for_effects_resp = self
313            .authority_client
314            .wait_for_effects(request, client_addr)
315            .await?;
316
317        match &wait_for_effects_resp {
318            WaitForEffectsResponse::Executed {
319                effects_digest: _,
320                details: Some(details),
321            } => {
322                self.verify_executed_data((**details).clone())?;
323            }
324            _ => {
325                // No additional verification needed for other response types
326            }
327        };
328
329        Ok(wait_for_effects_resp)
330    }
331
332    fn verify_events(
333        &self,
334        events: &Option<TransactionEvents>,
335        events_digest: Option<&TransactionEventsDigest>,
336    ) -> SuiResult {
337        match (events, events_digest) {
338            (None, None) | (None, Some(_)) => Ok(()),
339            (Some(events), None) => {
340                if !events.data.is_empty() {
341                    Err(SuiErrorKind::ByzantineAuthoritySuspicion {
342                        authority: self.address,
343                        reason: "Returned events but no event digest present in effects"
344                            .to_string(),
345                    }
346                    .into())
347                } else {
348                    Ok(())
349                }
350            }
351            (Some(events), Some(events_digest)) => {
352                fp_ensure!(
353                    &events.digest() == events_digest,
354                    SuiErrorKind::ByzantineAuthoritySuspicion {
355                        authority: self.address,
356                        reason: "Returned events don't match events digest in effects".to_string(),
357                    }
358                    .into()
359                );
360                Ok(())
361            }
362        }
363    }
364
365    fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
366    where
367        I: IntoIterator<Item = (ObjectID, ObjectRef)>,
368    {
369        if let Some(objects) = objects {
370            let expected: HashMap<_, _> = expected_refs.into_iter().collect();
371
372            for object in objects {
373                let object_ref = object.compute_object_reference();
374                if expected
375                    .get(&object_ref.0)
376                    .is_none_or(|expect| &object_ref != expect)
377                {
378                    return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
379                        authority: self.address,
380                        reason: "Returned object that wasn't present in effects".to_string(),
381                    }
382                    .into());
383                }
384            }
385        }
386        Ok(())
387    }
388
389    fn verify_executed_data(
390        &self,
391        ExecutedData {
392            effects,
393            events,
394            input_objects,
395            output_objects,
396        }: ExecutedData,
397    ) -> SuiResult<()> {
398        // Check Events
399        self.verify_events(&events, effects.events_digest())?;
400
401        // Check Input Objects
402        self.verify_objects(
403            &Some(input_objects).filter(|v| !v.is_empty()),
404            effects
405                .old_object_metadata()
406                .into_iter()
407                .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
408        )?;
409
410        // Check Output Objects
411        self.verify_objects(
412            &Some(output_objects).filter(|v| !v.is_empty()),
413            effects
414                .all_changed_objects()
415                .into_iter()
416                .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
417        )?;
418
419        Ok(())
420    }
421
422    pub async fn handle_object_info_request(
423        &self,
424        request: ObjectInfoRequest,
425    ) -> Result<VerifiedObjectInfoResponse, SuiError> {
426        self.metrics.total_requests_handle_object_info_request.inc();
427
428        let _timer = self.metrics.handle_obj_info_latency.start_timer();
429        let response = self
430            .authority_client
431            .handle_object_info_request(request.clone())
432            .await?;
433        let response = self
434            .check_object_response(&request, response)
435            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
436
437        self.metrics
438            .total_ok_responses_handle_object_info_request
439            .inc();
440        Ok(response)
441    }
442
443    /// Handle Transaction information requests for a given digest.
444    /// Only used for testing.
445    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
446    pub async fn handle_transaction_info_request(
447        &self,
448        request: TransactionInfoRequest,
449    ) -> Result<PlainTransactionInfoResponse, SuiError> {
450        self.metrics
451            .total_requests_handle_transaction_info_request
452            .inc();
453
454        let _timer = self.metrics.handle_tx_info_latency.start_timer();
455
456        let transaction_info = self
457            .authority_client
458            .handle_transaction_info_request(request.clone())
459            .await?;
460
461        let transaction = Transaction::new(transaction_info.transaction);
462        let transaction_info = self.check_transaction_info(
463            &request.transaction_digest,
464            transaction,
465            transaction_info.status,
466        ).tap_err(|err| {
467            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
468        })?;
469        self.metrics
470            .total_ok_responses_handle_transaction_info_request
471            .inc();
472        Ok(transaction_info)
473    }
474
475    fn verify_checkpoint_sequence(
476        &self,
477        expected_seq: Option<CheckpointSequenceNumber>,
478        checkpoint: &Option<CertifiedCheckpointSummary>,
479    ) -> SuiResult {
480        let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
481
482        if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
483            fp_ensure!(
484                e == o,
485                SuiError::from("Expected checkpoint number doesn't match with returned")
486            );
487        }
488        Ok(())
489    }
490
491    fn verify_contents_exist<T, O>(
492        &self,
493        request_content: bool,
494        checkpoint: &Option<T>,
495        contents: &Option<O>,
496    ) -> SuiResult {
497        match (request_content, checkpoint, contents) {
498            // If content is requested, checkpoint is not None, but we are not getting any content,
499            // it's an error.
500            // If content is not requested, or checkpoint is None, yet we are still getting content,
501            // it's an error.
502            (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
503                SuiError::from("Checkpoint contents inconsistent with request"),
504            ),
505            _ => Ok(()),
506        }
507    }
508
509    fn verify_checkpoint_response(
510        &self,
511        request: &CheckpointRequest,
512        response: &CheckpointResponse,
513    ) -> SuiResult {
514        // Verify response data was correct for request
515        let CheckpointResponse {
516            checkpoint,
517            contents,
518        } = &response;
519        // Checks that the sequence number is correct.
520        self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
521        self.verify_contents_exist(request.request_content, checkpoint, contents)?;
522        // Verify signature.
523        match checkpoint {
524            Some(c) => {
525                let epoch_id = c.epoch;
526                c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
527            }
528            None => Ok(()),
529        }
530    }
531
532    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
533    pub async fn handle_checkpoint(
534        &self,
535        request: CheckpointRequest,
536    ) -> Result<CheckpointResponse, SuiError> {
537        let resp = self
538            .authority_client
539            .handle_checkpoint(request.clone())
540            .await?;
541        self.verify_checkpoint_response(&request, &resp)
542            .tap_err(|err| {
543                error!(?err, authority=?self.address, "Client error in handle_checkpoint");
544            })?;
545        Ok(resp)
546    }
547
548    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
549    pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
550        self.authority_client
551            .handle_system_state_object(SystemStateRequest { _unused: false })
552            .await
553    }
554
555    /// Handle validator health check requests (for latency measurement)
556    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
557    pub async fn validator_health(
558        &self,
559        request: ValidatorHealthRequest,
560    ) -> Result<ValidatorHealthResponse, SuiError> {
561        self.authority_client.validator_health(request).await
562    }
563}