1use std::net::IpAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11use consensus_core::BlockStatus;
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::future::{self, Either, select};
15use futures::stream::FuturesUnordered;
16use mysten_common::debug_fatal;
17use mysten_metrics::{
18 GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
19};
20use parking_lot::RwLockReadGuard;
21use prometheus::Histogram;
22use prometheus::HistogramVec;
23use prometheus::IntCounter;
24use prometheus::IntCounterVec;
25use prometheus::IntGauge;
26use prometheus::IntGaugeVec;
27use prometheus::Registry;
28use prometheus::{
29 register_histogram_vec_with_registry, register_histogram_with_registry,
30 register_int_counter_vec_with_registry, register_int_counter_with_registry,
31 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
32};
33use sui_types::base_types::AuthorityName;
34use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
35use sui_types::fp_ensure;
36use sui_types::messages_consensus::ConsensusPosition;
37use sui_types::messages_consensus::ConsensusTransactionKind;
38use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
39use sui_types::transaction::TransactionDataAPI;
40use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::Duration;
43use tokio::time::{self};
44use tracing::{Instrument, debug, debug_span, info, instrument, warn};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::checkpoints::CheckpointStore;
48use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
49use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
50
51#[cfg(test)]
52#[path = "unit_tests/consensus_tests.rs"]
53pub mod consensus_tests;
54
/// Prometheus metrics recorded by the consensus adapter.
///
/// All fields are registered in [`ConsensusAdapterMetrics::new`]; the notes below restate the
/// help text and label names used at registration.
pub struct ConsensusAdapterMetrics {
    /// Number of certificates the validator attempts to sequence (label: `tx_type`).
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced certificates (label: `tx_type`).
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of sequencing failures other than timeouts (label: `tx_type`).
    pub sequencing_certificate_failures: IntCounterVec,
    /// Sequencing status reported by consensus: sequenced or garbage collected
    /// (labels: `tx_type`, `status`).
    pub sequencing_certificate_status: IntCounterVec,
    /// Inflight requests to sequence certificates (label: `tx_type`).
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency of the acknowledgement from the sequencing engine (labels: `retry`, `tx_type`).
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Latency for sequencing a certificate
    /// (labels: `submitted`, `tx_type`, `processed_method`).
    pub sequencing_certificate_latency: HistogramVec,
    /// Certificates observed as processed by consensus or checkpoint (label: `source`).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Number of requests currently blocked waiting for a submit permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Number of best-effort submissions that timed out (label: `tx_type`).
    pub sequencing_best_effort_timeout: IntCounterVec,
    /// Time between submitting a transaction to consensus and the local acknowledgement.
    pub consensus_latency: Histogram,
    /// Transaction certificates rejected while the epoch is transitioning.
    pub num_rejected_cert_in_epoch_boundary: IntCounter,
}
71
72impl ConsensusAdapterMetrics {
73 pub fn new(registry: &Registry) -> Self {
74 Self {
75 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
76 "sequencing_certificate_attempt",
77 "Counts the number of certificates the validator attempts to sequence.",
78 &["tx_type"],
79 registry,
80 )
81 .unwrap(),
82 sequencing_certificate_success: register_int_counter_vec_with_registry!(
83 "sequencing_certificate_success",
84 "Counts the number of successfully sequenced certificates.",
85 &["tx_type"],
86 registry,
87 )
88 .unwrap(),
89 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_failures",
91 "Counts the number of sequenced certificates that failed other than by timeout.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_status: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_status",
98 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
99 &["tx_type", "status"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
104 "sequencing_certificate_inflight",
105 "The inflight requests to sequence certificates.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
111 "sequencing_acknowledge_latency",
112 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
113 &["retry", "tx_type"],
114 LATENCY_SEC_BUCKETS.to_vec(),
115 registry,
116 ).unwrap(),
117 sequencing_certificate_latency: register_histogram_vec_with_registry!(
118 "sequencing_certificate_latency",
119 "The latency for sequencing a certificate.",
120 &["submitted", "tx_type", "processed_method"],
121 LATENCY_SEC_BUCKETS.to_vec(),
122 registry,
123 ).unwrap(),
124 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
125 "sequencing_certificate_processed",
126 "The number of certificates that have been processed either by consensus or checkpoint.",
127 &["source"],
128 registry
129 ).unwrap(),
130 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
131 "sequencing_in_flight_semaphore_wait",
132 "How many requests are blocked on submit_permit.",
133 registry,
134 )
135 .unwrap(),
136 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
137 "sequencing_in_flight_submissions",
138 "Number of transactions submitted to local consensus instance and not yet sequenced",
139 registry,
140 )
141 .unwrap(),
142 sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
143 "sequencing_best_effort_timeout",
144 "The number of times the best effort submission has timed out.",
145 &["tx_type"],
146 registry,
147 ).unwrap(),
148 consensus_latency: register_histogram_with_registry!(
151 "validator_service_consensus_latency",
152 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
153 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
154 registry,
155 ).unwrap(),
156 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
157 "validator_service_num_rejected_cert_in_epoch_boundary",
158 "Number of rejected transaction certificate during epoch transitioning",
159 registry,
160 ).unwrap(),
161 }
162 }
163
164 pub fn new_test() -> Self {
165 Self::new(&Registry::default())
166 }
167}
168
/// Lets callers ask whether consensus submission is currently overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    /// Returns `Ok(())` when more transactions may be accepted, or an error otherwise.
    fn check_consensus_overload(&self) -> SuiResult;
}
173
/// Receiving half of the oneshot channel on which consensus reports a [`BlockStatus`].
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
175
/// Synchronous entry points for handing transactions over to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus for the epoch described by `epoch_store`.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    /// Submits a single `transaction`, giving up after `timeout`.
    ///
    /// The `Arc<ConsensusAdapter>` implementation in this file rejects user transactions and
    /// runs the submission in a spawned task bounded by `timeout`.
    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}
199
/// Client used by the adapter to submit transactions to consensus.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` to consensus.
    ///
    /// On success returns the consensus positions for the submission together with a receiver
    /// that later yields the [`BlockStatus`] of the submission.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
209
/// Submits transactions to consensus and tracks them until they are processed.
pub struct ConsensusAdapter {
    /// Client through which transactions are submitted to consensus.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Checkpoint store, consulted for the highest synced checkpoint and synced notifications.
    checkpoint_store: Arc<CheckpointStore>,
    /// Name of this authority; used when building `EndOfPublish` messages.
    authority: AuthorityName,
    /// Threshold on inflight transactions used by the overload check.
    max_pending_transactions: usize,
    /// Count of inflight transactions, maintained by `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Metrics recorded by this adapter.
    metrics: ConsensusAdapterMetrics,
    /// Semaphore bounding concurrent local submissions of non-system transactions.
    submit_semaphore: Arc<Semaphore>,
    /// Notified once each time an `InflightDropGuard` is dropped.
    inflight_slot_freed_notify: Arc<Notify>,
}
231
232impl ConsensusAdapter {
233 pub fn new(
235 consensus_client: Arc<dyn ConsensusClient>,
236 checkpoint_store: Arc<CheckpointStore>,
237 authority: AuthorityName,
238 max_pending_transactions: usize,
239 max_pending_local_submissions: usize,
240 metrics: ConsensusAdapterMetrics,
241 inflight_slot_freed_notify: Arc<Notify>,
242 ) -> Self {
243 let num_inflight_transactions = Default::default();
244 Self {
245 consensus_client,
246 checkpoint_store,
247 authority,
248 max_pending_transactions,
249 num_inflight_transactions,
250 metrics,
251 submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
252 inflight_slot_freed_notify,
253 }
254 }
255
256 pub fn num_inflight_transactions(&self) -> u64 {
258 self.num_inflight_transactions.load(Ordering::Relaxed)
259 }
260
    /// Returns the configured inflight-transaction threshold used by the overload check.
    pub fn max_pending_transactions(&self) -> usize {
        self.max_pending_transactions
    }
265
266 pub async fn submit_and_get_positions(
269 self: &Arc<Self>,
270 consensus_transactions: Vec<ConsensusTransaction>,
271 epoch_store: &Arc<AuthorityPerEpochStore>,
272 submitter_client_addr: Option<IpAddr>,
273 ) -> Result<Vec<ConsensusPosition>, SuiError> {
274 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
275
276 {
277 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
279 if !reconfiguration_lock.should_accept_user_certs() {
280 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
281 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
282 }
283
284 let _metrics_guard = self.metrics.consensus_latency.start_timer();
288
289 self.submit_batch(
290 &consensus_transactions,
291 Some(&reconfiguration_lock),
292 epoch_store,
293 Some(tx_consensus_positions),
294 submitter_client_addr,
295 )?;
296 }
297
298 rx_consensus_positions.await.map_err(|e| {
299 SuiError::from(SuiErrorKind::FailedToSubmitToConsensus(format!(
300 "Failed to get consensus position: {e}"
301 )))
302 })?
303 }
304
305 pub fn recover_end_of_publish(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
306 if epoch_store.should_send_end_of_publish() {
309 let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
310 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
311 self.submit_unchecked(&[transaction], epoch_store, None, None);
312 }
313 }
314
315 pub fn submit(
323 self: &Arc<Self>,
324 transaction: ConsensusTransaction,
325 lock: Option<&RwLockReadGuard<ReconfigState>>,
326 epoch_store: &Arc<AuthorityPerEpochStore>,
327 tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
328 submitter_client_addr: Option<IpAddr>,
329 ) -> SuiResult<JoinHandle<()>> {
330 self.submit_batch(
331 &[transaction],
332 lock,
333 epoch_store,
334 tx_consensus_position,
335 submitter_client_addr,
336 )
337 }
338
339 pub fn submit_batch(
342 self: &Arc<Self>,
343 transactions: &[ConsensusTransaction],
344 _lock: Option<&RwLockReadGuard<ReconfigState>>,
345 epoch_store: &Arc<AuthorityPerEpochStore>,
346 tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
347 submitter_client_addr: Option<IpAddr>,
348 ) -> SuiResult<JoinHandle<()>> {
349 if transactions.len() > 1 {
350 for transaction in transactions {
352 fp_ensure!(
353 transaction.is_user_transaction(),
354 SuiErrorKind::InvalidTxKindInSoftBundle.into()
355 );
356 }
357 }
358
359 Ok(self.submit_unchecked(
360 transactions,
361 epoch_store,
362 tx_consensus_position,
363 submitter_client_addr,
364 ))
365 }
366
367 fn check_limits(&self) -> bool {
370 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
372 >= self.max_pending_transactions
373 {
374 return false;
375 }
376 self.submit_semaphore.available_permits() > 0
378 }
379
380 fn submit_unchecked(
381 self: &Arc<Self>,
382 transactions: &[ConsensusTransaction],
383 epoch_store: &Arc<AuthorityPerEpochStore>,
384 tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
385 submitter_client_addr: Option<IpAddr>,
386 ) -> JoinHandle<()> {
387 let async_stage = self
389 .clone()
390 .submit_and_wait(
391 transactions.to_vec(),
392 epoch_store.clone(),
393 tx_consensus_position,
394 submitter_client_addr,
395 )
396 .in_current_span();
397 let join_handle = spawn_monitored_task!(async_stage);
400 join_handle
401 }
402
403 async fn submit_and_wait(
404 self: Arc<Self>,
405 transactions: Vec<ConsensusTransaction>,
406 epoch_store: Arc<AuthorityPerEpochStore>,
407 tx_consensus_position: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
408 submitter_client_addr: Option<IpAddr>,
409 ) {
410 epoch_store
424 .within_alive_epoch(self.submit_and_wait_inner(
425 transactions,
426 &epoch_store,
427 tx_consensus_position,
428 submitter_client_addr,
429 ))
430 .await
431 .ok(); }
433
434 #[allow(clippy::option_map_unit_fn)]
435 #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
436 async fn submit_and_wait_inner(
437 self: Arc<Self>,
438 transactions: Vec<ConsensusTransaction>,
439 epoch_store: &Arc<AuthorityPerEpochStore>,
440 mut tx_consensus_positions: Option<oneshot::Sender<SuiResult<Vec<ConsensusPosition>>>>,
441 submitter_client_addr: Option<IpAddr>,
442 ) {
443 if transactions.is_empty() {
444 debug!(
447 "Performing a ping check, pinging consensus to get a consensus position in next block"
448 );
449 let (consensus_positions, _status_waiter) = self
450 .submit_inner(&transactions, epoch_store, &[], "ping")
451 .await;
452
453 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
454 let _ = tx_consensus_positions.send(Ok(consensus_positions));
455 } else {
456 debug_fatal!("Ping check must have a consensus position channel");
457 }
458 return;
459 }
460
461 for transaction in &transactions {
463 if let Some(tx) = transaction.kind.as_user_transaction() {
464 let amplification_factor = (tx.data().transaction_data().gas_price()
465 / epoch_store.reference_gas_price().max(1))
466 .max(1);
467 epoch_store.submitted_transaction_cache.record_submitted_tx(
468 tx.digest(),
469 amplification_factor as u32,
470 submitter_client_addr,
471 );
472 }
473 }
474
475 let is_soft_bundle = transactions.len() > 1;
480 let is_system_message = !transactions[0].is_user_transaction();
481
482 let mut transaction_keys = Vec::new();
483 let mut tx_consensus_positions = tx_consensus_positions;
484
485 for transaction in &transactions {
486 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
487 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
488 epoch_store.record_epoch_pending_certs_process_time_metric();
489 }
490
491 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
492 transaction_keys.push(transaction_key);
493 }
494 let tx_type = if is_soft_bundle {
495 "soft_bundle"
496 } else {
497 classify(&transactions[0])
498 };
499 tracing::Span::current().record("tx_type", tx_type);
500 tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
501
502 let mut guard = InflightDropGuard::acquire(&self, tx_type, transactions.len() as u64);
503
504 let make_processing_error = |method: ProcessedMethod| -> SuiError {
509 let digest = transactions
510 .iter()
511 .find_map(|t| t.kind.as_user_transaction().map(|tx| *tx.digest()))
512 .unwrap_or_default();
513 SuiErrorKind::TransactionProcessing {
514 digest,
515 status: format!("processed via {}", method.processed_via()),
516 }
517 .into()
518 };
519
520 let already_processed =
523 self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store);
524 if let Some(method) = already_processed {
525 guard.processed_method = method;
526 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
527 let _ = tx_consensus_positions.send(Err(make_processing_error(method)));
528 }
529 }
530
531 let _monitor = if matches!(
533 transactions[0].kind,
534 ConsensusTransactionKind::EndOfPublish(_)
535 | ConsensusTransactionKind::CapabilityNotification(_)
536 | ConsensusTransactionKind::CapabilityNotificationV2(_)
537 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
538 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
539 ) {
540 assert!(
541 !is_soft_bundle,
542 "System transactions should have been submitted individually"
543 );
544 let transaction_keys = transaction_keys.clone();
545 Some(CancelOnDrop(spawn_monitored_task!(async {
546 let mut i = 0u64;
547 loop {
548 i += 1;
549 const WARN_DELAY_S: u64 = 30;
550 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
551 let total_wait = i * WARN_DELAY_S;
552 warn!(
553 "Still waiting {} seconds for transactions {:?} to commit in consensus",
554 total_wait, transaction_keys
555 );
556 }
557 })))
558 } else {
559 None
560 };
561
562 if already_processed.is_none() {
563 debug!("Submitting {:?} to consensus", transaction_keys);
564 guard.submitted = true;
565
566 let _permit: Option<SemaphorePermit> = if is_system_message {
570 None
571 } else {
572 Some(
573 self.submit_semaphore
574 .acquire()
575 .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
576 .await
577 .expect("Consensus adapter does not close semaphore"),
578 )
579 };
580 let _in_flight_submission_guard =
581 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
582
583 let submit_fut = async {
586 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
587
588 loop {
589 let (consensus_positions, status_waiter) = self
591 .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
592 .await;
593
594 if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
595 tracing::Span::current().record(
596 "consensus_positions",
597 tracing::field::debug(&consensus_positions),
598 );
599 let _ = tx_consensus_positions.send(Ok(consensus_positions));
605 }
606
607 match status_waiter.await {
608 Ok(status @ BlockStatus::Sequenced(_)) => {
609 tracing::Span::current()
610 .record("status", tracing::field::debug(&status));
611 self.metrics
612 .sequencing_certificate_status
613 .with_label_values(&[tx_type, "sequenced"])
614 .inc();
615 debug!(
617 "Transaction {transaction_keys:?} has been sequenced by consensus."
618 );
619 break;
620 }
621 Ok(status @ BlockStatus::GarbageCollected(_)) => {
622 tracing::Span::current()
623 .record("status", tracing::field::debug(&status));
624 self.metrics
625 .sequencing_certificate_status
626 .with_label_values(&[tx_type, "garbage_collected"])
627 .inc();
628 debug!(
632 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
633 );
634 time::sleep(RETRY_DELAY_STEP).await;
635 continue;
636 }
637 Err(err) => {
638 warn!(
639 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
640 err
641 );
642 time::sleep(RETRY_DELAY_STEP).await;
643 continue;
644 }
645 }
646 }
647 };
648
649 let processed_waiter = self
655 .processed_notify(transaction_keys.clone(), epoch_store)
656 .boxed();
657 let processed_via_notify;
658 guard.processed_method = match select(processed_waiter, submit_fut.boxed()).await {
659 Either::Left((observed, _submit_fut)) => {
660 processed_via_notify = true;
661 observed
662 }
663 Either::Right(((), processed_waiter)) => {
664 debug!("Submitted {transaction_keys:?} to consensus");
665 processed_via_notify = false;
666 processed_waiter.await
667 }
668 };
669 if processed_via_notify
674 && let Some(tx_consensus_positions) = tx_consensus_positions.take()
675 {
676 let _ =
677 tx_consensus_positions.send(Err(make_processing_error(guard.processed_method)));
678 }
679 }
680 debug!(
681 "{transaction_keys:?} processed via {}",
682 guard.processed_method.processed_via()
683 );
684
685 if transactions[0].is_user_transaction()
689 && epoch_store.should_send_end_of_publish()
690 && !epoch_store.protocol_config().timestamp_based_epoch_close()
691 {
692 if let Err(err) = self.submit(
694 ConsensusTransaction::new_end_of_publish(self.authority),
695 None,
696 epoch_store,
697 None,
698 None,
699 ) {
700 warn!("Error when sending end of publish message: {:?}", err);
701 } else {
702 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
703 }
704 }
705 self.metrics
706 .sequencing_certificate_success
707 .with_label_values(&[tx_type])
708 .inc();
709 }
710
711 #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
712 async fn submit_inner(
713 self: &Arc<Self>,
714 transactions: &[ConsensusTransaction],
715 epoch_store: &Arc<AuthorityPerEpochStore>,
716 transaction_keys: &[SequencedConsensusTransactionKey],
717 tx_type: &str,
718 ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
719 let ack_start = Instant::now();
720 let mut retries: u32 = 0;
721 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
722 Duration::from_millis(100),
723 Duration::from_secs(10),
724 );
725
726 let (consensus_positions, status_waiter) = loop {
727 let span = debug_span!("client_submit");
728 match self
729 .consensus_client
730 .submit(transactions, epoch_store)
731 .instrument(span)
732 .await
733 {
734 Err(err) => {
735 if cfg!(msim) || retries > 3 {
737 warn!(
738 "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
739 );
740 }
741 self.metrics
742 .sequencing_certificate_failures
743 .with_label_values(&[tx_type])
744 .inc();
745 retries += 1;
746
747 time::sleep(backoff.next().unwrap()).await;
748 }
749 Ok((consensus_positions, status_waiter)) => {
750 break (consensus_positions, status_waiter);
751 }
752 }
753 };
754
755 let bucket = match retries {
759 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
761 21..=50 => "between_20_and_50".to_string(),
762 51..=100 => "between_50_and_100".to_string(),
763 _ => "over_100".to_string(),
764 };
765
766 self.metrics
767 .sequencing_acknowledge_latency
768 .with_label_values(&[bucket.as_str(), tx_type])
769 .observe(ack_start.elapsed().as_secs_f64());
770
771 (consensus_positions, status_waiter)
772 }
773
774 fn check_processed_via_consensus_or_checkpoint(
783 self: &Arc<Self>,
784 transaction_keys: &[SequencedConsensusTransactionKey],
785 epoch_store: &Arc<AuthorityPerEpochStore>,
786 ) -> Option<ProcessedMethod> {
787 let mut seen_checkpoint = false;
788 for transaction_key in transaction_keys {
789 if epoch_store
792 .is_consensus_message_processed(transaction_key)
793 .expect("Storage error when checking consensus message processed")
794 {
795 self.metrics
796 .sequencing_certificate_processed
797 .with_label_values(&["consensus"])
798 .inc();
799 continue;
800 }
801
802 if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
805 digest,
806 )) = transaction_key
807 && epoch_store
808 .is_transaction_executed_in_checkpoint(digest)
809 .expect("Storage error when checking transaction executed in checkpoint")
810 {
811 self.metrics
812 .sequencing_certificate_processed
813 .with_label_values(&["checkpoint"])
814 .inc();
815 seen_checkpoint = true;
816 continue;
817 }
818
819 if let SequencedConsensusTransactionKey::External(
823 ConsensusTransactionKey::CheckpointSignature(_, seq)
824 | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
825 ) = transaction_key
826 && let Some(synced_seq) = self
827 .checkpoint_store
828 .get_highest_synced_checkpoint_seq_number()
829 .expect("Storage error when reading highest synced checkpoint")
830 && synced_seq >= *seq
831 {
832 self.metrics
833 .sequencing_certificate_processed
834 .with_label_values(&["synced_checkpoint"])
835 .inc();
836 seen_checkpoint = true;
837 continue;
838 }
839
840 return None;
842 }
843
844 if seen_checkpoint {
845 Some(ProcessedMethod::Checkpoint)
846 } else {
847 Some(ProcessedMethod::Consensus)
848 }
849 }
850
    /// Waits until every key in `transaction_keys` has been observed as processed and reports
    /// how.
    ///
    /// One future is created per key. It completes on whichever happens first: the consensus
    /// message is processed, the certificate's transaction is executed in a checkpoint
    /// (certificate keys only), or the referenced checkpoint is synced (checkpoint signature
    /// keys only).
    ///
    /// Returns `ProcessedMethod::Checkpoint` if any key completed through one of the two
    /// checkpoint paths, otherwise `ProcessedMethod::Consensus`.
    ///
    /// # Panics
    /// Panics on storage errors returned by the epoch store notifications.
    async fn processed_notify(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            // Only certificate keys carry a digest to wait on for checkpoint execution.
            let transaction_digests = match transaction_key {
                SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::Certificate(digest),
                ) => vec![digest],
                _ => vec![],
            };

            // For checkpoint signature keys, also wait for that checkpoint to be synced;
            // for every other key this branch never completes.
            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
            ) = transaction_key
            {
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                Either::Right(future::pending())
            };

            notifications.push(async move {
                // The consensus branch returns early; both checkpoint branches fall through
                // to the `Checkpoint` result below.
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        // Wait for all keys; a single checkpoint-processed key makes the result `Checkpoint`.
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
916}
917
918impl ConsensusOverloadChecker for ConsensusAdapter {
919 fn check_consensus_overload(&self) -> SuiResult {
920 fp_ensure!(
921 self.check_limits(),
922 SuiErrorKind::TooManyTransactionsPendingConsensus.into()
923 );
924 Ok(())
925 }
926}
927
/// Overload checker that never reports overload.
pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    /// Always returns `Ok(())`.
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}
935
936impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
937 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
941 {
942 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
943 if !reconfig_guard.should_accept_user_certs() {
944 return;
946 }
947 epoch_store.close_user_certs(reconfig_guard);
948 }
949 if epoch_store.should_send_end_of_publish() {
950 if let Err(err) = self.submit(
951 ConsensusTransaction::new_end_of_publish(self.authority),
952 None,
953 epoch_store,
954 None,
955 None,
956 ) {
957 warn!("Error when sending end of publish message: {:?}", err);
958 } else {
959 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
960 }
961 }
962 }
963}
964
965struct CancelOnDrop<T>(JoinHandle<T>);
966
967impl<T> Deref for CancelOnDrop<T> {
968 type Target = JoinHandle<T>;
969
970 fn deref(&self) -> &Self::Target {
971 &self.0
972 }
973}
974
975impl<T> Drop for CancelOnDrop<T> {
976 fn drop(&mut self) {
977 self.0.abort();
978 }
979}
980
/// Tracks one inflight submission: created with `acquire`, and on drop it releases the
/// inflight count and records the sequencing latency.
struct InflightDropGuard<'a> {
    /// Adapter whose counters and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// Time at which the guard was acquired; the latency is measured from here.
    start: Instant,
    /// Whether a submission to consensus was attempted; selects the `submitted` or `skipped`
    /// latency label.
    submitted: bool,
    /// Transaction type label used for the metrics.
    tx_type: &'static str,
    /// How the transactions ended up processed; selects the `processed_method` latency label.
    processed_method: ProcessedMethod,
    /// Amount added to the adapter's inflight count on acquire and subtracted on drop.
    inflight_count: u64,
}
992
/// How a submitted transaction was observed to be processed.
#[derive(Copy, Clone, PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl ProcessedMethod {
    /// Human-readable description used in log and error messages.
    fn processed_via(self) -> &'static str {
        match self {
            Self::Checkpoint => "checkpoint execution",
            Self::Consensus => "consensus output",
        }
    }

    /// Value of the `processed_method` label on the sequencing latency metric.
    fn latency_metric_label(self) -> &'static str {
        match self {
            Self::Checkpoint => "processed_via_checkpoint",
            Self::Consensus => "processed_via_consensus",
        }
    }
}
1014
1015impl<'a> InflightDropGuard<'a> {
1016 pub fn acquire(
1017 adapter: &'a ConsensusAdapter,
1018 tx_type: &'static str,
1019 inflight_count: u64,
1020 ) -> Self {
1021 adapter
1022 .num_inflight_transactions
1023 .fetch_add(inflight_count, Ordering::SeqCst);
1024 adapter
1025 .metrics
1026 .sequencing_certificate_inflight
1027 .with_label_values(&[tx_type])
1028 .inc();
1029 adapter
1030 .metrics
1031 .sequencing_certificate_attempt
1032 .with_label_values(&[tx_type])
1033 .inc();
1034 Self {
1035 adapter,
1036 start: Instant::now(),
1037 submitted: false,
1038 tx_type,
1039 processed_method: ProcessedMethod::Consensus,
1040 inflight_count,
1041 }
1042 }
1043}
1044
1045impl Drop for InflightDropGuard<'_> {
1046 fn drop(&mut self) {
1047 self.adapter
1048 .num_inflight_transactions
1049 .fetch_sub(self.inflight_count, Ordering::SeqCst);
1050 self.adapter
1051 .metrics
1052 .sequencing_certificate_inflight
1053 .with_label_values(&[self.tx_type])
1054 .dec();
1055 self.adapter.inflight_slot_freed_notify.notify_one();
1057
1058 let latency = self.start.elapsed();
1059 let submitted = if self.submitted {
1060 "submitted"
1061 } else {
1062 "skipped"
1063 };
1064
1065 self.adapter
1066 .metrics
1067 .sequencing_certificate_latency
1068 .with_label_values(&[
1069 submitted,
1070 self.tx_type,
1071 self.processed_method.latency_metric_label(),
1072 ])
1073 .observe(latency.as_secs_f64());
1074 }
1075}
1076
1077impl SubmitToConsensus for Arc<ConsensusAdapter> {
1078 fn submit_to_consensus(
1079 &self,
1080 transactions: &[ConsensusTransaction],
1081 epoch_store: &Arc<AuthorityPerEpochStore>,
1082 ) -> SuiResult {
1083 self.submit_batch(transactions, None, epoch_store, None, None)
1084 .map(|_| ())
1085 }
1086
1087 fn submit_best_effort(
1088 &self,
1089 transaction: &ConsensusTransaction,
1090 epoch_store: &Arc<AuthorityPerEpochStore>,
1091 timeout: Duration,
1093 ) -> SuiResult {
1094 if transaction.is_user_transaction() {
1095 debug_fatal!("submit_best_effort called with a user transaction");
1096 return Err(SuiErrorKind::GenericAuthorityError {
1097 error: "submit_best_effort does not accept user transactions".to_string(),
1098 }
1099 .into());
1100 }
1101
1102 let _in_flight_submission_guard =
1104 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1105
1106 let key = SequencedConsensusTransactionKey::External(transaction.key());
1107 let tx_type = classify(transaction);
1108
1109 let async_stage = {
1110 let transaction = transaction.clone();
1111 let epoch_store = epoch_store.clone();
1112 let this = self.clone();
1113
1114 async move {
1115 let result = tokio::time::timeout(
1116 timeout,
1117 this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1118 )
1119 .await;
1120
1121 if let Err(e) = result {
1122 warn!("Consensus submission timed out: {e:?}");
1123 this.metrics
1124 .sequencing_best_effort_timeout
1125 .with_label_values(&[tx_type])
1126 .inc();
1127 }
1128 }
1129 };
1130
1131 let epoch_store = epoch_store.clone();
1132 spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1133 Ok(())
1134 }
1135}