1use std::net::IpAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11use consensus_core::BlockStatus;
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::future::{self, Either, select};
15use futures::stream::FuturesUnordered;
16use mysten_common::debug_fatal;
17use mysten_metrics::{
18 GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
19};
20use parking_lot::RwLockReadGuard;
21use prometheus::Histogram;
22use prometheus::HistogramVec;
23use prometheus::IntCounter;
24use prometheus::IntCounterVec;
25use prometheus::IntGauge;
26use prometheus::IntGaugeVec;
27use prometheus::Registry;
28use prometheus::{
29 register_histogram_vec_with_registry, register_histogram_with_registry,
30 register_int_counter_vec_with_registry, register_int_counter_with_registry,
31 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
32};
33use sui_types::base_types::AuthorityName;
34use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
35use sui_types::fp_ensure;
36use sui_types::messages_consensus::ConsensusPosition;
37use sui_types::messages_consensus::ConsensusTransactionKind;
38use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
39use sui_types::transaction::TransactionDataAPI;
40use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::Duration;
43use tokio::time::{self};
44use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::checkpoints::CheckpointStore;
48use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
49use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
50
51#[cfg(test)]
52#[path = "unit_tests/consensus_tests.rs"]
53pub mod consensus_tests;
54
/// Prometheus metric handles used by the consensus adapter in this file.
pub struct ConsensusAdapterMetrics {
    /// Counts the certificates the validator attempts to sequence (label: `tx_type`).
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Counts successfully sequenced certificates (label: `tx_type`).
    pub sequencing_certificate_success: IntCounterVec,
    /// Counts sequencing failures other than timeouts (label: `tx_type`).
    pub sequencing_certificate_failures: IntCounterVec,
    /// Status reported by consensus for a submission (labels: `tx_type`, `status`).
    pub sequencing_certificate_status: IntCounterVec,
    /// In-flight requests to sequence certificates (label: `tx_type`).
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency of the acknowledgement from the sequencing engine (labels: `retry`, `tx_type`).
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Overall latency for sequencing a certificate
    /// (labels: `submitted`, `tx_type`, `processed_method`).
    pub sequencing_certificate_latency: HistogramVec,
    /// Counts certificates observed as processed, by `source`.
    pub sequencing_certificate_processed: IntCounterVec,
    /// Number of requests currently blocked waiting for a submit permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Number of transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Counts best-effort submissions that timed out (label: `tx_type`).
    pub sequencing_best_effort_timeout: IntCounterVec,
    /// Time between submitting to consensus and receiving the local acknowledgement.
    pub consensus_latency: Histogram,
    /// Counts certificates rejected while the epoch is transitioning.
    pub num_rejected_cert_in_epoch_boundary: IntCounter,
}
71
72impl ConsensusAdapterMetrics {
73 pub fn new(registry: &Registry) -> Self {
74 Self {
75 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
76 "sequencing_certificate_attempt",
77 "Counts the number of certificates the validator attempts to sequence.",
78 &["tx_type"],
79 registry,
80 )
81 .unwrap(),
82 sequencing_certificate_success: register_int_counter_vec_with_registry!(
83 "sequencing_certificate_success",
84 "Counts the number of successfully sequenced certificates.",
85 &["tx_type"],
86 registry,
87 )
88 .unwrap(),
89 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_failures",
91 "Counts the number of sequenced certificates that failed other than by timeout.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_status: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_status",
98 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
99 &["tx_type", "status"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
104 "sequencing_certificate_inflight",
105 "The inflight requests to sequence certificates.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
111 "sequencing_acknowledge_latency",
112 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
113 &["retry", "tx_type"],
114 LATENCY_SEC_BUCKETS.to_vec(),
115 registry,
116 ).unwrap(),
117 sequencing_certificate_latency: register_histogram_vec_with_registry!(
118 "sequencing_certificate_latency",
119 "The latency for sequencing a certificate.",
120 &["submitted", "tx_type", "processed_method"],
121 LATENCY_SEC_BUCKETS.to_vec(),
122 registry,
123 ).unwrap(),
124 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
125 "sequencing_certificate_processed",
126 "The number of certificates that have been processed either by consensus or checkpoint.",
127 &["source"],
128 registry
129 ).unwrap(),
130 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
131 "sequencing_in_flight_semaphore_wait",
132 "How many requests are blocked on submit_permit.",
133 registry,
134 )
135 .unwrap(),
136 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
137 "sequencing_in_flight_submissions",
138 "Number of transactions submitted to local consensus instance and not yet sequenced",
139 registry,
140 )
141 .unwrap(),
142 sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
143 "sequencing_best_effort_timeout",
144 "The number of times the best effort submission has timed out.",
145 &["tx_type"],
146 registry,
147 ).unwrap(),
148 consensus_latency: register_histogram_with_registry!(
151 "validator_service_consensus_latency",
152 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
153 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
154 registry,
155 ).unwrap(),
156 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
157 "validator_service_num_rejected_cert_in_epoch_boundary",
158 "Number of rejected transaction certificate during epoch transitioning",
159 registry,
160 ).unwrap(),
161 }
162 }
163
164 pub fn new_test() -> Self {
165 Self::new(&Registry::default())
166 }
167}
168
/// Reports whether consensus submission is currently overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    /// Returns `Ok(())` when consensus is not considered overloaded, or an error otherwise.
    ///
    /// The `ConsensusAdapter` implementation in this file fails with
    /// `SuiErrorKind::TooManyTransactionsPendingConsensus` when its limits are exceeded.
    fn check_consensus_overload(&self) -> SuiResult;
}
173
/// Receiving half of the oneshot channel on which a [`BlockStatus`] is delivered.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
175
/// Synchronous entry points for handing transactions over to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus for the epoch represented by `epoch_store`.
    ///
    /// Returns an error if the submission is rejected before it is started.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    /// Submits a single `transaction` without waiting for the outcome; `timeout` bounds the
    /// submission attempt.
    ///
    /// The `Arc<ConsensusAdapter>` implementation in this file fails immediately with
    /// `SuiErrorKind::TooManyTransactionsPendingConsensus` when no submit permit is available.
    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}
191
/// Asynchronous client through which transactions are handed to consensus.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` to consensus.
    ///
    /// On success returns the consensus positions of the submission together with a receiver
    /// on which the [`BlockStatus`] of the submission is later delivered.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
201
/// Submits transactions to consensus and tracks them until they are observed as processed.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to consensus.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Store consulted for the highest synced checkpoint and synced-checkpoint notifications.
    checkpoint_store: Arc<CheckpointStore>,
    /// Name of this authority; used when building `EndOfPublish` messages.
    authority: AuthorityName,
    /// Limit on in-flight transactions that `check_limits` compares against.
    max_pending_transactions: usize,
    /// Number of in-flight transactions; maintained by `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Metrics recorded by the adapter.
    metrics: ConsensusAdapterMetrics,
    /// Semaphore whose permits bound concurrent local submissions.
    submit_semaphore: Arc<Semaphore>,
    /// Notified each time an `InflightDropGuard` is dropped.
    inflight_slot_freed_notify: Arc<Notify>,
}
223
224impl ConsensusAdapter {
225 pub fn new(
227 consensus_client: Arc<dyn ConsensusClient>,
228 checkpoint_store: Arc<CheckpointStore>,
229 authority: AuthorityName,
230 max_pending_transactions: usize,
231 max_pending_local_submissions: usize,
232 metrics: ConsensusAdapterMetrics,
233 inflight_slot_freed_notify: Arc<Notify>,
234 ) -> Self {
235 let num_inflight_transactions = Default::default();
236 Self {
237 consensus_client,
238 checkpoint_store,
239 authority,
240 max_pending_transactions,
241 num_inflight_transactions,
242 metrics,
243 submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
244 inflight_slot_freed_notify,
245 }
246 }
247
/// Returns the current value of the in-flight transaction counter.
pub fn num_inflight_transactions(&self) -> u64 {
    // The counter is read with relaxed ordering.
    self.num_inflight_transactions.load(Ordering::Relaxed)
}
252
/// Returns the configured limit on pending transactions.
pub fn max_pending_transactions(&self) -> usize {
    self.max_pending_transactions
}
257
258 pub async fn submit_and_get_positions(
261 self: &Arc<Self>,
262 consensus_transactions: Vec<ConsensusTransaction>,
263 epoch_store: &Arc<AuthorityPerEpochStore>,
264 submitter_client_addr: Option<IpAddr>,
265 ) -> Result<Vec<ConsensusPosition>, SuiError> {
266 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
267
268 {
269 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
271 if !reconfiguration_lock.should_accept_user_certs() {
272 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
273 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
274 }
275
276 let _metrics_guard = self.metrics.consensus_latency.start_timer();
280
281 self.submit_batch(
282 &consensus_transactions,
283 Some(&reconfiguration_lock),
284 epoch_store,
285 Some(tx_consensus_positions),
286 submitter_client_addr,
287 )?;
288 }
289
290 rx_consensus_positions.await.map_err(|e| {
291 SuiErrorKind::FailedToSubmitToConsensus(format!(
292 "Failed to get consensus position: {e}"
293 ))
294 .into()
295 })
296 }
297
298 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
299 if epoch_store.should_send_end_of_publish() {
303 let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
304 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
305 self.submit_unchecked(&[transaction], epoch_store, None, None);
306 }
307 }
308
309 pub fn submit(
317 self: &Arc<Self>,
318 transaction: ConsensusTransaction,
319 lock: Option<&RwLockReadGuard<ReconfigState>>,
320 epoch_store: &Arc<AuthorityPerEpochStore>,
321 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
322 submitter_client_addr: Option<IpAddr>,
323 ) -> SuiResult<JoinHandle<()>> {
324 self.submit_batch(
325 &[transaction],
326 lock,
327 epoch_store,
328 tx_consensus_position,
329 submitter_client_addr,
330 )
331 }
332
333 pub fn submit_batch(
336 self: &Arc<Self>,
337 transactions: &[ConsensusTransaction],
338 _lock: Option<&RwLockReadGuard<ReconfigState>>,
339 epoch_store: &Arc<AuthorityPerEpochStore>,
340 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
341 submitter_client_addr: Option<IpAddr>,
342 ) -> SuiResult<JoinHandle<()>> {
343 if transactions.len() > 1 {
344 for transaction in transactions {
346 fp_ensure!(
347 transaction.is_user_transaction(),
348 SuiErrorKind::InvalidTxKindInSoftBundle.into()
349 );
350 }
351 }
352
353 Ok(self.submit_unchecked(
354 transactions,
355 epoch_store,
356 tx_consensus_position,
357 submitter_client_addr,
358 ))
359 }
360
361 fn check_limits(&self) -> bool {
364 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
366 > self.max_pending_transactions
367 {
368 return false;
369 }
370 self.submit_semaphore.available_permits() > 0
372 }
373
374 fn submit_unchecked(
375 self: &Arc<Self>,
376 transactions: &[ConsensusTransaction],
377 epoch_store: &Arc<AuthorityPerEpochStore>,
378 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
379 submitter_client_addr: Option<IpAddr>,
380 ) -> JoinHandle<()> {
381 let async_stage = self
383 .clone()
384 .submit_and_wait(
385 transactions.to_vec(),
386 epoch_store.clone(),
387 tx_consensus_position,
388 submitter_client_addr,
389 )
390 .in_current_span();
391 let join_handle = spawn_monitored_task!(async_stage);
394 join_handle
395 }
396
397 async fn submit_and_wait(
398 self: Arc<Self>,
399 transactions: Vec<ConsensusTransaction>,
400 epoch_store: Arc<AuthorityPerEpochStore>,
401 tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
402 submitter_client_addr: Option<IpAddr>,
403 ) {
404 epoch_store
418 .within_alive_epoch(self.submit_and_wait_inner(
419 transactions,
420 &epoch_store,
421 tx_consensus_position,
422 submitter_client_addr,
423 ))
424 .await
425 .ok(); }
427
428 #[allow(clippy::option_map_unit_fn)]
429 #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
430 async fn submit_and_wait_inner(
431 self: Arc<Self>,
432 transactions: Vec<ConsensusTransaction>,
433 epoch_store: &Arc<AuthorityPerEpochStore>,
434 mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
435 submitter_client_addr: Option<IpAddr>,
436 ) {
437 if transactions.is_empty() {
438 debug!(
441 "Performing a ping check, pinging consensus to get a consensus position in next block"
442 );
443 let (consensus_positions, _status_waiter) = self
444 .submit_inner(&transactions, epoch_store, &[], "ping")
445 .await;
446
447 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
448 let _ = tx_consensus_positions.send(consensus_positions);
449 } else {
450 debug_fatal!("Ping check must have a consensus position channel");
451 }
452 return;
453 }
454
455 for transaction in &transactions {
457 if let Some(tx) = transaction.kind.as_user_transaction() {
458 let amplification_factor = (tx.data().transaction_data().gas_price()
459 / epoch_store.reference_gas_price().max(1))
460 .max(1);
461 epoch_store.submitted_transaction_cache.record_submitted_tx(
462 tx.digest(),
463 amplification_factor as u32,
464 submitter_client_addr,
465 );
466 }
467 }
468
469 let skip_processed_checks = tx_consensus_positions.is_some();
473
474 let is_soft_bundle = transactions.len() > 1;
479
480 let mut transaction_keys = Vec::new();
481 let mut tx_consensus_positions = tx_consensus_positions;
482
483 for transaction in &transactions {
484 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
485 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
486 epoch_store.record_epoch_pending_certs_process_time_metric();
487 }
488
489 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
490 transaction_keys.push(transaction_key);
491 }
492 let tx_type = if is_soft_bundle {
493 "soft_bundle"
494 } else {
495 classify(&transactions[0])
496 };
497 tracing::Span::current().record("tx_type", tx_type);
498 tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
499
500 let mut guard = InflightDropGuard::acquire(&self, tx_type);
501
502 let already_processed = if skip_processed_checks {
507 None
508 } else {
509 self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store)
510 };
511 if let Some(method) = &already_processed {
512 guard.processed_method = *method;
513 }
514
515 let _monitor = if matches!(
517 transactions[0].kind,
518 ConsensusTransactionKind::EndOfPublish(_)
519 | ConsensusTransactionKind::CapabilityNotification(_)
520 | ConsensusTransactionKind::CapabilityNotificationV2(_)
521 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
522 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
523 ) {
524 assert!(
525 !is_soft_bundle,
526 "System transactions should have been submitted individually"
527 );
528 let transaction_keys = transaction_keys.clone();
529 Some(CancelOnDrop(spawn_monitored_task!(async {
530 let mut i = 0u64;
531 loop {
532 i += 1;
533 const WARN_DELAY_S: u64 = 30;
534 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
535 let total_wait = i * WARN_DELAY_S;
536 warn!(
537 "Still waiting {} seconds for transactions {:?} to commit in consensus",
538 total_wait, transaction_keys
539 );
540 }
541 })))
542 } else {
543 None
544 };
545
546 if already_processed.is_none() {
547 debug!("Submitting {:?} to consensus", transaction_keys);
548 guard.submitted = true;
549
550 let _permit: SemaphorePermit = self
551 .submit_semaphore
552 .acquire()
553 .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
554 .await
555 .expect("Consensus adapter does not close semaphore");
556 let _in_flight_submission_guard =
557 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
558
559 let submit_fut = async {
562 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
563
564 loop {
565 let (consensus_positions, status_waiter) = self
567 .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
568 .await;
569
570 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
571 tracing::Span::current().record(
572 "consensus_positions",
573 tracing::field::debug(&consensus_positions),
574 );
575 let _ = tx_consensus_positions.send(consensus_positions);
581 }
582
583 match status_waiter.await {
584 Ok(status @ BlockStatus::Sequenced(_)) => {
585 tracing::Span::current()
586 .record("status", tracing::field::debug(&status));
587 self.metrics
588 .sequencing_certificate_status
589 .with_label_values(&[tx_type, "sequenced"])
590 .inc();
591 trace!(
593 "Transaction {transaction_keys:?} has been sequenced by consensus."
594 );
595 break;
596 }
597 Ok(status @ BlockStatus::GarbageCollected(_)) => {
598 tracing::Span::current()
599 .record("status", tracing::field::debug(&status));
600 self.metrics
601 .sequencing_certificate_status
602 .with_label_values(&[tx_type, "garbage_collected"])
603 .inc();
604 debug!(
608 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
609 );
610 time::sleep(RETRY_DELAY_STEP).await;
611 continue;
612 }
613 Err(err) => {
614 warn!(
615 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
616 err
617 );
618 time::sleep(RETRY_DELAY_STEP).await;
619 continue;
620 }
621 }
622 }
623 };
624
625 guard.processed_method = if skip_processed_checks {
626 submit_fut.await;
628 ProcessedMethod::Consensus
629 } else {
630 let processed_waiter = self
636 .processed_notify(transaction_keys.clone(), epoch_store)
637 .boxed();
638 match select(processed_waiter, submit_fut.boxed()).await {
639 Either::Left((observed, _submit_fut)) => observed,
640 Either::Right(((), processed_waiter)) => {
641 debug!("Submitted {transaction_keys:?} to consensus");
642 processed_waiter.await
643 }
644 }
645 };
646 }
647 debug!("{transaction_keys:?} processed by consensus");
648
649 if transactions[0].is_user_transaction() && epoch_store.should_send_end_of_publish() {
653 if let Err(err) = self.submit(
655 ConsensusTransaction::new_end_of_publish(self.authority),
656 None,
657 epoch_store,
658 None,
659 None,
660 ) {
661 warn!("Error when sending end of publish message: {:?}", err);
662 } else {
663 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
664 }
665 }
666 self.metrics
667 .sequencing_certificate_success
668 .with_label_values(&[tx_type])
669 .inc();
670 }
671
672 #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
673 async fn submit_inner(
674 self: &Arc<Self>,
675 transactions: &[ConsensusTransaction],
676 epoch_store: &Arc<AuthorityPerEpochStore>,
677 transaction_keys: &[SequencedConsensusTransactionKey],
678 tx_type: &str,
679 ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
680 let ack_start = Instant::now();
681 let mut retries: u32 = 0;
682 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
683 Duration::from_millis(100),
684 Duration::from_secs(10),
685 );
686
687 let (consensus_positions, status_waiter) = loop {
688 let span = debug_span!("client_submit");
689 match self
690 .consensus_client
691 .submit(transactions, epoch_store)
692 .instrument(span)
693 .await
694 {
695 Err(err) => {
696 if cfg!(msim) || retries > 3 {
698 warn!(
699 "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
700 );
701 }
702 self.metrics
703 .sequencing_certificate_failures
704 .with_label_values(&[tx_type])
705 .inc();
706 retries += 1;
707
708 time::sleep(backoff.next().unwrap()).await;
709 }
710 Ok((consensus_positions, status_waiter)) => {
711 break (consensus_positions, status_waiter);
712 }
713 }
714 };
715
716 let bucket = match retries {
720 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
722 21..=50 => "between_20_and_50".to_string(),
723 51..=100 => "between_50_and_100".to_string(),
724 _ => "over_100".to_string(),
725 };
726
727 self.metrics
728 .sequencing_acknowledge_latency
729 .with_label_values(&[bucket.as_str(), tx_type])
730 .observe(ack_start.elapsed().as_secs_f64());
731
732 (consensus_positions, status_waiter)
733 }
734
735 fn check_processed_via_consensus_or_checkpoint(
744 self: &Arc<Self>,
745 transaction_keys: &[SequencedConsensusTransactionKey],
746 epoch_store: &Arc<AuthorityPerEpochStore>,
747 ) -> Option<ProcessedMethod> {
748 let mut seen_checkpoint = false;
749 for transaction_key in transaction_keys {
750 if epoch_store
753 .is_consensus_message_processed(transaction_key)
754 .expect("Storage error when checking consensus message processed")
755 {
756 self.metrics
757 .sequencing_certificate_processed
758 .with_label_values(&["consensus"])
759 .inc();
760 continue;
761 }
762
763 if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
766 digest,
767 )) = transaction_key
768 && epoch_store
769 .is_transaction_executed_in_checkpoint(digest)
770 .expect("Storage error when checking transaction executed in checkpoint")
771 {
772 self.metrics
773 .sequencing_certificate_processed
774 .with_label_values(&["checkpoint"])
775 .inc();
776 seen_checkpoint = true;
777 continue;
778 }
779
780 if let SequencedConsensusTransactionKey::External(
784 ConsensusTransactionKey::CheckpointSignature(_, seq)
785 | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
786 ) = transaction_key
787 && let Some(synced_seq) = self
788 .checkpoint_store
789 .get_highest_synced_checkpoint_seq_number()
790 .expect("Storage error when reading highest synced checkpoint")
791 && synced_seq >= *seq
792 {
793 self.metrics
794 .sequencing_certificate_processed
795 .with_label_values(&["synced_checkpoint"])
796 .inc();
797 seen_checkpoint = true;
798 continue;
799 }
800
801 return None;
803 }
804
805 if seen_checkpoint {
806 Some(ProcessedMethod::Checkpoint)
807 } else {
808 Some(ProcessedMethod::Consensus)
809 }
810 }
811
812 async fn processed_notify(
819 self: &Arc<Self>,
820 transaction_keys: Vec<SequencedConsensusTransactionKey>,
821 epoch_store: &Arc<AuthorityPerEpochStore>,
822 ) -> ProcessedMethod {
823 let notifications = FuturesUnordered::new();
824 for transaction_key in transaction_keys {
825 let transaction_digests = match transaction_key {
826 SequencedConsensusTransactionKey::External(
827 ConsensusTransactionKey::Certificate(digest),
828 ) => vec![digest],
829 _ => vec![],
830 };
831
832 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
833 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
834 | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
835 ) = transaction_key
836 {
837 Either::Left(
840 self.checkpoint_store
841 .notify_read_synced_checkpoint(checkpoint_sequence_number),
842 )
843 } else {
844 Either::Right(future::pending())
845 };
846
847 notifications.push(async move {
851 tokio::select! {
852 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
853 processed.expect("Storage error when waiting for consensus message processed");
854 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
855 return ProcessedMethod::Consensus;
856 },
857 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
858 processed.expect("Storage error when waiting for transaction executed in checkpoint");
859 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
860 }
861 _ = checkpoint_synced_future => {
862 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
863 }
864 }
865 ProcessedMethod::Checkpoint
866 });
867 }
868
869 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
870 for method in processed_methods {
871 if method == ProcessedMethod::Checkpoint {
872 return ProcessedMethod::Checkpoint;
873 }
874 }
875 ProcessedMethod::Consensus
876 }
877}
878
879impl ConsensusOverloadChecker for ConsensusAdapter {
880 fn check_consensus_overload(&self) -> SuiResult {
881 fp_ensure!(
882 self.check_limits(),
883 SuiErrorKind::TooManyTransactionsPendingConsensus.into()
884 );
885 Ok(())
886 }
887}
888
/// Overload checker whose check always succeeds.
pub struct NoopConsensusOverloadChecker {}
890
impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    /// Always returns `Ok(())`.
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}
896
897impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
898 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
902 {
903 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
904 if !reconfig_guard.should_accept_user_certs() {
905 return;
907 }
908 epoch_store.close_user_certs(reconfig_guard);
909 }
910 if epoch_store.should_send_end_of_publish() {
911 if let Err(err) = self.submit(
912 ConsensusTransaction::new_end_of_publish(self.authority),
913 None,
914 epoch_store,
915 None,
916 None,
917 ) {
918 warn!("Error when sending end of publish message: {:?}", err);
919 } else {
920 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
921 }
922 }
923 }
924}
925
/// Wrapper around a [`JoinHandle`] that aborts the task when the wrapper is dropped.
struct CancelOnDrop<T>(JoinHandle<T>);
927
impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    /// Gives shared access to the wrapped join handle.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
935
impl<T> Drop for CancelOnDrop<T> {
    /// Aborts the wrapped task.
    fn drop(&mut self) {
        self.0.abort();
    }
}
941
/// Guard that counts one in-flight transaction for as long as it is alive and records the
/// sequencing latency when dropped.
struct InflightDropGuard<'a> {
    /// Adapter whose counter and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// Time at which the guard was acquired.
    start: Instant,
    /// Selects the `"submitted"` or `"skipped"` label of the latency metric.
    submitted: bool,
    /// Transaction type label used for the metrics.
    tx_type: &'static str,
    /// Selects the `processed_method` label of the latency metric.
    processed_method: ProcessedMethod,
}
950
/// How a transaction was observed as processed.
#[derive(Copy, Clone, PartialEq, Eq)]
enum ProcessedMethod {
    /// Reported as `processed_via_consensus` in the latency metric.
    Consensus,
    /// Reported as `processed_via_checkpoint` in the latency metric.
    Checkpoint,
}
956
957impl<'a> InflightDropGuard<'a> {
958 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
959 adapter
960 .num_inflight_transactions
961 .fetch_add(1, Ordering::SeqCst);
962 adapter
963 .metrics
964 .sequencing_certificate_inflight
965 .with_label_values(&[tx_type])
966 .inc();
967 adapter
968 .metrics
969 .sequencing_certificate_attempt
970 .with_label_values(&[tx_type])
971 .inc();
972 Self {
973 adapter,
974 start: Instant::now(),
975 submitted: false,
976 tx_type,
977 processed_method: ProcessedMethod::Consensus,
978 }
979 }
980}
981
982impl Drop for InflightDropGuard<'_> {
983 fn drop(&mut self) {
984 self.adapter
985 .num_inflight_transactions
986 .fetch_sub(1, Ordering::SeqCst);
987 self.adapter
988 .metrics
989 .sequencing_certificate_inflight
990 .with_label_values(&[self.tx_type])
991 .dec();
992 self.adapter.inflight_slot_freed_notify.notify_one();
994
995 let latency = self.start.elapsed();
996 let processed_method = match self.processed_method {
997 ProcessedMethod::Consensus => "processed_via_consensus",
998 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
999 };
1000 let submitted = if self.submitted {
1001 "submitted"
1002 } else {
1003 "skipped"
1004 };
1005
1006 self.adapter
1007 .metrics
1008 .sequencing_certificate_latency
1009 .with_label_values(&[submitted, self.tx_type, processed_method])
1010 .observe(latency.as_secs_f64());
1011 }
1012}
1013
1014impl SubmitToConsensus for Arc<ConsensusAdapter> {
1015 fn submit_to_consensus(
1016 &self,
1017 transactions: &[ConsensusTransaction],
1018 epoch_store: &Arc<AuthorityPerEpochStore>,
1019 ) -> SuiResult {
1020 self.submit_batch(transactions, None, epoch_store, None, None)
1021 .map(|_| ())
1022 }
1023
1024 fn submit_best_effort(
1025 &self,
1026 transaction: &ConsensusTransaction,
1027 epoch_store: &Arc<AuthorityPerEpochStore>,
1028 timeout: Duration,
1030 ) -> SuiResult {
1031 let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1032 Ok(permit) => permit,
1033 Err(_) => {
1034 return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
1035 }
1036 };
1037
1038 let _in_flight_submission_guard =
1039 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1040
1041 let key = SequencedConsensusTransactionKey::External(transaction.key());
1042 let tx_type = classify(transaction);
1043
1044 let async_stage = {
1045 let transaction = transaction.clone();
1046 let epoch_store = epoch_store.clone();
1047 let this = self.clone();
1048
1049 async move {
1050 let _permit = permit; let result = tokio::time::timeout(
1053 timeout,
1054 this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1055 )
1056 .await;
1057
1058 if let Err(e) = result {
1059 warn!("Consensus submission timed out: {e:?}");
1060 this.metrics
1061 .sequencing_best_effort_timeout
1062 .with_label_values(&[tx_type])
1063 .inc();
1064 }
1065 }
1066 };
1067
1068 let epoch_store = epoch_store.clone();
1069 spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1070 Ok(())
1071 }
1072}