1use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9 Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10 register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19 CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22 ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23 SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24 ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25 WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32 error::{SuiError, SuiErrorKind, SuiResult},
33 transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40 total_requests_by_address_method: IntCounterVec,
41 total_responses_by_address_method: IntCounterVec,
42 latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46 pub fn new(registry: &Registry) -> Self {
47 Self {
48 total_requests_by_address_method: register_int_counter_vec_with_registry!(
49 "safe_client_total_requests_by_address_method",
50 "Total requests to validators group by address and method",
51 &["address", "method"],
52 registry,
53 )
54 .unwrap(),
55 total_responses_by_address_method: register_int_counter_vec_with_registry!(
56 "safe_client_total_responses_by_address_method",
57 "Total good (OK) responses from validators group by address and method",
58 &["address", "method"],
59 registry,
60 )
61 .unwrap(),
62 latency: register_histogram_vec_with_registry!(
64 "safe_client_latency",
65 "RPC latency observed by safe client aggregator, group by method",
66 &["method"],
67 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68 registry,
69 )
70 .unwrap(),
71 }
72 }
73}
74
75#[derive(Clone)]
77pub struct SafeClientMetrics {
78 total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79 total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80 total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81 total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82 handle_certificate_latency: Histogram,
83 handle_obj_info_latency: Histogram,
84 handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89 let validator_address = validator_address.to_string();
90
91 let total_requests_handle_transaction_info_request = metrics_base
92 .total_requests_by_address_method
93 .with_label_values(&[
94 validator_address.as_str(),
95 "handle_transaction_info_request",
96 ]);
97 let total_ok_responses_handle_transaction_info_request = metrics_base
98 .total_responses_by_address_method
99 .with_label_values(&[
100 validator_address.as_str(),
101 "handle_transaction_info_request",
102 ]);
103
104 let total_requests_handle_object_info_request = metrics_base
105 .total_requests_by_address_method
106 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
107 let total_ok_responses_handle_object_info_request = metrics_base
108 .total_responses_by_address_method
109 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
110
111 let handle_certificate_latency = metrics_base
112 .latency
113 .with_label_values(&["handle_certificate"]);
114 let handle_obj_info_latency = metrics_base
115 .latency
116 .with_label_values(&["handle_object_info_request"]);
117 let handle_tx_info_latency = metrics_base
118 .latency
119 .with_label_values(&["handle_transaction_info_request"]);
120
121 Self {
122 total_requests_handle_transaction_info_request,
123 total_ok_responses_handle_transaction_info_request,
124 total_requests_handle_object_info_request,
125 total_ok_responses_handle_object_info_request,
126 handle_certificate_latency,
127 handle_obj_info_latency,
128 handle_tx_info_latency,
129 }
130 }
131
132 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
133 let registry = Registry::new();
134 let metrics_base = SafeClientMetricsBase::new(®istry);
135 Self::new(&metrics_base, validator_address)
136 }
137}
138
139#[derive(Clone)]
142pub struct SafeClient<C>
143where
144 C: Clone,
145{
146 authority_client: C,
147 committee_store: Arc<CommitteeStore>,
148 address: AuthorityPublicKeyBytes,
149 metrics: SafeClientMetrics,
150}
151
152impl<C: Clone> SafeClient<C> {
153 pub fn new(
154 authority_client: C,
155 committee_store: Arc<CommitteeStore>,
156 address: AuthorityPublicKeyBytes,
157 metrics: SafeClientMetrics,
158 ) -> Self {
159 Self {
160 authority_client,
161 committee_store,
162 address,
163 metrics,
164 }
165 }
166}
167
168impl<C: Clone> SafeClient<C> {
169 pub fn authority_client(&self) -> &C {
170 &self.authority_client
171 }
172
173 #[cfg(test)]
174 pub fn authority_client_mut(&mut self) -> &mut C {
175 &mut self.authority_client
176 }
177
178 fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
179 self.committee_store
180 .get_committee(epoch_id)?
181 .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
182 }
183
184 fn check_signed_effects_plain(
185 &self,
186 digest: &TransactionDigest,
187 signed_effects: SignedTransactionEffects,
188 expected_effects_digest: Option<&TransactionEffectsDigest>,
189 ) -> SuiResult<SignedTransactionEffects> {
190 fp_ensure!(
192 signed_effects.auth_sig().authority == self.address,
193 SuiErrorKind::ByzantineAuthoritySuspicion {
194 authority: self.address,
195 reason: format!(
196 "Unexpected validator address in the signed effects signature: {:?}",
197 signed_effects.auth_sig().authority
198 ),
199 }
200 .into()
201 );
202 fp_ensure!(
204 signed_effects.data().transaction_digest() == digest,
205 SuiErrorKind::ByzantineAuthoritySuspicion {
206 authority: self.address,
207 reason: "Unexpected tx digest in the signed effects".to_string()
208 }
209 .into()
210 );
211 if let Some(effects_digest) = expected_effects_digest {
213 fp_ensure!(
214 signed_effects.digest() == effects_digest,
215 SuiErrorKind::ByzantineAuthoritySuspicion {
216 authority: self.address,
217 reason: "Effects digest does not match with expected digest".to_string()
218 }
219 .into()
220 );
221 }
222 self.get_committee(&signed_effects.epoch())?;
223 Ok(signed_effects)
224 }
225
226 fn check_transaction_info(
227 &self,
228 digest: &TransactionDigest,
229 transaction: Transaction,
230 status: TransactionStatus,
231 ) -> SuiResult<PlainTransactionInfoResponse> {
232 fp_ensure!(
233 digest == transaction.digest(),
234 SuiErrorKind::ByzantineAuthoritySuspicion {
235 authority: self.address,
236 reason: "Signed transaction digest does not match with expected digest".to_string()
237 }
238 .into()
239 );
240 match status {
241 TransactionStatus::Signed(signed) => {
242 self.get_committee(&signed.epoch)?;
243 Ok(PlainTransactionInfoResponse::Signed(
244 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
245 ))
246 }
247 TransactionStatus::Executed(cert_opt, effects, events) => {
248 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
249 match cert_opt {
250 Some(cert) => {
251 let committee = self.get_committee(&cert.epoch)?;
252 let ct = CertifiedTransaction::new_from_data_and_sig(
253 transaction.into_data(),
254 cert,
255 );
256 ct.verify_committee_sigs_only(&committee).map_err(|e| {
257 SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
258 validator_name: self.address,
259 error: e.to_string(),
260 }
261 })?;
262 Ok(PlainTransactionInfoResponse::ExecutedWithCert(
263 ct,
264 signed_effects,
265 events,
266 ))
267 }
268 None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
269 transaction,
270 signed_effects,
271 events,
272 )),
273 }
274 }
275 }
276 }
277
278 fn check_object_response(
279 &self,
280 request: &ObjectInfoRequest,
281 response: ObjectInfoResponse,
282 ) -> SuiResult<VerifiedObjectInfoResponse> {
283 let ObjectInfoResponse {
284 object,
285 layout: _,
286 lock_for_debugging: _,
287 } = response;
288
289 fp_ensure!(
290 request.object_id == object.id(),
291 SuiErrorKind::ByzantineAuthoritySuspicion {
292 authority: self.address,
293 reason: "Object id mismatch in the response".to_string()
294 }
295 .into()
296 );
297
298 Ok(VerifiedObjectInfoResponse { object })
299 }
300
301 pub fn address(&self) -> &AuthorityPublicKeyBytes {
302 &self.address
303 }
304}
305
306impl<C> SafeClient<C>
307where
308 C: AuthorityAPI + Send + Sync + Clone + 'static,
309{
310 pub async fn submit_transaction(
312 &self,
313 request: SubmitTxRequest,
314 client_addr: Option<SocketAddr>,
315 ) -> Result<SubmitTxResponse, SuiError> {
316 let _timer = self.metrics.handle_certificate_latency.start_timer();
317 self.authority_client
318 .submit_transaction(request, client_addr)
319 .await
320 }
321
322 pub async fn wait_for_effects(
325 &self,
326 request: WaitForEffectsRequest,
327 client_addr: Option<SocketAddr>,
328 ) -> Result<WaitForEffectsResponse, SuiError> {
329 let _timer = self.metrics.handle_certificate_latency.start_timer();
330 let wait_for_effects_resp = self
331 .authority_client
332 .wait_for_effects(request, client_addr)
333 .await?;
334
335 match &wait_for_effects_resp {
336 WaitForEffectsResponse::Executed {
337 effects_digest: _,
338 fast_path: _,
339 details: Some(details),
340 } => {
341 self.verify_executed_data((**details).clone())?;
342 }
343 _ => {
344 }
346 };
347
348 Ok(wait_for_effects_resp)
349 }
350
351 fn verify_events(
352 &self,
353 events: &Option<TransactionEvents>,
354 events_digest: Option<&TransactionEventsDigest>,
355 ) -> SuiResult {
356 match (events, events_digest) {
357 (None, None) | (None, Some(_)) => Ok(()),
358 (Some(events), None) => {
359 if !events.data.is_empty() {
360 Err(SuiErrorKind::ByzantineAuthoritySuspicion {
361 authority: self.address,
362 reason: "Returned events but no event digest present in effects"
363 .to_string(),
364 }
365 .into())
366 } else {
367 Ok(())
368 }
369 }
370 (Some(events), Some(events_digest)) => {
371 fp_ensure!(
372 &events.digest() == events_digest,
373 SuiErrorKind::ByzantineAuthoritySuspicion {
374 authority: self.address,
375 reason: "Returned events don't match events digest in effects".to_string(),
376 }
377 .into()
378 );
379 Ok(())
380 }
381 }
382 }
383
384 fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
385 where
386 I: IntoIterator<Item = (ObjectID, ObjectRef)>,
387 {
388 if let Some(objects) = objects {
389 let expected: HashMap<_, _> = expected_refs.into_iter().collect();
390
391 for object in objects {
392 let object_ref = object.compute_object_reference();
393 if expected
394 .get(&object_ref.0)
395 .is_none_or(|expect| &object_ref != expect)
396 {
397 return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
398 authority: self.address,
399 reason: "Returned object that wasn't present in effects".to_string(),
400 }
401 .into());
402 }
403 }
404 }
405 Ok(())
406 }
407
408 fn verify_executed_data(
409 &self,
410 ExecutedData {
411 effects,
412 events,
413 input_objects,
414 output_objects,
415 }: ExecutedData,
416 ) -> SuiResult<()> {
417 self.verify_events(&events, effects.events_digest())?;
419
420 self.verify_objects(
422 &Some(input_objects).filter(|v| !v.is_empty()),
423 effects
424 .old_object_metadata()
425 .into_iter()
426 .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
427 )?;
428
429 self.verify_objects(
431 &Some(output_objects).filter(|v| !v.is_empty()),
432 effects
433 .all_changed_objects()
434 .into_iter()
435 .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
436 )?;
437
438 Ok(())
439 }
440
441 pub async fn handle_object_info_request(
442 &self,
443 request: ObjectInfoRequest,
444 ) -> Result<VerifiedObjectInfoResponse, SuiError> {
445 self.metrics.total_requests_handle_object_info_request.inc();
446
447 let _timer = self.metrics.handle_obj_info_latency.start_timer();
448 let response = self
449 .authority_client
450 .handle_object_info_request(request.clone())
451 .await?;
452 let response = self
453 .check_object_response(&request, response)
454 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
455
456 self.metrics
457 .total_ok_responses_handle_object_info_request
458 .inc();
459 Ok(response)
460 }
461
462 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
465 pub async fn handle_transaction_info_request(
466 &self,
467 request: TransactionInfoRequest,
468 ) -> Result<PlainTransactionInfoResponse, SuiError> {
469 self.metrics
470 .total_requests_handle_transaction_info_request
471 .inc();
472
473 let _timer = self.metrics.handle_tx_info_latency.start_timer();
474
475 let transaction_info = self
476 .authority_client
477 .handle_transaction_info_request(request.clone())
478 .await?;
479
480 let transaction = Transaction::new(transaction_info.transaction);
481 let transaction_info = self.check_transaction_info(
482 &request.transaction_digest,
483 transaction,
484 transaction_info.status,
485 ).tap_err(|err| {
486 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
487 })?;
488 self.metrics
489 .total_ok_responses_handle_transaction_info_request
490 .inc();
491 Ok(transaction_info)
492 }
493
494 fn verify_checkpoint_sequence(
495 &self,
496 expected_seq: Option<CheckpointSequenceNumber>,
497 checkpoint: &Option<CertifiedCheckpointSummary>,
498 ) -> SuiResult {
499 let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
500
501 if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
502 fp_ensure!(
503 e == o,
504 SuiError::from("Expected checkpoint number doesn't match with returned")
505 );
506 }
507 Ok(())
508 }
509
510 fn verify_contents_exist<T, O>(
511 &self,
512 request_content: bool,
513 checkpoint: &Option<T>,
514 contents: &Option<O>,
515 ) -> SuiResult {
516 match (request_content, checkpoint, contents) {
517 (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
522 SuiError::from("Checkpoint contents inconsistent with request"),
523 ),
524 _ => Ok(()),
525 }
526 }
527
528 fn verify_checkpoint_response(
529 &self,
530 request: &CheckpointRequest,
531 response: &CheckpointResponse,
532 ) -> SuiResult {
533 let CheckpointResponse {
535 checkpoint,
536 contents,
537 } = &response;
538 self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
540 self.verify_contents_exist(request.request_content, checkpoint, contents)?;
541 match checkpoint {
543 Some(c) => {
544 let epoch_id = c.epoch;
545 c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
546 }
547 None => Ok(()),
548 }
549 }
550
551 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
552 pub async fn handle_checkpoint(
553 &self,
554 request: CheckpointRequest,
555 ) -> Result<CheckpointResponse, SuiError> {
556 let resp = self
557 .authority_client
558 .handle_checkpoint(request.clone())
559 .await?;
560 self.verify_checkpoint_response(&request, &resp)
561 .tap_err(|err| {
562 error!(?err, authority=?self.address, "Client error in handle_checkpoint");
563 })?;
564 Ok(resp)
565 }
566
567 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
568 pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
569 self.authority_client
570 .handle_system_state_object(SystemStateRequest { _unused: false })
571 .await
572 }
573
574 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
576 pub async fn validator_health(
577 &self,
578 request: ValidatorHealthRequest,
579 ) -> Result<ValidatorHealthResponse, SuiError> {
580 self.authority_client.validator_health(request).await
581 }
582}