sui_core/
consensus_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::IpAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use std::time::Instant;
10
11use consensus_core::BlockStatus;
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::future::{self, Either, select};
15use futures::stream::FuturesUnordered;
16use mysten_common::debug_fatal;
17use mysten_metrics::{
18    GaugeGuard, InflightGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task,
19};
20use parking_lot::RwLockReadGuard;
21use prometheus::Histogram;
22use prometheus::HistogramVec;
23use prometheus::IntCounter;
24use prometheus::IntCounterVec;
25use prometheus::IntGauge;
26use prometheus::IntGaugeVec;
27use prometheus::Registry;
28use prometheus::{
29    register_histogram_vec_with_registry, register_histogram_with_registry,
30    register_int_counter_vec_with_registry, register_int_counter_with_registry,
31    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
32};
33use sui_types::base_types::AuthorityName;
34use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
35use sui_types::fp_ensure;
36use sui_types::messages_consensus::ConsensusPosition;
37use sui_types::messages_consensus::ConsensusTransactionKind;
38use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
39use sui_types::transaction::TransactionDataAPI;
40use tokio::sync::{Notify, Semaphore, SemaphorePermit, oneshot};
41use tokio::task::JoinHandle;
42use tokio::time::Duration;
43use tokio::time::{self};
44use tracing::{Instrument, debug, debug_span, info, instrument, trace, warn};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::checkpoints::CheckpointStore;
48use crate::consensus_handler::{SequencedConsensusTransactionKey, classify};
49use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
50
51#[cfg(test)]
52#[path = "unit_tests/consensus_tests.rs"]
53pub mod consensus_tests;
54
/// Prometheus metrics tracking the lifecycle of transactions submitted to
/// consensus through `ConsensusAdapter`.
pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    /// Certificates the validator attempted to sequence, by tx type.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Certificates successfully sequenced, by tx type.
    pub sequencing_certificate_success: IntCounterVec,
    /// Sequencing failures other than timeouts, by tx type.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Per-submission status reported by consensus (sequenced or garbage collected).
    pub sequencing_certificate_status: IntCounterVec,
    /// Requests currently in flight to sequence certificates, by tx type.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency of the acknowledgement from the sequencing engine; the overall
    /// sequencing latency is `sequencing_certificate_latency`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Latency for sequencing a certificate end to end.
    pub sequencing_certificate_latency: HistogramVec,
    /// Certificates observed as processed, labeled by source (consensus or checkpoint).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Requests currently blocked waiting on the submit permit semaphore.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Times a best-effort submission timed out, by tx type.
    pub sequencing_best_effort_timeout: IntCounterVec,
    /// Time between submitting a txn to consensus and local acknowledgement
    /// (registered under a legacy `validator_service_` name; see `new`).
    pub consensus_latency: Histogram,
    /// Certificates rejected while the validator halts at an epoch boundary
    /// (also registered under a legacy `validator_service_` name).
    pub num_rejected_cert_in_epoch_boundary: IntCounter,
}
71
72impl ConsensusAdapterMetrics {
73    pub fn new(registry: &Registry) -> Self {
74        Self {
75            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
76                "sequencing_certificate_attempt",
77                "Counts the number of certificates the validator attempts to sequence.",
78                &["tx_type"],
79                registry,
80            )
81                .unwrap(),
82            sequencing_certificate_success: register_int_counter_vec_with_registry!(
83                "sequencing_certificate_success",
84                "Counts the number of successfully sequenced certificates.",
85                &["tx_type"],
86                registry,
87            )
88                .unwrap(),
89            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
90                "sequencing_certificate_failures",
91                "Counts the number of sequenced certificates that failed other than by timeout.",
92                &["tx_type"],
93                registry,
94            )
95                .unwrap(),
96                sequencing_certificate_status: register_int_counter_vec_with_registry!(
97                "sequencing_certificate_status",
98                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
99                &["tx_type", "status"],
100                registry,
101            )
102                .unwrap(),
103            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
104                "sequencing_certificate_inflight",
105                "The inflight requests to sequence certificates.",
106                &["tx_type"],
107                registry,
108            )
109                .unwrap(),
110            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
111                "sequencing_acknowledge_latency",
112                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
113                &["retry", "tx_type"],
114                LATENCY_SEC_BUCKETS.to_vec(),
115                registry,
116            ).unwrap(),
117            sequencing_certificate_latency: register_histogram_vec_with_registry!(
118                "sequencing_certificate_latency",
119                "The latency for sequencing a certificate.",
120                &["submitted", "tx_type", "processed_method"],
121                LATENCY_SEC_BUCKETS.to_vec(),
122                registry,
123            ).unwrap(),
124            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
125                "sequencing_certificate_processed",
126                "The number of certificates that have been processed either by consensus or checkpoint.",
127                &["source"],
128                registry
129            ).unwrap(),
130            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
131                "sequencing_in_flight_semaphore_wait",
132                "How many requests are blocked on submit_permit.",
133                registry,
134            )
135                .unwrap(),
136            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
137                "sequencing_in_flight_submissions",
138                "Number of transactions submitted to local consensus instance and not yet sequenced",
139                registry,
140            )
141                .unwrap(),
142            sequencing_best_effort_timeout: register_int_counter_vec_with_registry!(
143                "sequencing_best_effort_timeout",
144                "The number of times the best effort submission has timed out.",
145                &["tx_type"],
146                registry,
147            ).unwrap(),
148            // These two metrics originally lived in ValidatorServiceMetrics (authority_server.rs)
149            // and keep their legacy names for dashboard compatibility.
150            consensus_latency: register_histogram_with_registry!(
151                "validator_service_consensus_latency",
152                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
153                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
154                registry,
155            ).unwrap(),
156            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
157                "validator_service_num_rejected_cert_in_epoch_boundary",
158                "Number of rejected transaction certificate during epoch transitioning",
159                registry,
160            ).unwrap(),
161        }
162    }
163
164    pub fn new_test() -> Self {
165        Self::new(&Registry::default())
166    }
167}
168
/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
    /// Returns `Ok(())` when consensus can accept more load, and an error
    /// otherwise.
    fn check_consensus_overload(&self) -> SuiResult;
}
173
/// One-shot channel end delivering the [`BlockStatus`] of a submitted block.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
175
/// Abstraction for handing transactions off to consensus; mockable in tests.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits a batch of transactions to consensus for the given epoch.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult;

    /// Submits a single transaction on a best-effort basis, bounded by
    /// `timeout`.
    fn submit_best_effort(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        timeout: Duration,
    ) -> SuiResult;
}
191
/// Client interface to the consensus node; mockable in tests.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits transactions and returns their consensus positions together
    /// with a receiver that reports the containing block's status.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)>;
}
201
/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to number of inflight transactions at this node.
    /// Compared non-atomically against `num_inflight_transactions` in
    /// `check_limits`.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Arc<Semaphore>,
    /// Notified when an inflight slot is freed (`InflightDropGuard` dropped).
    /// Used by the admission queue drainer to wake up and submit more
    /// transactions.
    inflight_slot_freed_notify: Arc<Notify>,
}
223
224impl ConsensusAdapter {
225    /// Make a new Consensus adapter instance.
226    pub fn new(
227        consensus_client: Arc<dyn ConsensusClient>,
228        checkpoint_store: Arc<CheckpointStore>,
229        authority: AuthorityName,
230        max_pending_transactions: usize,
231        max_pending_local_submissions: usize,
232        metrics: ConsensusAdapterMetrics,
233        inflight_slot_freed_notify: Arc<Notify>,
234    ) -> Self {
235        let num_inflight_transactions = Default::default();
236        Self {
237            consensus_client,
238            checkpoint_store,
239            authority,
240            max_pending_transactions,
241            num_inflight_transactions,
242            metrics,
243            submit_semaphore: Arc::new(Semaphore::new(max_pending_local_submissions)),
244            inflight_slot_freed_notify,
245        }
246    }
247
    /// Get the current number of in-flight transactions
    pub fn num_inflight_transactions(&self) -> u64 {
        // Relaxed load: this is a monitoring read, not a synchronization point.
        self.num_inflight_transactions.load(Ordering::Relaxed)
    }
252
    /// Get the maximum number of pending transactions (consensus capacity limit).
    pub fn max_pending_transactions(&self) -> usize {
        self.max_pending_transactions
    }
257
    /// Submits transactions to consensus within the reconfiguration lock and
    /// returns their consensus positions.
    ///
    /// Errors with `ValidatorHaltedAtEpochEnd` when the epoch is closing and
    /// user certificates are no longer accepted, and with
    /// `FailedToSubmitToConsensus` when the position channel is dropped
    /// before the positions are delivered.
    pub async fn submit_and_get_positions(
        self: &Arc<Self>,
        consensus_transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        submitter_client_addr: Option<IpAddr>,
    ) -> Result<Vec<ConsensusPosition>, SuiError> {
        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();

        {
            // code block within reconfiguration lock
            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
            if !reconfiguration_lock.should_accept_user_certs() {
                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
            }

            // Submit to consensus and wait for position, we do not check if tx
            // has been processed by consensus already as this method is called
            // to get back a consensus position.
            // NOTE(review): this timer stops when the guard is dropped at the
            // end of this block (after submit_batch returns), not when the
            // positions are received below — confirm that is the intent.
            let _metrics_guard = self.metrics.consensus_latency.start_timer();

            self.submit_batch(
                &consensus_transactions,
                Some(&reconfiguration_lock),
                epoch_store,
                Some(tx_consensus_positions),
                submitter_client_addr,
            )?;
        }

        rx_consensus_positions.await.map_err(|e| {
            SuiErrorKind::FailedToSubmitToConsensus(format!(
                "Failed to get consensus position: {e}"
            ))
            .into()
        })
    }
297
298    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
299        // Send EndOfPublish if needed.
300        // This handles the case where the node crashed after setting reconfig lock state
301        // but before the EndOfPublish message was sent to consensus.
302        if epoch_store.should_send_end_of_publish() {
303            let transaction = ConsensusTransaction::new_end_of_publish(self.authority);
304            info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
305            self.submit_unchecked(&[transaction], epoch_store, None, None);
306        }
307    }
308
309    /// This method blocks until transaction is persisted in local database
310    /// It then returns handle to async task, user can join this handle to await while transaction is processed by consensus
311    ///
312    /// This method guarantees that once submit(but not returned async handle) returns,
313    /// transaction is persisted and will eventually be sent to consensus even after restart
314    ///
315    /// When submitting a certificate caller **must** provide a ReconfigState lock guard
316    pub fn submit(
317        self: &Arc<Self>,
318        transaction: ConsensusTransaction,
319        lock: Option<&RwLockReadGuard<ReconfigState>>,
320        epoch_store: &Arc<AuthorityPerEpochStore>,
321        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
322        submitter_client_addr: Option<IpAddr>,
323    ) -> SuiResult<JoinHandle<()>> {
324        self.submit_batch(
325            &[transaction],
326            lock,
327            epoch_store,
328            tx_consensus_position,
329            submitter_client_addr,
330        )
331    }
332
333    // Submits the provided transactions to consensus in a batched fashion. The `transactions` vector can be also empty in case of a ping check.
334    // In this case the system will simulate a transaction submission to consensus and return the consensus position.
335    pub fn submit_batch(
336        self: &Arc<Self>,
337        transactions: &[ConsensusTransaction],
338        _lock: Option<&RwLockReadGuard<ReconfigState>>,
339        epoch_store: &Arc<AuthorityPerEpochStore>,
340        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
341        submitter_client_addr: Option<IpAddr>,
342    ) -> SuiResult<JoinHandle<()>> {
343        if transactions.len() > 1 {
344            // Soft bundles must contain only UserTransactionV2 transactions.
345            for transaction in transactions {
346                fp_ensure!(
347                    transaction.is_user_transaction(),
348                    SuiErrorKind::InvalidTxKindInSoftBundle.into()
349                );
350            }
351        }
352
353        Ok(self.submit_unchecked(
354            transactions,
355            epoch_store,
356            tx_consensus_position,
357            submitter_client_addr,
358        ))
359    }
360
361    /// Performs weakly consistent checks on internal buffers to quickly
362    /// discard transactions if we are overloaded
363    fn check_limits(&self) -> bool {
364        // First check total transactions (waiting and in submission)
365        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
366            > self.max_pending_transactions
367        {
368            return false;
369        }
370        // Then check if submit_semaphore has permits
371        self.submit_semaphore.available_permits() > 0
372    }
373
374    fn submit_unchecked(
375        self: &Arc<Self>,
376        transactions: &[ConsensusTransaction],
377        epoch_store: &Arc<AuthorityPerEpochStore>,
378        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
379        submitter_client_addr: Option<IpAddr>,
380    ) -> JoinHandle<()> {
381        // Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
382        let async_stage = self
383            .clone()
384            .submit_and_wait(
385                transactions.to_vec(),
386                epoch_store.clone(),
387                tx_consensus_position,
388                submitter_client_addr,
389            )
390            .in_current_span();
391        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
392        // (Limit is not applied atomically, and only to user transactions.)
393        let join_handle = spawn_monitored_task!(async_stage);
394        join_handle
395    }
396
    /// Runs `submit_and_wait_inner`, automatically cancelling it when the
    /// current epoch terminates.
    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        tx_consensus_position: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
        submitter_client_addr: Option<IpAddr>,
    ) {
        // When epoch_terminated signal is received all pending submit_and_wait_inner are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for consensus message to be processed,
        // which may never happen on epoch boundary.
        //
        // In addition to that, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because after epoch change, this validator may exit the committee and become a full node.
        // So it is no longer able to submit to consensus.
        //
        // Also, submission to consensus is not gated on epoch. Although it is ok to submit user transactions
        // to the new epoch, we want to cancel system transaction submissions from the current epoch to the new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(
                transactions,
                &epoch_store,
                tx_consensus_position,
                submitter_client_addr,
            ))
            .await
            .ok(); // result here indicates if epoch ended earlier, we don't care about it
    }
427
428    #[allow(clippy::option_map_unit_fn)]
429    #[instrument(name="ConsensusAdapter::submit_and_wait_inner", level="trace", skip_all, fields(tx_count = ?transactions.len(), tx_type = tracing::field::Empty, tx_keys = tracing::field::Empty, submit_status = tracing::field::Empty, consensus_positions = tracing::field::Empty))]
430    async fn submit_and_wait_inner(
431        self: Arc<Self>,
432        transactions: Vec<ConsensusTransaction>,
433        epoch_store: &Arc<AuthorityPerEpochStore>,
434        mut tx_consensus_positions: Option<oneshot::Sender<Vec<ConsensusPosition>>>,
435        submitter_client_addr: Option<IpAddr>,
436    ) {
437        if transactions.is_empty() {
438            // If transactions are empty, then we attempt to ping consensus and simulate a transaction submission to consensus.
439            // We intentionally do not wait for the block status, as we are only interested in the consensus position and return it immediately.
440            debug!(
441                "Performing a ping check, pinging consensus to get a consensus position in next block"
442            );
443            let (consensus_positions, _status_waiter) = self
444                .submit_inner(&transactions, epoch_store, &[], "ping")
445                .await;
446
447            if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
448                let _ = tx_consensus_positions.send(consensus_positions);
449            } else {
450                debug_fatal!("Ping check must have a consensus position channel");
451            }
452            return;
453        }
454
455        // Record submitted transactions early for DoS protection
456        for transaction in &transactions {
457            if let Some(tx) = transaction.kind.as_user_transaction() {
458                let amplification_factor = (tx.data().transaction_data().gas_price()
459                    / epoch_store.reference_gas_price().max(1))
460                .max(1);
461                epoch_store.submitted_transaction_cache.record_submitted_tx(
462                    tx.digest(),
463                    amplification_factor as u32,
464                    submitter_client_addr,
465                );
466            }
467        }
468
469        // If tx_consensus_positions channel is provided, the caller is looking for a
470        // consensus position for mfp. Therefore we will skip shortcutting submission
471        // if txes have already been processed.
472        let skip_processed_checks = tx_consensus_positions.is_some();
473
474        // Current code path ensures:
475        // - If transactions.len() > 1, it is a soft bundle. System transactions should have been submitted individually.
476        // - If is_soft_bundle, then all transactions are of CertifiedTransaction or UserTransaction kind.
477        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and transactions[0] can be of any kind.
478        let is_soft_bundle = transactions.len() > 1;
479
480        let mut transaction_keys = Vec::new();
481        let mut tx_consensus_positions = tx_consensus_positions;
482
483        for transaction in &transactions {
484            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
485                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
486                epoch_store.record_epoch_pending_certs_process_time_metric();
487            }
488
489            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
490            transaction_keys.push(transaction_key);
491        }
492        let tx_type = if is_soft_bundle {
493            "soft_bundle"
494        } else {
495            classify(&transactions[0])
496        };
497        tracing::Span::current().record("tx_type", tx_type);
498        tracing::Span::current().record("tx_keys", tracing::field::debug(&transaction_keys));
499
500        let mut guard = InflightDropGuard::acquire(&self, tx_type);
501
502        // Skip submission if the tx is already processed via consensus output
503        // or checkpoint state sync. `skip_processed_checks` is set by callers
504        // that need a consensus position (mfp/ping), so we must submit even
505        // if already processed.
506        let already_processed = if skip_processed_checks {
507            None
508        } else {
509            self.check_processed_via_consensus_or_checkpoint(&transaction_keys, epoch_store)
510        };
511        if let Some(method) = &already_processed {
512            guard.processed_method = *method;
513        }
514
515        // Log warnings for administrative transactions that fail to get sequenced
516        let _monitor = if matches!(
517            transactions[0].kind,
518            ConsensusTransactionKind::EndOfPublish(_)
519                | ConsensusTransactionKind::CapabilityNotification(_)
520                | ConsensusTransactionKind::CapabilityNotificationV2(_)
521                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
522                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
523        ) {
524            assert!(
525                !is_soft_bundle,
526                "System transactions should have been submitted individually"
527            );
528            let transaction_keys = transaction_keys.clone();
529            Some(CancelOnDrop(spawn_monitored_task!(async {
530                let mut i = 0u64;
531                loop {
532                    i += 1;
533                    const WARN_DELAY_S: u64 = 30;
534                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
535                    let total_wait = i * WARN_DELAY_S;
536                    warn!(
537                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
538                        total_wait, transaction_keys
539                    );
540                }
541            })))
542        } else {
543            None
544        };
545
546        if already_processed.is_none() {
547            debug!("Submitting {:?} to consensus", transaction_keys);
548            guard.submitted = true;
549
550            let _permit: SemaphorePermit = self
551                .submit_semaphore
552                .acquire()
553                .count_in_flight(self.metrics.sequencing_in_flight_semaphore_wait.clone())
554                .await
555                .expect("Consensus adapter does not close semaphore");
556            let _in_flight_submission_guard =
557                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
558
559            // Submit the transaction to consensus, racing against the processed waiter in
560            // case another validator sequences the transaction first.
561            let submit_fut = async {
562                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
563
564                loop {
565                    // Submit the transaction to consensus and return the submit result with a status waiter
566                    let (consensus_positions, status_waiter) = self
567                        .submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
568                        .await;
569
570                    if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
571                        tracing::Span::current().record(
572                            "consensus_positions",
573                            tracing::field::debug(&consensus_positions),
574                        );
575                        // We send the first consensus position returned by consensus
576                        // to the submitting client even if it is retried internally within
577                        // consensus adapter due to an error or GC. They can handle retries
578                        // as needed if the consensus position does not return the desired
579                        // results (e.g. not sequenced due to garbage collection).
580                        let _ = tx_consensus_positions.send(consensus_positions);
581                    }
582
583                    match status_waiter.await {
584                        Ok(status @ BlockStatus::Sequenced(_)) => {
585                            tracing::Span::current()
586                                .record("status", tracing::field::debug(&status));
587                            self.metrics
588                                .sequencing_certificate_status
589                                .with_label_values(&[tx_type, "sequenced"])
590                                .inc();
591                            // Block has been sequenced. Nothing more to do, we do have guarantees that the transaction will appear in consensus output.
592                            trace!(
593                                "Transaction {transaction_keys:?} has been sequenced by consensus."
594                            );
595                            break;
596                        }
597                        Ok(status @ BlockStatus::GarbageCollected(_)) => {
598                            tracing::Span::current()
599                                .record("status", tracing::field::debug(&status));
600                            self.metrics
601                                .sequencing_certificate_status
602                                .with_label_values(&[tx_type, "garbage_collected"])
603                                .inc();
604                            // Block has been garbage collected and we have no guarantees that the transaction will appear in consensus output. We'll
605                            // resubmit the transaction to consensus. If the transaction has been already "processed", then probably someone else has submitted
606                            // the transaction and managed to get sequenced. Then this future will have been cancelled anyways so no need to check here on the processed output.
607                            debug!(
608                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
609                            );
610                            time::sleep(RETRY_DELAY_STEP).await;
611                            continue;
612                        }
613                        Err(err) => {
614                            warn!(
615                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
616                                err
617                            );
618                            time::sleep(RETRY_DELAY_STEP).await;
619                            continue;
620                        }
621                    }
622                }
623            };
624
625            guard.processed_method = if skip_processed_checks {
626                // When getting consensus positions, we only care about submit_fut completing.
627                submit_fut.await;
628                ProcessedMethod::Consensus
629            } else {
630                // Race `processed_notify` against the submit loop. If the tx is
631                // processed via another path (consensus output from another
632                // validator's submission, or checkpoint state sync) while we're
633                // inside the submit loop, the submission future is dropped and
634                // the retry loop is cancelled cleanly.
635                let processed_waiter = self
636                    .processed_notify(transaction_keys.clone(), epoch_store)
637                    .boxed();
638                match select(processed_waiter, submit_fut.boxed()).await {
639                    Either::Left((observed, _submit_fut)) => observed,
640                    Either::Right(((), processed_waiter)) => {
641                        debug!("Submitted {transaction_keys:?} to consensus");
642                        processed_waiter.await
643                    }
644                }
645            };
646        }
647        debug!("{transaction_keys:?} processed by consensus");
648
649        // After a user transaction or soft bundle submission,
650        // send EndOfPublish if the epoch is closing.
651        // EndOfPublish can also be sent during consensus commit handling, checkpoint execution and recovery.
652        if transactions[0].is_user_transaction() && epoch_store.should_send_end_of_publish() {
653            // sending message outside of any locks scope
654            if let Err(err) = self.submit(
655                ConsensusTransaction::new_end_of_publish(self.authority),
656                None,
657                epoch_store,
658                None,
659                None,
660            ) {
661                warn!("Error when sending end of publish message: {:?}", err);
662            } else {
663                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
664            }
665        }
666        self.metrics
667            .sequencing_certificate_success
668            .with_label_values(&[tx_type])
669            .inc();
670    }
671
672    #[instrument(name = "ConsensusAdapter::submit_inner", level = "trace", skip_all)]
673    async fn submit_inner(
674        self: &Arc<Self>,
675        transactions: &[ConsensusTransaction],
676        epoch_store: &Arc<AuthorityPerEpochStore>,
677        transaction_keys: &[SequencedConsensusTransactionKey],
678        tx_type: &str,
679    ) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
680        let ack_start = Instant::now();
681        let mut retries: u32 = 0;
682        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
683            Duration::from_millis(100),
684            Duration::from_secs(10),
685        );
686
687        let (consensus_positions, status_waiter) = loop {
688            let span = debug_span!("client_submit");
689            match self
690                .consensus_client
691                .submit(transactions, epoch_store)
692                .instrument(span)
693                .await
694            {
695                Err(err) => {
696                    // This can happen during reconfig, so keep retrying until succeed.
697                    if cfg!(msim) || retries > 3 {
698                        warn!(
699                            "Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
700                        );
701                    }
702                    self.metrics
703                        .sequencing_certificate_failures
704                        .with_label_values(&[tx_type])
705                        .inc();
706                    retries += 1;
707
708                    time::sleep(backoff.next().unwrap()).await;
709                }
710                Ok((consensus_positions, status_waiter)) => {
711                    break (consensus_positions, status_waiter);
712                }
713            }
714        };
715
716        // we want to record the num of retries when reporting latency but to avoid label
717        // cardinality we do some simple bucketing to give us a good enough idea of how
718        // many retries happened associated with the latency.
719        let bucket = match retries {
720            0..=10 => retries.to_string(), // just report the retry count as is
721            11..=20 => "between_10_and_20".to_string(),
722            21..=50 => "between_20_and_50".to_string(),
723            51..=100 => "between_50_and_100".to_string(),
724            _ => "over_100".to_string(),
725        };
726
727        self.metrics
728            .sequencing_acknowledge_latency
729            .with_label_values(&[bucket.as_str(), tx_type])
730            .observe(ack_start.elapsed().as_secs_f64());
731
732        (consensus_positions, status_waiter)
733    }
734
735    /// Sync check for whether `transaction_keys` are already processed via
736    /// consensus output or checkpoint state sync. Returns `Some(method)` if
737    /// every key is already processed (Checkpoint dominates when any key was
738    /// processed via checkpoint or synced-checkpoint), else `None`.
739    ///
740    /// Also increments `sequencing_certificate_processed` with the matching
741    /// label for each key found processed, mirroring what `processed_notify`
742    /// emits for its async wake-ups.
743    fn check_processed_via_consensus_or_checkpoint(
744        self: &Arc<Self>,
745        transaction_keys: &[SequencedConsensusTransactionKey],
746        epoch_store: &Arc<AuthorityPerEpochStore>,
747    ) -> Option<ProcessedMethod> {
748        let mut seen_checkpoint = false;
749        for transaction_key in transaction_keys {
750            // Check consensus-processed first; if already visible in consensus
751            // output we don't need to submit again.
752            if epoch_store
753                .is_consensus_message_processed(transaction_key)
754                .expect("Storage error when checking consensus message processed")
755            {
756                self.metrics
757                    .sequencing_certificate_processed
758                    .with_label_values(&["consensus"])
759                    .inc();
760                continue;
761            }
762
763            // For a cert-shaped key, check whether state sync executed the tx
764            // via a checkpoint.
765            if let SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
766                digest,
767            )) = transaction_key
768                && epoch_store
769                    .is_transaction_executed_in_checkpoint(digest)
770                    .expect("Storage error when checking transaction executed in checkpoint")
771            {
772                self.metrics
773                    .sequencing_certificate_processed
774                    .with_label_values(&["checkpoint"])
775                    .inc();
776                seen_checkpoint = true;
777                continue;
778            }
779
780            // For a checkpoint-signature key, check whether a checkpoint at
781            // or above the target sequence number has already been synced —
782            // in which case the signature is redundant.
783            if let SequencedConsensusTransactionKey::External(
784                ConsensusTransactionKey::CheckpointSignature(_, seq)
785                | ConsensusTransactionKey::CheckpointSignatureV2(_, seq, _),
786            ) = transaction_key
787                && let Some(synced_seq) = self
788                    .checkpoint_store
789                    .get_highest_synced_checkpoint_seq_number()
790                    .expect("Storage error when reading highest synced checkpoint")
791                && synced_seq >= *seq
792            {
793                self.metrics
794                    .sequencing_certificate_processed
795                    .with_label_values(&["synced_checkpoint"])
796                    .inc();
797                seen_checkpoint = true;
798                continue;
799            }
800
801            // Not processed via any path — caller must submit.
802            return None;
803        }
804
805        if seen_checkpoint {
806            Some(ProcessedMethod::Checkpoint)
807        } else {
808            Some(ProcessedMethod::Consensus)
809        }
810    }
811
    /// Async wait for any of `transaction_keys` to become processed via
    /// consensus output or a checkpoint (either state-synced or executed
    /// locally). Used in the in-flight race against submission: cancelling
    /// the submit future when we learn the tx is processed by another path.
    /// Returns `Checkpoint` if any key resolves via a checkpoint path, else
    /// `Consensus`.
    async fn processed_notify(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        // One notification future per key; FuturesUnordered polls them all
        // concurrently and yields results as each one completes.
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            // Only certificate-shaped keys carry a transaction digest that can
            // be observed as "executed in a checkpoint"; other key kinds get an
            // empty list, which disables the corresponding select arm below.
            let transaction_digests = match transaction_key {
                SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::Certificate(digest),
                ) => vec![digest],
                _ => vec![],
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number)
                | ConsensusTransactionKey::CheckpointSignatureV2(_, checkpoint_sequence_number, _),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get notified when a checkpoint with equal or higher sequence
                // number has been already synced. This way we don't try to unnecessarily sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                // Non-signature keys can never resolve via the synced-checkpoint
                // path, so this arm is a future that never completes.
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or executed in a checkpoint. We could equally just
            // get notified in aggregate when all transactions are processed, but with this approach can get notified in a more fine-grained way
            // as transactions can be marked as processed in different ways. This is mostly a concern for the soft-bundle transactions.
            notifications.push(async move {
                tokio::select! {
                    // Consensus path: the key appeared in processed consensus output.
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    // Guarded arm: only enabled for certificate keys (non-empty digest list).
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                // Falling through either non-returning arm means this key
                // resolved via a checkpoint path.
                ProcessedMethod::Checkpoint
            });
        }

        // Wait for every key to resolve; Checkpoint dominates the aggregate.
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
877}
878
879impl ConsensusOverloadChecker for ConsensusAdapter {
880    fn check_consensus_overload(&self) -> SuiResult {
881        fp_ensure!(
882            self.check_limits(),
883            SuiErrorKind::TooManyTransactionsPendingConsensus.into()
884        );
885        Ok(())
886    }
887}
888
889pub struct NoopConsensusOverloadChecker {}
890
impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
    // Always passes: this implementation performs no limit checks.
    fn check_consensus_overload(&self) -> SuiResult {
        Ok(())
    }
}
896
897impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
898    /// This method is called externally to begin reconfiguration
899    /// It sets reconfig state to reject new certificates from user.
900    /// ConsensusAdapter will send EndOfPublish message once pending certificate queue is drained.
901    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
902        {
903            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
904            if !reconfig_guard.should_accept_user_certs() {
905                // Allow caller to call this method multiple times
906                return;
907            }
908            epoch_store.close_user_certs(reconfig_guard);
909        }
910        if epoch_store.should_send_end_of_publish() {
911            if let Err(err) = self.submit(
912                ConsensusTransaction::new_end_of_publish(self.authority),
913                None,
914                epoch_store,
915                None,
916                None,
917            ) {
918                warn!("Error when sending end of publish message: {:?}", err);
919            } else {
920                info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
921            }
922        }
923    }
924}
925
926struct CancelOnDrop<T>(JoinHandle<T>);
927
// Deref to the inner JoinHandle so callers can await/inspect it directly.
impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
935
impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        // Cancel the spawned task; dropping the guard implies the result is
        // no longer needed.
        self.0.abort();
    }
}
941
/// Tracks number of inflight consensus requests and relevant metrics.
/// Acquired when a submission starts; the `Drop` impl unregisters the request
/// and records the end-to-end sequencing latency.
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    // When the guard was acquired; elapsed time is observed as latency on drop.
    start: Instant,
    // Whether the transaction was actually submitted to consensus, used as a
    // latency label on drop ("submitted" vs "skipped").
    submitted: bool,
    // Transaction type label used for all per-type metrics.
    tx_type: &'static str,
    // How the transaction ultimately became processed; used as a latency label
    // on drop.
    processed_method: ProcessedMethod,
}
950
/// How a submitted transaction ultimately became processed.
// Debug is derived so the variant can appear in logs/assertions during
// diagnostics; all other derives are unchanged.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum ProcessedMethod {
    /// Observed in processed consensus output.
    Consensus,
    /// Resolved via a checkpoint path: executed through state sync, or made
    /// redundant by an already-synced checkpoint.
    Checkpoint,
}
956
957impl<'a> InflightDropGuard<'a> {
958    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
959        adapter
960            .num_inflight_transactions
961            .fetch_add(1, Ordering::SeqCst);
962        adapter
963            .metrics
964            .sequencing_certificate_inflight
965            .with_label_values(&[tx_type])
966            .inc();
967        adapter
968            .metrics
969            .sequencing_certificate_attempt
970            .with_label_values(&[tx_type])
971            .inc();
972        Self {
973            adapter,
974            start: Instant::now(),
975            submitted: false,
976            tx_type,
977            processed_method: ProcessedMethod::Consensus,
978        }
979    }
980}
981
982impl Drop for InflightDropGuard<'_> {
983    fn drop(&mut self) {
984        self.adapter
985            .num_inflight_transactions
986            .fetch_sub(1, Ordering::SeqCst);
987        self.adapter
988            .metrics
989            .sequencing_certificate_inflight
990            .with_label_values(&[self.tx_type])
991            .dec();
992        // Wake the admission queue drainer so it can submit more transactions.
993        self.adapter.inflight_slot_freed_notify.notify_one();
994
995        let latency = self.start.elapsed();
996        let processed_method = match self.processed_method {
997            ProcessedMethod::Consensus => "processed_via_consensus",
998            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
999        };
1000        let submitted = if self.submitted {
1001            "submitted"
1002        } else {
1003            "skipped"
1004        };
1005
1006        self.adapter
1007            .metrics
1008            .sequencing_certificate_latency
1009            .with_label_values(&[submitted, self.tx_type, processed_method])
1010            .observe(latency.as_secs_f64());
1011    }
1012}
1013
1014impl SubmitToConsensus for Arc<ConsensusAdapter> {
1015    fn submit_to_consensus(
1016        &self,
1017        transactions: &[ConsensusTransaction],
1018        epoch_store: &Arc<AuthorityPerEpochStore>,
1019    ) -> SuiResult {
1020        self.submit_batch(transactions, None, epoch_store, None, None)
1021            .map(|_| ())
1022    }
1023
1024    fn submit_best_effort(
1025        &self,
1026        transaction: &ConsensusTransaction,
1027        epoch_store: &Arc<AuthorityPerEpochStore>,
1028        // timeout is required, or the spawned task can run forever
1029        timeout: Duration,
1030    ) -> SuiResult {
1031        let permit = match self.submit_semaphore.clone().try_acquire_owned() {
1032            Ok(permit) => permit,
1033            Err(_) => {
1034                return Err(SuiErrorKind::TooManyTransactionsPendingConsensus.into());
1035            }
1036        };
1037
1038        let _in_flight_submission_guard =
1039            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
1040
1041        let key = SequencedConsensusTransactionKey::External(transaction.key());
1042        let tx_type = classify(transaction);
1043
1044        let async_stage = {
1045            let transaction = transaction.clone();
1046            let epoch_store = epoch_store.clone();
1047            let this = self.clone();
1048
1049            async move {
1050                let _permit = permit; // Hold permit for lifetime of task
1051
1052                let result = tokio::time::timeout(
1053                    timeout,
1054                    this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
1055                )
1056                .await;
1057
1058                if let Err(e) = result {
1059                    warn!("Consensus submission timed out: {e:?}");
1060                    this.metrics
1061                        .sequencing_best_effort_timeout
1062                        .with_label_values(&[tx_type])
1063                        .inc();
1064                }
1065            }
1066        };
1067
1068        let epoch_store = epoch_store.clone();
1069        spawn_monitored_task!(epoch_store.within_alive_epoch(async_stage));
1070        Ok(())
1071    }
1072}