1use std::net::IpAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11use consensus_core::BlockStatus;
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::future::{self, Either, select};
15use futures::stream::FuturesUnordered;
16use mysten_common::debug_fatal;
17use mysten_metrics::{
18 GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
19};
20use parking_lot::RwLockReadGuard;
21use prometheus::Histogram;
22use prometheus::HistogramVec;
23use prometheus::IntCounter;
24use prometheus::IntCounterVec;
25use prometheus::IntGauge;
26use prometheus::IntGaugeVec;
27use prometheus::Registry;
28use prometheus::{
29 register_histogram_vec_with_registry, register_histogram_with_registry,
30 register_int_counter_vec_with_registry, register_int_counter_with_registry,
31 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
32};
33use sui_types::base_types::AuthorityName;
34use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
35use sui_types::fp_ensure;
36use sui_types::messages_consensus::ConsensusPosition;
37use sui_types::messages_consensus::ConsensusTransactionKind;
38use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
39use sui_types::transaction::TransactionDataAPI;
40use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::Duration;
43use tokio::time::{self};
44use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::checkpoints::CheckpointStore;
48use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
49use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
50
51#[cfg(test)]
52#[path = "unit_tests/consensus_tests.rs"]
53pub mod consensus_tests;
54
55pub struct ConsensusAdapterMetrics {
56 pub sequencing_certificate_attempt: IntCounterVec,
58 pub sequencing_certificate_success: IntCounterVec,
59 pub sequencing_certificate_failures: IntCounterVec,
60 pub sequencing_certificate_status: IntCounterVec,
61 pub sequencing_certificate_inflight: IntGaugeVec,
62 pub sequencing_acknowledge_latency: HistogramVec,
63 pub sequencing_certificate_latency: HistogramVec,
64 pub sequencing_certificate_processed: IntCounterVec,
65 pub sequencing_in_flight_semaphore_wait: IntGauge,
66 pub sequencing_in_flight_submissions: IntGauge,
67 pub sequencing_best_effort_timeout: IntCounterVec,
68 pub consensus_latency: Histogram,
69 pub num_rejected_cert_in_epoch_boundary: IntCounter,
70}
71
72impl ConsensusAdapterMetrics {
73 pub fn new(registry: &Registry) -> Self {
74 Self {
75 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
76 "sequencing_certificate_attempt",
77 "Counts the number of certificates the validator attempts to sequence.",
78 &["tx_type"],
79 registry,
80 )
81 .unwrap(),
82 sequencing_certificate_success: register_int_counter_vec_with_registry!(
83 "sequencing_certificate_success",
84 "Counts the number of successfully sequenced certificates.",
85 &["tx_type"],
86 registry,
87 )
88 .unwrap(),
89 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_failures",
91 "Counts the number of sequenced certificates that failed other than by timeout.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_status: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_status",
98 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
99 &["tx_type", "status"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
104 "sequencing_certificate_inflight",
105 "The inflight requests to sequence certificates.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
111 "sequencing_acknowledge_latency",
112 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
113 &["retry", "tx_type"],
114 LATENCY_SEC_BUCKETS.to_vec(),
115 registry,
116 ).unwrap(),
117 sequencing_certificate_latency: register_histogram_vec_with_registry!(
118 "sequencing_certificate_latency",
119 "The latency for sequencing a certificate.",
120 &["submitted", "tx_type", "processed_method"],
121 LATENCY_SEC_BUCKETS.to_vec(),
122 registry,
123 ).unwrap(),
124 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
125 "sequencing_certificate_processed",
126 "The number of certificates that have been processed either by consensus or checkpoint.",
127 &["source"],
128 registry
129 ).unwrap(),
130 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
131 "sequencing_in_flight_semaphore_wait",
132 "How many requests are blocked on submit_permit.",
133 registry,
134 )
135 .unwrap(),
136 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
137 "sequencing_in_flight_submissions",
138 "Number of transactions submitted to local consensus instance and not yet sequenced",
139 registry,
140 )
141 .unwrap(),
142 sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
143 "sequencing_best_effort_timeout",
144 "The number of times the best effort submission has timed out.",
145 &["tx_type"],
146 registry,
147 ).unwrap(),
148 consensus_latency: register_histogram_with_registry!(
151 "validator_service_consensus_latency",
152 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
153 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
154 registry,
155 ).unwrap(),
156 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
157 "validator_service_num_rejected_cert_in_epoch_boundary",
158 "Number of rejected transaction certificate during epoch transitioning",
159 registry,
160 ).unwrap(),
161 }
162 }
163
164 pub fn new_test() -> Self {
165 Self::new(&Registry::default())
166 }
167}
168
169pub trait ConsensusOverloadChecker: Sync + Send + 'static {
171 fn check_consensus_overload(&self) -> SuiResult;
172}
173
174pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
175
176#[mockall::automock]
177pub trait SubmitToConsensus: Sync + Send + 'static {
178 fn submit_to_consensus(
179 &self,
180 transactions: &[ConsensusTransaction],
181 epoch_store: &Arc<AuthorityPerEpochStore>,
182 ) -> SuiResult;
183
184 fn submit_best_effort(
185 &self,
186 transaction: &ConsensusTransaction,
187 epoch_store: &Arc<AuthorityPerEpochStore>,
188 timeout: Duration,
189 ) -> SuiResult;
190}
191
192#[mockall::automock]
193#[async_trait::async_trait]
194pub trait ConsensusClient: Sync + Send + 'static {
195 async fn submit(
196 &self,
197 transactions: &[ConsensusTransaction],
198 epoch_store: &Arc<AuthorityPerEpochStore>,
199 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
200}
201
202pub struct ConsensusAdapter {
204 consensus_client: Arc<dyn ConsensusClient>,
206 checkpoint_store: Arc<CheckpointStore>,
208 authority: AuthorityName,
210 max_pending_transactions: usize,
212 num_inflight_transactions: AtomicU64,
214 metrics: ConsensusAdapterMetrics,
216 submit_semaphore: Arc<Semaphore>,
218 inflight_slot_freed_notify: Arc<Notify>,
222}
223
224impl ConsensusAdapter {
225 pub fn new(
227 consensus_client: Arc<dyn ConsensusClient>,
228 checkpoint_store: Arc<CheckpointStore>,
229 authority: AuthorityName,
230 max_pending_transactions: usize,
231 max_pending_local_submissions: usize,
232 metrics: ConsensusAdapterMetrics,
233 inflight_slot_freed_notify: Arc<Notify>,
234 ) -> Self {
235 let num_inflight_transactions = Default::default();
236 Self {
237 consensus_client,
238 checkpoint_store,
239 authority,
240 max_pending_transactions,
241 num_inflight_transactions,
242 metrics,
243 submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
244 inflight_slot_freed_notify,
245 }
246 }
247
248 pub fn num_inflight_transactions(&self) -> u64 {
250 self.num_inflight_transactions.load(Ordering::Relaxed)
251 }
252
253 pub fn max_pending_transactions(&self) -> usize {
255 self.max_pending_transactions
256 }
257
258 pub async fn submit_and_get_positions(
261 self: &Arc<Self>,
262 consensus_transactions: Vec<ConsensusTransaction>,
263 epoch_store: &Arc<AuthorityPerEpochStore>,
264 submitter_client_addr: Option<IpAddr>,
265 ) -> Result<Vec<ConsensusPosition>, SuiError> {
266 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
267
268 {
269 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
271 if !reconfiguration_lock.should_accept_user_certs() {
272 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
273 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
274 }
275
276 let _metrics_guard = self.metrics.consensus_latency.start_timer();
280
281 self.submit_batch(
282 &consensus_transactions,
283 Some(&reconfiguration_lock),
284 epoch_store,
285 Some(tx_consensus_positions),
286 submitter_client_addr,
287 )?;
288 }
289
290 rx_consensus_positions.await.map_err(|e| {
291 SuiErrorKind::FailedToSubmitToConsensus(format!(
292 "Failed to get consensus position: {e}"
293 ))
294 .into()
295 })
296 }
297
298 pub fn recover_end_of_publish(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
299 if epoch_store.should_send_end_of_publish() {
302 let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
303 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
304 self.submit_unchecked(&[transaction], epoch_store, None, None);
305 }
306 }
307
308 pub fn submit(
316 self: &Arc<Self>,
317 transaction: ConsensusTransaction,
318 lock: Option<&RwLockReadGuard<ReconfigState>>,
319 epoch_store: &Arc<AuthorityPerEpochStore>,
320 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
321 submitter_client_addr: Option<IpAddr>,
322 ) -> SuiResult<JoinHandle<()>> {
323 self.submit_batch(
324 &[transaction],
325 lock,
326 epoch_store,
327 tx_consensus_position,
328 submitter_client_addr,
329 )
330 }
331
332 pub fn submit_batch(
335 self: &Arc<Self>,
336 transactions: &[ConsensusTransaction],
337 _lock: Option<&RwLockReadGuard<ReconfigState>>,
338 epoch_store: &Arc<AuthorityPerEpochStore>,
339 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
340 submitter_client_addr: Option<IpAddr>,
341 ) -> SuiResult<JoinHandle<()>> {
342 if transactions.len() > 1 {
343 for transaction in transactions {
345 fp_ensure!(
346 transaction.is_user_transaction(),
347 SuiErrorKind::InvalidTxKindInSoftBundle.into()
348 );
349 }
350 }
351
352 Ok(self.submit_unchecked(
353 transactions,
354 epoch_store,
355 tx_consensus_position,
356 submitter_client_addr,
357 ))
358 }
359
360 fn check_limits(&self) -> bool {
363 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
365 > self.max_pending_transactions
366 {
367 return false;
368 }
369 self.submit_semaphore.available_permits() > 0
371 }
372
373 fn submit_unchecked(
374 self: &Arc<Self>,
375 transactions: &[ConsensusTransaction],
376 epoch_store: &Arc<AuthorityPerEpochStore>,
377 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
378 submitter_client_addr: Option<IpAddr>,
379 ) -> JoinHandle<()> {
380 let async_stage = self
382 .clone()
383 .submit_and_wait(
384 transactions.to_vec(),
385 epoch_store.clone(),
386 tx_consensus_position,
387 submitter_client_addr,
388 )
389 .in_current_span();
390 let join_handle = spawn_monitored_task!(async_stage);
393 join_handle
394 }
395
396 async fn submit_and_wait(
397 self: Arc<Self>,
398 transactions: Vec<ConsensusTransaction>,
399 epoch_store: Arc<AuthorityPerEpochStore>,
400 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
401 submitter_client_addr: Option<IpAddr>,
402 ) {
403 epoch_store
417 .within_alive_epoch(self.submit_and_wait_inner(
418 transactions,
419 &epoch_store,
420 tx_consensus_position,
421 submitter_client_addr,
422 ))
423 .await
424 .ok(); }
426
427 #[allow(clippy::option_map_unit_fn)]
428 #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
429 async fn submit_and_wait_inner(
430 self: Arc<Self>,
431 transactions: Vec<ConsensusTransaction>,
432 epoch_store: &Arc<AuthorityPerEpochStore>,
433 mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
434 submitter_client_addr: Option<IpAddr>,
435 ) {
436 if transactions.is_empty() {
437 debug!(
440 "Performing a ping check, pinging consensus to get a consensus position in next block"
441 );
442 let (consensus_positions, _status_waiter) = self
443 .submit_inner(&transactions, epoch_store, &[], "ping")
444 .await;
445
446 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
447 let _ = tx_consensus_positions.send(consensus_positions);
448 } else {
449 debug_fatal!("Ping check must have a consensus position channel");
450 }
451 return;
452 }
453
454 for transaction in &transactions {
456 if let Some(tx) = transaction.kind.as_user_transaction() {
457 let amplification_factor = (tx.data().transaction_data().gas_price()
458 / epoch_store.reference_gas_price().max(1))
459 .max(1);
460 epoch_store.submitted_transaction_cache.record_submitted_tx(
461 tx.digest(),
462 amplification_factor as u32,
463 submitter_client_addr,
464 );
465 }
466 }
467
468 let skip_processed_checks = tx_consensus_positions.is_some();
472
473 let is_soft_bundle = transactions.len() > 1;
478
479 let mut transaction_keys = Vec::new();
480 let mut tx_consensus_positions = tx_consensus_positions;
481
482 for transaction in &transactions {
483 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
484 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
485 epoch_store.record_epoch_pending_certs_process_time_metric();
486 }
487
488 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
489 transaction_keys.push(transaction_key);
490 }
491 let tx_type = if is_soft_bundle {
492 "soft_bundle"
493 } else {
494 classify(&transactions[0])
495 };
496 tracing::Span::current().record("tx_type", tx_type);
497 tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
498
499 let mut guard = InflightDropGuard::acquire(&self, tx_type);
500
501 let already_processed = if skip_processed_checks {
506 None
507 } else {
508 self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store)
509 };
510 if let Some(method) = &already_processed {
511 guard.processed_method = *method;
512 }
513
514 let _monitor = if matches!(
516 transactions[0].kind,
517 ConsensusTransactionKind::EndOfPublish(_)
518 | ConsensusTransactionKind::CapabilityNotification(_)
519 | ConsensusTransactionKind::CapabilityNotificationV2(_)
520 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
521 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
522 ) {
523 assert!(
524 !is_soft_bundle,
525 "System transactions should have been submitted individually"
526 );
527 let transaction_keys = transaction_keys.clone();
528 Some(CancelOnDrop(spawn_monitored_task!(async {
529 let mut i = 0u64;
530 loop {
531 i += 1;
532 const WARN_DELAY_S: u64 = 30;
533 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
534 let total_wait = i * WARN_DELAY_S;
535 warn!(
536 "Still waiting {} seconds for transactions {:?} to commit in consensus",
537 total_wait, transaction_keys
538 );
539 }
540 })))
541 } else {
542 None
543 };
544
545 if already_processed.is_none() {
546 debug!("Submitting {:?} to consensus", transaction_keys);
547 guard.submitted = true;
548
549 let _permit: SemaphorePermit = self
550 .submit_semaphore
551 .acquire()
552 .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
553 .await
554 .expect("Consensus adapter does not close semaphore");
555 let _in_flight_submission_guard =
556 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
557
558 let submit_fut = async {
561 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
562
563 loop {
564 let (consensus_positions, status_waiter) = self
566 .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
567 .await;
568
569 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
570 tracing::Span::current().record(
571 "consensus_positions",
572 tracing::field::debug(&consensus_positions),
573 );
574 let _ = tx_consensus_positions.send(consensus_positions);
580 }
581
582 match status_waiter.await {
583 Ok(status @ BlockStatus::Sequenced(_)) => {
584 tracing::Span::current()
585 .record("status", tracing::field::debug(&status));
586 self.metrics
587 .sequencing_certificate_status
588 .with_label_values(&[tx_type, "sequenced"])
589 .inc();
590 trace!(
592 "Transaction {transaction_keys:?} has been sequenced by consensus."
593 );
594 break;
595 }
596 Ok(status @ BlockStatus::GarbageCollected(_)) => {
597 tracing::Span::current()
598 .record("status", tracing::field::debug(&status));
599 self.metrics
600 .sequencing_certificate_status
601 .with_label_values(&[tx_type, "garbage_collected"])
602 .inc();
603 debug!(
607 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
608 );
609 time::sleep(RETRY_DELAY_STEP).await;
610 continue;
611 }
612 Err(err) => {
613 warn!(
614 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
615 err
616 );
617 time::sleep(RETRY_DELAY_STEP).await;
618 continue;
619 }
620 }
621 }
622 };
623
624 guard.processed_method = if skip_processed_checks {
625 submit_fut.await;
627 ProcessedMethod::Consensus
628 } else {
629 let processed_waiter = self
635 .processed_notify(transaction_keys.clone(), epoch_store)
636 .boxed();
637 match select(processed_waiter, submit_fut.boxed()).await {
638 Either::Left((observed, _submit_fut)) => observed,
639 Either::Right(((), processed_waiter)) => {
640 debug!("Submitted {transaction_keys:?} to consensus");
641 processed_waiter.await
642 }
643 }
644 };
645 }
646 debug!("{transaction_keys:?} processed by consensus");
647
648 if transactions[0].is_user_transaction()
652 && epoch_store.should_send_end_of_publish()
653 && !epoch_store.protocol_config().timestamp_based_epoch_close()
654 {
655 if let Err(err) = self.submit(
657 ConsensusTransaction::new_end_of_publish(self.authority),
658 None,
659 epoch_store,
660 None,
661 None,
662 ) {
663 warn!("Error when sending end of publish message: {:?}", err);
664 } else {
665 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
666 }
667 }
668 self.metrics
669 .sequencing_certificate_success
670 .with_label_values(&[tx_type])
671 .inc();
672 }
673
674 #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
675 async fn submit_inner(
676 self: &Arc<Self>,
677 transactions: &[ConsensusTransaction],
678 epoch_store: &Arc<AuthorityPerEpochStore>,
679 transaction_keys: &[SequencedConsensusTransactionKey],
680 tx_type: &str,
681 ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
682 let ack_start = Instant::now();
683 let mut retries: u32 = 0;
684 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
685 Duration::from_millis(100),
686 Duration::from_secs(10),
687 );
688
689 let (consensus_positions, status_waiter) = loop {
690 let span = debug_span!("client_submit");
691 match self
692 .consensus_client
693 .submit(transactions, epoch_store)
694 .instrument(span)
695 .await
696 {
697 Err(err) => {
698 if cfg!(msim) || retries > 3 {
700 warn!(
701 "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
702 );
703 }
704 self.metrics
705 .sequencing_certificate_failures
706 .with_label_values(&[tx_type])
707 .inc();
708 retries += 1;
709
710 time::sleep(backoff.next().unwrap()).await;
711 }
712 Ok((consensus_positions, status_waiter)) => {
713 break (consensus_positions, status_waiter);
714 }
715 }
716 };
717
718 let bucket = match retries {
722 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
724 21..=50 => "between_20_and_50".to_string(),
725 51..=100 => "between_50_and_100".to_string(),
726 _ => "over_100".to_string(),
727 };
728
729 self.metrics
730 .sequencing_acknowledge_latency
731 .with_label_values(&[bucket.as_str(), tx_type])
732 .observe(ack_start.elapsed().as_secs_f64());
733
734 (consensus_positions, status_waiter)
735 }
736
737 fn check_processed_via_consensus_or_checkpoint(
746 self: &Arc<Self>,
747 transaction_keys: &[SequencedConsensusTransactionKey],
748 epoch_store: &Arc<AuthorityPerEpochStore>,
749 ) -> Option<ProcessedMethod> {
750 let mut seen_checkpoint = false;
751 for transaction_key in transaction_keys {
752 if epoch_store
755 .is_consensus_message_processed(transaction_key)
756 .expect("Storage error when checking consensus message processed")
757 {
758 self.metrics
759 .sequencing_certificate_processed
760 .with_label_values(&["consensus"])
761 .inc();
762 continue;
763 }
764
765 if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
768 digest,
769 )) = transaction_key
770 && epoch_store
771 .is_transaction_executed_in_checkpoint(digest)
772 .expect("Storage error when checking transaction executed in checkpoint")
773 {
774 self.metrics
775 .sequencing_certificate_processed
776 .with_label_values(&["checkpoint"])
777 .inc();
778 seen_checkpoint = true;
779 continue;
780 }
781
782 if let SequencedConsensusTransactionKey::External(
786 ConsensusTransactionKey::CheckpointSignature(_, seq)
787 | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
788 ) = transaction_key
789 && let Some(synced_seq) = self
790 .checkpoint_store
791 .get_highest_synced_checkpoint_seq_number()
792 .expect("Storage error when reading highest synced checkpoint")
793 && synced_seq >= *seq
794 {
795 self.metrics
796 .sequencing_certificate_processed
797 .with_label_values(&["synced_checkpoint"])
798 .inc();
799 seen_checkpoint = true;
800 continue;
801 }
802
803 return None;
805 }
806
807 if seen_checkpoint {
808 Some(ProcessedMethod::Checkpoint)
809 } else {
810 Some(ProcessedMethod::Consensus)
811 }
812 }
813
814 async fn processed_notify(
821 self: &Arc<Self>,
822 transaction_keys: Vec<SequencedConsensusTransactionKey>,
823 epoch_store: &Arc<AuthorityPerEpochStore>,
824 ) -> ProcessedMethod {
825 let notifications = FuturesUnordered::new();
826 for transaction_key in transaction_keys {
827 let transaction_digests = match transaction_key {
828 SequencedConsensusTransactionKey::External(
829 ConsensusTransactionKey::Certificate(digest),
830 ) => vec![digest],
831 _ => vec![],
832 };
833
834 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
835 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
836 | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
837 ) = transaction_key
838 {
839 Either::Left(
842 self.checkpoint_store
843 .notify_read_synced_checkpoint(checkpoint_sequence_number),
844 )
845 } else {
846 Either::Right(future::pending())
847 };
848
849 notifications.push(async move {
853 tokio::select! {
854 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
855 processed.expect("Storage error when waiting for consensus message processed");
856 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
857 return ProcessedMethod::Consensus;
858 },
859 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
860 processed.expect("Storage error when waiting for transaction executed in checkpoint");
861 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
862 }
863 _ = checkpoint_synced_future => {
864 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
865 }
866 }
867 ProcessedMethod::Checkpoint
868 });
869 }
870
871 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
872 for method in processed_methods {
873 if method == ProcessedMethod::Checkpoint {
874 return ProcessedMethod::Checkpoint;
875 }
876 }
877 ProcessedMethod::Consensus
878 }
879}
880
881impl ConsensusOverloadChecker for ConsensusAdapter {
882 fn check_consensus_overload(&self) -> SuiResult {
883 fp_ensure!(
884 self.check_limits(),
885 SuiErrorKind::TooManyTransactionsPendingConsensus.into()
886 );
887 Ok(())
888 }
889}
890
/// Overload checker that never reports overload.
pub struct NoopConsensusOverloadChecker {
}
892
893impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
894 fn check_consensus_overload(&self) -> SuiResult {
895 Ok(())
896 }
897}
898
899impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
900 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
904 {
905 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
906 if !reconfig_guard.should_accept_user_certs() {
907 return;
909 }
910 epoch_store.close_user_certs(reconfig_guard);
911 }
912 if epoch_store.should_send_end_of_publish() {
913 if let Err(err) = self.submit(
914 ConsensusTransaction::new_end_of_publish(self.authority),
915 None,
916 epoch_store,
917 None,
918 None,
919 ) {
920 warn!("Error when sending end of publish message: {:?}", err);
921 } else {
922 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
923 }
924 }
925 }
926}
927
928struct CancelOnDrop<T>(JoinHandle<T>);
929
930impl<T> Deref for CancelOnDrop<T> {
931 type Target = JoinHandle<T>;
932
933 fn deref(&self) -> &Self::Target {
934 &self.0
935 }
936}
937
938impl<T> Drop for CancelOnDrop<T> {
939 fn drop(&mut self) {
940 self.0.abort();
941 }
942}
943
944struct InflightDropGuard<'a> {
946 adapter: &'a ConsensusAdapter,
947 start: Instant,
948 submitted: bool,
949 tx_type: &'static str,
950 processed_method: ProcessedMethod,
951}
952
/// How a submitted transaction was observed as processed.
#[derive(Clone, Copy, Eq, PartialEq)]
enum ProcessedMethod {
    /// Observed through consensus processing.
    Consensus,
    /// Observed through a checkpoint (execution in a checkpoint or a synced checkpoint).
    Checkpoint,
}
958
959impl<'a> InflightDropGuard<'a> {
960 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
961 adapter
962 .num_inflight_transactions
963 .fetch_add(1, Ordering::SeqCst);
964 adapter
965 .metrics
966 .sequencing_certificate_inflight
967 .with_label_values(&[tx_type])
968 .inc();
969 adapter
970 .metrics
971 .sequencing_certificate_attempt
972 .with_label_values(&[tx_type])
973 .inc();
974 Self {
975 adapter,
976 start: Instant::now(),
977 submitted: false,
978 tx_type,
979 processed_method: ProcessedMethod::Consensus,
980 }
981 }
982}
983
984impl Drop for InflightDropGuard<'_> {
985 fn drop(&mut self) {
986 self.adapter
987 .num_inflight_transactions
988 .fetch_sub(1, Ordering::SeqCst);
989 self.adapter
990 .metrics
991 .sequencing_certificate_inflight
992 .with_label_values(&[self.tx_type])
993 .dec();
994 self.adapter.inflight_slot_freed_notify.notify_one();
996
997 let latency = self.start.elapsed();
998 let processed_method = match self.processed_method {
999 ProcessedMethod::Consensus => "processed_via_consensus",
1000 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1001 };
1002 let submitted = if self.submitted {
1003 "submitted"
1004 } else {
1005 "skipped"
1006 };
1007
1008 self.adapter
1009 .metrics
1010 .sequencing_certificate_latency
1011 .with_label_values(&[submitted, self.tx_type, processed_method])
1012 .observe(latency.as_secs_f64());
1013 }
1014}
1015
1016impl SubmitToConsensus for Arc<ConsensusAdapter> {
1017 fn submit_to_consensus(
1018 &self,
1019 transactions: &[ConsensusTransaction],
1020 epoch_store: &Arc<AuthorityPerEpochStore>,
1021 ) -> SuiResult {
1022 self.submit_batch(transactions, None, epoch_store, None, None)
1023 .map(|_| ())
1024 }
1025
1026 fn submit_best_effort(
1027 &self,
1028 transaction: &ConsensusTransaction,
1029 epoch_store: &Arc<AuthorityPerEpochStore>,
1030 timeout: Duration,
1032 ) -> SuiResult {
1033 let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1034 Ok(permit) => permit,
1035 Err(_) => {
1036 return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
1037 }
1038 };
1039
1040 let _in_flight_submission_guard =
1041 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1042
1043 let key = SequencedConsensusTransactionKey::External(transaction.key());
1044 let tx_type = classify(transaction);
1045
1046 let async_stage = {
1047 let transaction = transaction.clone();
1048 let epoch_store = epoch_store.clone();
1049 let this = self.clone();
1050
1051 async move {
1052 let _permit = permit; let result = tokio::time::timeout(
1055 timeout,
1056 this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1057 )
1058 .await;
1059
1060 if let Err(e) = result {
1061 warn!("Consensus submission timed out: {e:?}");
1062 this.metrics
1063 .sequencing_best_effort_timeout
1064 .with_label_values(&[tx_type])
1065 .inc();
1066 }
1067 }
1068 };
1069
1070 let epoch_store = epoch_store.clone();
1071 spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1072 Ok(())
1073 }
1074}