1use crate::authority_client::AuthorityAPI;
6use crate::epoch::committee_store::CommitteeStore;
7use prometheus::core::GenericCounter;
8use prometheus::{
9 Histogram, HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry,
10 register_int_counter_vec_with_registry,
11};
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use sui_types::crypto::AuthorityPublicKeyBytes;
16use sui_types::digests::TransactionEventsDigest;
17use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI, TransactionEvents};
18use sui_types::messages_checkpoint::{
19 CertifiedCheckpointSummary, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
20};
21use sui_types::messages_grpc::{
22 ExecutedData, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest, SubmitTxResponse,
23 SystemStateRequest, TransactionInfoRequest, TransactionStatus, ValidatorHealthRequest,
24 ValidatorHealthResponse, VerifiedObjectInfoResponse, WaitForEffectsRequest,
25 WaitForEffectsResponse,
26};
27use sui_types::messages_safe_client::PlainTransactionInfoResponse;
28use sui_types::object::Object;
29use sui_types::sui_system_state::SuiSystemState;
30use sui_types::{base_types::*, committee::*, fp_ensure};
31use sui_types::{
32 error::{SuiError, SuiErrorKind, SuiResult},
33 transaction::*,
34};
35use tap::TapFallible;
36use tracing::{error, instrument};
37
38#[derive(Clone)]
39pub struct SafeClientMetricsBase {
40 total_requests_by_address_method: IntCounterVec,
41 total_responses_by_address_method: IntCounterVec,
42 latency: HistogramVec,
43}
44
45impl SafeClientMetricsBase {
46 pub fn new(registry: &Registry) -> Self {
47 Self {
48 total_requests_by_address_method: register_int_counter_vec_with_registry!(
49 "safe_client_total_requests_by_address_method",
50 "Total requests to validators group by address and method",
51 &["address", "method"],
52 registry,
53 )
54 .unwrap(),
55 total_responses_by_address_method: register_int_counter_vec_with_registry!(
56 "safe_client_total_responses_by_address_method",
57 "Total good (OK) responses from validators group by address and method",
58 &["address", "method"],
59 registry,
60 )
61 .unwrap(),
62 latency: register_histogram_vec_with_registry!(
64 "safe_client_latency",
65 "RPC latency observed by safe client aggregator, group by method",
66 &["method"],
67 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
68 registry,
69 )
70 .unwrap(),
71 }
72 }
73}
74
75#[derive(Clone)]
77pub struct SafeClientMetrics {
78 total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
79 total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
80 total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
81 total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
82 handle_certificate_latency: Histogram,
83 handle_obj_info_latency: Histogram,
84 handle_tx_info_latency: Histogram,
85}
86
87impl SafeClientMetrics {
88 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
89 let validator_address = validator_address.to_string();
90
91 let total_requests_handle_transaction_info_request = metrics_base
92 .total_requests_by_address_method
93 .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
94 let total_ok_responses_handle_transaction_info_request = metrics_base
95 .total_responses_by_address_method
96 .with_label_values(&[&validator_address, "handle_transaction_info_request"]);
97
98 let total_requests_handle_object_info_request = metrics_base
99 .total_requests_by_address_method
100 .with_label_values(&[&validator_address, "handle_object_info_request"]);
101 let total_ok_responses_handle_object_info_request = metrics_base
102 .total_responses_by_address_method
103 .with_label_values(&[&validator_address, "handle_object_info_request"]);
104
105 let handle_certificate_latency = metrics_base
106 .latency
107 .with_label_values(&["handle_certificate"]);
108 let handle_obj_info_latency = metrics_base
109 .latency
110 .with_label_values(&["handle_object_info_request"]);
111 let handle_tx_info_latency = metrics_base
112 .latency
113 .with_label_values(&["handle_transaction_info_request"]);
114
115 Self {
116 total_requests_handle_transaction_info_request,
117 total_ok_responses_handle_transaction_info_request,
118 total_requests_handle_object_info_request,
119 total_ok_responses_handle_object_info_request,
120 handle_certificate_latency,
121 handle_obj_info_latency,
122 handle_tx_info_latency,
123 }
124 }
125
126 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
127 let registry = Registry::new();
128 let metrics_base = SafeClientMetricsBase::new(®istry);
129 Self::new(&metrics_base, validator_address)
130 }
131}
132
133#[derive(Clone)]
136pub struct SafeClient<C>
137where
138 C: Clone,
139{
140 authority_client: C,
141 committee_store: Arc<CommitteeStore>,
142 address: AuthorityPublicKeyBytes,
143 metrics: SafeClientMetrics,
144}
145
146impl<C: Clone> SafeClient<C> {
147 pub fn new(
148 authority_client: C,
149 committee_store: Arc<CommitteeStore>,
150 address: AuthorityPublicKeyBytes,
151 metrics: SafeClientMetrics,
152 ) -> Self {
153 Self {
154 authority_client,
155 committee_store,
156 address,
157 metrics,
158 }
159 }
160}
161
162impl<C: Clone> SafeClient<C> {
163 pub fn authority_client(&self) -> &C {
164 &self.authority_client
165 }
166
167 #[cfg(test)]
168 pub fn authority_client_mut(&mut self) -> &mut C {
169 &mut self.authority_client
170 }
171
172 fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Arc<Committee>> {
173 self.committee_store
174 .get_committee(epoch_id)?
175 .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(*epoch_id).into())
176 }
177
178 fn check_signed_effects_plain(
179 &self,
180 digest: &TransactionDigest,
181 signed_effects: SignedTransactionEffects,
182 expected_effects_digest: Option<&TransactionEffectsDigest>,
183 ) -> SuiResult<SignedTransactionEffects> {
184 fp_ensure!(
186 signed_effects.auth_sig().authority == self.address,
187 SuiErrorKind::ByzantineAuthoritySuspicion {
188 authority: self.address,
189 reason: format!(
190 "Unexpected validator address in the signed effects signature: {:?}",
191 signed_effects.auth_sig().authority
192 ),
193 }
194 .into()
195 );
196 fp_ensure!(
198 signed_effects.data().transaction_digest() == digest,
199 SuiErrorKind::ByzantineAuthoritySuspicion {
200 authority: self.address,
201 reason: "Unexpected tx digest in the signed effects".to_string()
202 }
203 .into()
204 );
205 if let Some(effects_digest) = expected_effects_digest {
207 fp_ensure!(
208 signed_effects.digest() == effects_digest,
209 SuiErrorKind::ByzantineAuthoritySuspicion {
210 authority: self.address,
211 reason: "Effects digest does not match with expected digest".to_string()
212 }
213 .into()
214 );
215 }
216 self.get_committee(&signed_effects.epoch())?;
217 Ok(signed_effects)
218 }
219
220 fn check_transaction_info(
221 &self,
222 digest: &TransactionDigest,
223 transaction: Transaction,
224 status: TransactionStatus,
225 ) -> SuiResult<PlainTransactionInfoResponse> {
226 fp_ensure!(
227 digest == transaction.digest(),
228 SuiErrorKind::ByzantineAuthoritySuspicion {
229 authority: self.address,
230 reason: "Signed transaction digest does not match with expected digest".to_string()
231 }
232 .into()
233 );
234 match status {
235 TransactionStatus::Signed(signed) => {
236 self.get_committee(&signed.epoch)?;
237 Ok(PlainTransactionInfoResponse::Signed(
238 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
239 ))
240 }
241 TransactionStatus::Executed(cert_opt, effects, events) => {
242 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
243 match cert_opt {
244 Some(cert) => {
245 let committee = self.get_committee(&cert.epoch)?;
246 let ct = CertifiedTransaction::new_from_data_and_sig(
247 transaction.into_data(),
248 cert,
249 );
250 ct.verify_committee_sigs_only(&committee).map_err(|e| {
251 SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects {
252 validator_name: self.address,
253 error: e.to_string(),
254 }
255 })?;
256 Ok(PlainTransactionInfoResponse::ExecutedWithCert(
257 ct,
258 signed_effects,
259 events,
260 ))
261 }
262 None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
263 transaction,
264 signed_effects,
265 events,
266 )),
267 }
268 }
269 }
270 }
271
272 fn check_object_response(
273 &self,
274 request: &ObjectInfoRequest,
275 response: ObjectInfoResponse,
276 ) -> SuiResult<VerifiedObjectInfoResponse> {
277 let ObjectInfoResponse {
278 object,
279 layout: _,
280 lock_for_debugging: _,
281 } = response;
282
283 fp_ensure!(
284 request.object_id == object.id(),
285 SuiErrorKind::ByzantineAuthoritySuspicion {
286 authority: self.address,
287 reason: "Object id mismatch in the response".to_string()
288 }
289 .into()
290 );
291
292 Ok(VerifiedObjectInfoResponse { object })
293 }
294
295 pub fn address(&self) -> &AuthorityPublicKeyBytes {
296 &self.address
297 }
298}
299
300impl<C> SafeClient<C>
301where
302 C: AuthorityAPI + Send + Sync + Clone + 'static,
303{
304 pub async fn submit_transaction(
306 &self,
307 request: SubmitTxRequest,
308 client_addr: Option<SocketAddr>,
309 ) -> Result<SubmitTxResponse, SuiError> {
310 let _timer = self.metrics.handle_certificate_latency.start_timer();
311 self.authority_client
312 .submit_transaction(request, client_addr)
313 .await
314 }
315
316 pub async fn wait_for_effects(
319 &self,
320 request: WaitForEffectsRequest,
321 client_addr: Option<SocketAddr>,
322 ) -> Result<WaitForEffectsResponse, SuiError> {
323 let _timer = self.metrics.handle_certificate_latency.start_timer();
324 let wait_for_effects_resp = self
325 .authority_client
326 .wait_for_effects(request, client_addr)
327 .await?;
328
329 match &wait_for_effects_resp {
330 WaitForEffectsResponse::Executed {
331 effects_digest: _,
332 fast_path: _,
333 details: Some(details),
334 } => {
335 self.verify_executed_data((**details).clone())?;
336 }
337 _ => {
338 }
340 };
341
342 Ok(wait_for_effects_resp)
343 }
344
345 fn verify_events(
346 &self,
347 events: &Option<TransactionEvents>,
348 events_digest: Option<&TransactionEventsDigest>,
349 ) -> SuiResult {
350 match (events, events_digest) {
351 (None, None) | (None, Some(_)) => Ok(()),
352 (Some(events), None) => {
353 if !events.data.is_empty() {
354 Err(SuiErrorKind::ByzantineAuthoritySuspicion {
355 authority: self.address,
356 reason: "Returned events but no event digest present in effects"
357 .to_string(),
358 }
359 .into())
360 } else {
361 Ok(())
362 }
363 }
364 (Some(events), Some(events_digest)) => {
365 fp_ensure!(
366 &events.digest() == events_digest,
367 SuiErrorKind::ByzantineAuthoritySuspicion {
368 authority: self.address,
369 reason: "Returned events don't match events digest in effects".to_string(),
370 }
371 .into()
372 );
373 Ok(())
374 }
375 }
376 }
377
378 fn verify_objects<I>(&self, objects: &Option<Vec<Object>>, expected_refs: I) -> SuiResult
379 where
380 I: IntoIterator<Item = (ObjectID, ObjectRef)>,
381 {
382 if let Some(objects) = objects {
383 let expected: HashMap<_, _> = expected_refs.into_iter().collect();
384
385 for object in objects {
386 let object_ref = object.compute_object_reference();
387 if expected
388 .get(&object_ref.0)
389 .is_none_or(|expect| &object_ref != expect)
390 {
391 return Err(SuiErrorKind::ByzantineAuthoritySuspicion {
392 authority: self.address,
393 reason: "Returned object that wasn't present in effects".to_string(),
394 }
395 .into());
396 }
397 }
398 }
399 Ok(())
400 }
401
402 fn verify_executed_data(
403 &self,
404 ExecutedData {
405 effects,
406 events,
407 input_objects,
408 output_objects,
409 }: ExecutedData,
410 ) -> SuiResult<()> {
411 self.verify_events(&events, effects.events_digest())?;
413
414 self.verify_objects(
416 &Some(input_objects).filter(|v| !v.is_empty()),
417 effects
418 .old_object_metadata()
419 .into_iter()
420 .map(|(object_ref, _owner)| (object_ref.0, object_ref)),
421 )?;
422
423 self.verify_objects(
425 &Some(output_objects).filter(|v| !v.is_empty()),
426 effects
427 .all_changed_objects()
428 .into_iter()
429 .map(|(object_ref, _, _)| (object_ref.0, object_ref)),
430 )?;
431
432 Ok(())
433 }
434
435 pub async fn handle_object_info_request(
436 &self,
437 request: ObjectInfoRequest,
438 ) -> Result<VerifiedObjectInfoResponse, SuiError> {
439 self.metrics.total_requests_handle_object_info_request.inc();
440
441 let _timer = self.metrics.handle_obj_info_latency.start_timer();
442 let response = self
443 .authority_client
444 .handle_object_info_request(request.clone())
445 .await?;
446 let response = self
447 .check_object_response(&request, response)
448 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
449
450 self.metrics
451 .total_ok_responses_handle_object_info_request
452 .inc();
453 Ok(response)
454 }
455
456 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
459 pub async fn handle_transaction_info_request(
460 &self,
461 request: TransactionInfoRequest,
462 ) -> Result<PlainTransactionInfoResponse, SuiError> {
463 self.metrics
464 .total_requests_handle_transaction_info_request
465 .inc();
466
467 let _timer = self.metrics.handle_tx_info_latency.start_timer();
468
469 let transaction_info = self
470 .authority_client
471 .handle_transaction_info_request(request.clone())
472 .await?;
473
474 let transaction = Transaction::new(transaction_info.transaction);
475 let transaction_info = self.check_transaction_info(
476 &request.transaction_digest,
477 transaction,
478 transaction_info.status,
479 ).tap_err(|err| {
480 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
481 })?;
482 self.metrics
483 .total_ok_responses_handle_transaction_info_request
484 .inc();
485 Ok(transaction_info)
486 }
487
488 fn verify_checkpoint_sequence(
489 &self,
490 expected_seq: Option<CheckpointSequenceNumber>,
491 checkpoint: &Option<CertifiedCheckpointSummary>,
492 ) -> SuiResult {
493 let observed_seq = checkpoint.as_ref().map(|c| c.sequence_number);
494
495 if let (Some(e), Some(o)) = (expected_seq, observed_seq) {
496 fp_ensure!(
497 e == o,
498 SuiError::from("Expected checkpoint number doesn't match with returned")
499 );
500 }
501 Ok(())
502 }
503
504 fn verify_contents_exist<T, O>(
505 &self,
506 request_content: bool,
507 checkpoint: &Option<T>,
508 contents: &Option<O>,
509 ) -> SuiResult {
510 match (request_content, checkpoint, contents) {
511 (true, Some(_), None) | (false, _, Some(_)) | (_, None, Some(_)) => Err(
516 SuiError::from("Checkpoint contents inconsistent with request"),
517 ),
518 _ => Ok(()),
519 }
520 }
521
522 fn verify_checkpoint_response(
523 &self,
524 request: &CheckpointRequest,
525 response: &CheckpointResponse,
526 ) -> SuiResult {
527 let CheckpointResponse {
529 checkpoint,
530 contents,
531 } = &response;
532 self.verify_checkpoint_sequence(request.sequence_number, checkpoint)?;
534 self.verify_contents_exist(request.request_content, checkpoint, contents)?;
535 match checkpoint {
537 Some(c) => {
538 let epoch_id = c.epoch;
539 c.verify_with_contents(&*self.get_committee(&epoch_id)?, contents.as_ref())
540 }
541 None => Ok(()),
542 }
543 }
544
545 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
546 pub async fn handle_checkpoint(
547 &self,
548 request: CheckpointRequest,
549 ) -> Result<CheckpointResponse, SuiError> {
550 let resp = self
551 .authority_client
552 .handle_checkpoint(request.clone())
553 .await?;
554 self.verify_checkpoint_response(&request, &resp)
555 .tap_err(|err| {
556 error!(?err, authority=?self.address, "Client error in handle_checkpoint");
557 })?;
558 Ok(resp)
559 }
560
561 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
562 pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
563 self.authority_client
564 .handle_system_state_object(SystemStateRequest { _unused: false })
565 .await
566 }
567
568 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
570 pub async fn validator_health(
571 &self,
572 request: ValidatorHealthRequest,
573 ) -> Result<ValidatorHealthResponse, SuiError> {
574 self.authority_client.validator_health(request).await
575 }
576}