use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
use futures::future::{select, Either};
use futures::pin_mut;
use futures::FutureExt;
use itertools::Itertools;
use narwhal_types::{TransactionProto, TransactionsClient};
use narwhal_worker::LazyNarwhalClient;
use parking_lot::RwLockReadGuard;
use prometheus::Histogram;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::IntGaugeVec;
use prometheus::Registry;
use prometheus::{
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry,
};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::collections::HashMap;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use sui_types::base_types::TransactionDigest;
use sui_types::committee::{Committee, CommitteeTrait};
use sui_types::error::{SuiError, SuiResult};
use tap::prelude::*;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::task::JoinHandle;
use tokio::time::{self};
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_handler::{classify, SequencedConsensusTransactionKey};
use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level};
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
use crate::metrics::LatencyObserver;
use mysten_metrics::{spawn_monitored_task, GaugeGuard, GaugeGuardFutureExt};
use sui_protocol_config::ProtocolConfig;
use sui_simulator::anemo::PeerId;
use sui_simulator::narwhal_network::connectivity::ConnectionStatus;
use sui_types::base_types::AuthorityName;
use sui_types::fp_ensure;
use sui_types::messages_consensus::ConsensusTransaction;
use sui_types::messages_consensus::ConsensusTransactionKind;
use tokio::time::Duration;
use tracing::{debug, info, warn};
#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;
/// Histogram buckets (seconds) for `sequencing_certificate_latency`.
const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15.,
20., 25., 30., 60.,
];
/// Histogram buckets (submission positions) for the position-related metrics.
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[0., 1., 2., 3., 5., 10.];
/// Prometheus metrics recorded by [`ConsensusAdapter`] while sequencing
/// transactions through consensus. All `*Vec` metrics are labeled at least by
/// transaction type (`tx_type`).
pub struct ConsensusAdapterMetrics {
/// Certificates the validator attempted to sequence.
pub sequencing_certificate_attempt: IntCounterVec,
/// Certificates successfully sequenced.
pub sequencing_certificate_success: IntCounterVec,
/// Submission failures other than timeout (incremented per retry).
pub sequencing_certificate_failures: IntCounterVec,
/// Requests currently in flight to sequence certificates.
pub sequencing_certificate_inflight: IntGaugeVec,
/// Latency of the acknowledgement from the sequencing engine, labeled by
/// retry bucket; end-to-end latency is `sequencing_certificate_latency`.
pub sequencing_acknowledge_latency: mysten_metrics::histogram::HistogramVec,
/// End-to-end latency for sequencing a certificate, labeled by position.
pub sequencing_certificate_latency: HistogramVec,
/// Our position in the submission order when submitting.
pub sequencing_certificate_authority_position: Histogram,
/// Authorities ahead of us filtered out of the submission order.
pub sequencing_certificate_positions_moved: Histogram,
/// Disconnected authorities that were hashed to an earlier position.
pub sequencing_certificate_preceding_disconnected: Histogram,
/// Requests blocked waiting on the submit semaphore.
pub sequencing_in_flight_semaphore_wait: IntGauge,
/// Transactions submitted to the local narwhal instance, not yet sequenced.
pub sequencing_in_flight_submissions: IntGauge,
/// Consensus latency estimate (ms) produced by the adapter.
pub sequencing_estimated_latency: IntGauge,
/// Resubmission interval (ms) currently in use.
pub sequencing_resubmission_interval_ms: IntGauge,
}
impl ConsensusAdapterMetrics {
/// Registers every consensus-adapter metric on `registry` and returns the
/// populated struct. The `unwrap()`s are intentional: duplicate metric
/// registration is a programming error, not a runtime condition.
pub fn new(registry: &Registry) -> Self {
Self {
sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
"sequencing_certificate_attempt",
"Counts the number of certificates the validator attempts to sequence.",
&["tx_type"],
registry,
)
.unwrap(),
sequencing_certificate_success: register_int_counter_vec_with_registry!(
"sequencing_certificate_success",
"Counts the number of successfully sequenced certificates.",
&["tx_type"],
registry,
)
.unwrap(),
sequencing_certificate_failures: register_int_counter_vec_with_registry!(
"sequencing_certificate_failures",
"Counts the number of sequenced certificates that failed other than by timeout.",
&["tx_type"],
registry,
)
.unwrap(),
sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
"sequencing_certificate_inflight",
"The inflight requests to sequence certificates.",
&["tx_type"],
registry,
)
.unwrap(),
sequencing_acknowledge_latency: mysten_metrics::histogram::HistogramVec::new_in_registry(
"sequencing_acknowledge_latency",
"The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
&["retry", "tx_type"],
registry,
),
sequencing_certificate_latency: register_histogram_vec_with_registry!(
"sequencing_certificate_latency",
"The latency for sequencing a certificate.",
&["position", "tx_type"],
SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
sequencing_certificate_authority_position: register_histogram_with_registry!(
"sequencing_certificate_authority_position",
"The position of the authority when submitted a certificate to consensus.",
SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
registry,
).unwrap(),
sequencing_certificate_positions_moved: register_histogram_with_registry!(
"sequencing_certificate_positions_moved",
"The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
registry,
).unwrap(),
sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
"sequencing_certificate_preceding_disconnected",
"The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
registry,
).unwrap(),
sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
"sequencing_in_flight_semaphore_wait",
"How many requests are blocked on submit_permit.",
registry,
)
.unwrap(),
sequencing_in_flight_submissions: register_int_gauge_with_registry!(
"sequencing_in_flight_submissions",
"Number of transactions submitted to local narwhal instance and not yet sequenced",
registry,
)
.unwrap(),
sequencing_estimated_latency: register_int_gauge_with_registry!(
"sequencing_estimated_latency",
"Consensus latency estimated by consensus adapter in milliseconds",
registry,
)
.unwrap(),
sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
"sequencing_resubmission_interval_ms",
"Resubmission interval used by consensus adapter in milliseconds",
registry,
)
.unwrap(),
}
}
/// Convenience constructor for tests: registers on a throwaway registry.
pub fn new_test() -> Self {
Self::new(&Registry::default())
}
}
/// Abstraction over "send this transaction to consensus". Implemented by the
/// Narwhal gRPC client, the lazy local worker client, and the adapter itself.
#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
/// Submits one consensus transaction; returns once the submission is
/// accepted (not necessarily sequenced).
async fn submit_to_consensus(
&self,
transaction: &ConsensusTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult;
}
#[async_trait::async_trait]
impl SubmitToConsensus for TransactionsClient<sui_network::tonic::transport::Channel> {
    /// Submits a consensus transaction to a Narwhal worker over gRPC.
    ///
    /// Serializes with BCS (serialization of a well-formed transaction is
    /// infallible) and maps transport errors to
    /// `SuiError::ConsensusConnectionBroken`.
    async fn submit_to_consensus(
        &self,
        transaction: &ConsensusTransaction,
        _epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let serialized =
            bcs::to_bytes(transaction).expect("Serializing consensus transaction cannot fail");
        // Fix: move the serialized buffer straight into `Bytes`; the previous
        // `serialized.clone()` copied the entire payload for no reason, since
        // `serialized` was never used again.
        let bytes = Bytes::from(serialized);
        self.clone()
            .submit_transaction(TransactionProto { transaction: bytes })
            .await
            .map_err(|e| SuiError::ConsensusConnectionBroken(format!("{:?}", e)))
            .tap_err(|r| {
                warn!("Submit transaction failed with: {:?}", r);
            })?;
        Ok(())
    }
}
#[async_trait::async_trait]
impl SubmitToConsensus for LazyNarwhalClient {
/// Submits to the local Narwhal worker, lazily establishing the client on
/// first use and caching it for later calls.
async fn submit_to_consensus(
&self,
transaction: &ConsensusTransaction,
_epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let transaction =
bcs::to_bytes(transaction).expect("Serializing consensus transaction cannot fail");
// Fast path: reuse a previously-stored client; otherwise connect via
// `self.get()` and store the result.
// NOTE(review): the load/store pair is not atomic as a unit, so two
// concurrent first calls may both connect and one overwrites the other —
// presumably benign; confirm against LazyNarwhalClient's contract.
let client = {
let c = self.client.load();
if c.is_some() {
c
} else {
self.client.store(Some(self.get().await));
self.client.load()
}
};
// Safe to unwrap: a client was stored on the branch above if absent.
let client = client.as_ref().unwrap().load();
client
.submit_transaction(transaction)
.await
.map_err(|e| SuiError::FailedToSubmitToConsensus(format!("{:?}", e)))
.tap_err(|r| {
warn!("Submit transaction failed with: {:?}", r);
})?;
Ok(())
}
}
/// Submits transactions to consensus on behalf of this validator, applying a
/// digest-derived submission delay, connectivity/score-based ordering, retry
/// on failure, and backpressure via a semaphore and an inflight counter.
pub struct ConsensusAdapter {
/// The underlying client used to actually submit to consensus.
consensus_client: Arc<dyn SubmitToConsensus>,
/// This validator's name; used to find our submission position.
authority: AuthorityName,
/// Upper bound checked by `check_limits` against the inflight counter.
max_pending_transactions: usize,
/// Number of submissions currently in flight (incremented by
/// `InflightDropGuard::acquire`, decremented on drop).
num_inflight_transactions: AtomicU64,
/// Optional cap on the submission position (operator override).
max_submit_position: Option<usize>,
/// Optional fixed resubmission delay step overriding the latency estimate.
submit_delay_step_override: Option<Duration>,
/// Connectivity oracle used to filter the submission order.
connection_monitor_status: Arc<dyn CheckConnection>,
/// Low-scoring authorities map; swapped in via `swap_low_scoring_authorities`.
low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
/// Optional throughput profiler; see `override_by_throughput_profiler`.
consensus_throughput_profiler: ArcSwapOption<ConsensusThroughputProfiler>,
metrics: ConsensusAdapterMetrics,
/// Bounds concurrent local submissions (sized by `max_pending_local_submissions`).
submit_semaphore: Semaphore,
/// Tracks observed consensus latency for delay estimation.
latency_observer: LatencyObserver,
protocol_config: ProtocolConfig,
}
/// Connectivity oracle consulted when choosing the submission order.
pub trait CheckConnection: Send + Sync {
/// Returns the connection status of `authority` as seen by `ourself`, or
/// `None` when the peer is unknown to the monitor.
fn check_connection(
&self,
ourself: &AuthorityName,
authority: &AuthorityName,
) -> Option<ConnectionStatus>;
/// Replaces the authority-name → peer-id mapping on epoch change.
fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
/// `CheckConnection` implementation backed by a shared map of peer connection
/// statuses plus a per-epoch authority-name → peer-id mapping.
pub struct ConnectionMonitorStatus {
/// Latest known connection status per peer, shared with the monitor.
pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
/// Current epoch's authority-name → peer-id mapping (hot-swappable).
pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
/// Test stand-in that reports every peer as connected.
pub struct ConnectionMonitorStatusForTests {}
impl ConsensusAdapter {
pub fn new(
consensus_client: Arc<dyn SubmitToConsensus>,
authority: AuthorityName,
connection_monitor_status: Arc<dyn CheckConnection>,
max_pending_transactions: usize,
max_pending_local_submissions: usize,
max_submit_position: Option<usize>,
submit_delay_step_override: Option<Duration>,
metrics: ConsensusAdapterMetrics,
protocol_config: ProtocolConfig,
) -> Self {
let num_inflight_transactions = Default::default();
let low_scoring_authorities =
ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
Self {
consensus_client,
authority,
max_pending_transactions,
max_submit_position,
submit_delay_step_override,
num_inflight_transactions,
connection_monitor_status,
low_scoring_authorities,
metrics,
submit_semaphore: Semaphore::new(max_pending_local_submissions),
latency_observer: LatencyObserver::new(),
consensus_throughput_profiler: ArcSwapOption::empty(),
protocol_config,
}
}
/// Installs a fresh low-scoring-authorities map (hot swap, lock free).
pub fn swap_low_scoring_authorities(
    &self,
    new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
) {
    let wrapped = Arc::new(new_low_scoring);
    self.low_scoring_authorities.swap(wrapped);
}
/// Installs the consensus throughput profiler (hot swap, lock free).
pub fn swap_throughput_profiler(&self, profiler: Arc<ConsensusThroughputProfiler>) {
    self.consensus_throughput_profiler.store(Some(profiler));
}
/// Re-submits all pending consensus transactions recovered from storage,
/// e.g. after a restart (the worker may have lost them).
///
/// If the epoch is already rejecting user certificates and no certificates
/// remain pending, our EndOfPublish message must also go (back) out; it is
/// appended only when it is not already among the recovered transactions.
pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
    let mut recovered = epoch_store.get_all_pending_consensus_transactions();

    #[allow(clippy::collapsible_if)]
    if epoch_store
        .get_reconfig_state_read_lock_guard()
        .is_reject_user_certs()
        && epoch_store.pending_consensus_certificates_empty()
    {
        // Fix: the check was missing the negation. We must (re)send
        // EndOfPublish when it is *absent* from the recovered set; the old
        // code instead appended a duplicate when one was already pending and
        // never re-sent a lost one.
        if !recovered
            .iter()
            .any(ConsensusTransaction::is_end_of_publish)
        {
            recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
        }
    }
    debug!(
        "Submitting {:?} recovered pending consensus transactions to Narwhal",
        recovered.len()
    );
    for transaction in recovered {
        self.submit_unchecked(transaction, epoch_store);
    }
}
/// Computes the submission delay for `transaction` and returns a sleep
/// future for it together with (position, positions_moved,
/// preceding_disconnected). Non-user transactions are submitted immediately.
fn await_submit_delay(
    &self,
    committee: &Committee,
    transaction: &ConsensusTransaction,
) -> (impl Future<Output = ()>, usize, usize, usize) {
    let (duration, position, positions_moved, preceding_disconnected) =
        if let ConsensusTransactionKind::UserTransaction(certificate) = &transaction.kind {
            self.await_submit_delay_user_transaction(committee, certificate.digest())
        } else {
            // System transactions carry no delay and report position 0.
            (Duration::ZERO, 0, 0, 0)
        };
    (
        tokio::time::sleep(duration),
        position,
        positions_moved,
        preceding_disconnected,
    )
}
/// Derives the delay before submitting a user transaction from our
/// submission position and the observed consensus latency. Returns
/// (total delay, position, positions_moved, preceding_disconnected).
fn await_submit_delay_user_transaction(
    &self,
    committee: &Committee,
    tx_digest: &TransactionDigest,
) -> (Duration, usize, usize, usize) {
    let (position, positions_moved, preceding_disconnected) =
        self.submission_position(committee, tx_digest);

    const MAX_LATENCY: Duration = Duration::from_secs(5 * 60);
    const DEFAULT_LATENCY: Duration = Duration::from_secs(3);

    // Export the raw estimate before clamping so the metric reflects
    // what the observer actually measured.
    let observed = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
    self.metrics
        .sequencing_estimated_latency
        .set(observed.as_millis() as i64);

    // Clamp into [DEFAULT, MAX], then double as a safety margin.
    let latency = observed.clamp(DEFAULT_LATENCY, MAX_LATENCY) * 2;
    let latency = self.override_by_throughput_profiler(position, latency);
    let (delay_step, position) = self.override_by_max_submit_position_settings(latency, position);

    self.metrics
        .sequencing_resubmission_interval_ms
        .set(delay_step.as_millis() as i64);

    (
        delay_step * position as u32,
        position,
        positions_moved,
        preceding_disconnected,
    )
}
/// Optionally replaces the latency-derived delay with a throughput-based
/// one. Applies only when a profiler is installed, the protocol enables
/// throughput-aware submission, and we are the first backup (position 1).
fn override_by_throughput_profiler(&self, position: usize, latency: Duration) -> Duration {
    const LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 0;
    const MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 2_500;
    const HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS: u64 = 3_500;

    let loaded = self.consensus_throughput_profiler.load();
    match loaded.as_ref() {
        None => latency,
        Some(profiler) => {
            let (level, _) = profiler.throughput_level();
            if !(self.protocol_config.throughput_aware_consensus_submission()
                && position == 1)
            {
                return latency;
            }
            match level {
                Level::Low => Duration::from_millis(LOW_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS),
                Level::Medium => {
                    Duration::from_millis(MEDIUM_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS)
                }
                Level::High => {
                    // Under high throughput never go below the floor, but keep
                    // the estimate when it is already at least twice the floor.
                    let floor = Duration::from_millis(HIGH_THROUGHPUT_DELAY_BEFORE_SUBMIT_MS);
                    if latency >= 2 * floor {
                        latency
                    } else {
                        floor
                    }
                }
            }
        }
    }
}
/// Applies the operator overrides: caps the submission position at
/// `max_submit_position` (when set) and replaces the latency-derived delay
/// step with `submit_delay_step_override` (when set).
fn override_by_max_submit_position_settings(
    &self,
    latency: Duration,
    mut position: usize,
) -> (Duration, usize) {
    if let Some(cap) = self.max_submit_position {
        position = position.min(cap);
    }
    let delay_step = self.submit_delay_step_override.unwrap_or(latency);
    (delay_step, position)
}
/// Computes our position in the digest-derived submission order, after
/// filtering by connectivity and reputation scores. Returns
/// (position, positions_moved, preceding_disconnected).
fn submission_position(
    &self,
    committee: &Committee,
    tx_digest: &TransactionDigest,
) -> (usize, usize, usize) {
    // Deterministic, digest-seeded ordering shared by all validators.
    let ordered = order_validators_for_submission(committee, tx_digest);
    self.check_submission_wrt_connectivity_and_scores(ordered)
}
/// Filters disconnected and low-scoring authorities out of the submission
/// order and returns (our resulting position, how many positions we moved
/// up, number of disconnected authorities that preceded us).
fn check_submission_wrt_connectivity_and_scores(
&self,
positions: Vec<AuthorityName>,
) -> (usize, usize, usize) {
let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
// If we ourselves are low-scoring, defer to everyone: report a position
// equal to the committee size.
if low_scoring_authorities.get(&self.authority).is_some() {
return (positions.len(), 0, 0);
}
let initial_position = get_position_in_list(self.authority, positions.clone());
let mut preceding_disconnected = 0;
let mut before_our_position = true;
let filtered_positions: Vec<_> = positions
.into_iter()
.filter(|authority| {
// We always keep ourselves; reaching our own entry also ends the
// "before us" window used for counting disconnected predecessors.
let keep = self.authority == *authority; if keep {
before_our_position = false;
}
// Unknown status (None) is conservatively treated as disconnected.
let connected = self
.connection_monitor_status
.check_connection(&self.authority, authority)
.unwrap_or(ConnectionStatus::Disconnected)
== ConnectionStatus::Connected;
if !connected && before_our_position {
preceding_disconnected += 1; }
let high_scoring = low_scoring_authorities.get(authority).is_none();
// Keep connected, non-low-scoring peers — and always ourselves.
keep || (connected && high_scoring)
})
.collect();
let position = get_position_in_list(self.authority, filtered_positions);
(
position,
initial_position - position,
preceding_disconnected,
)
}
/// Persists `transaction` as pending (so it survives restarts) and then
/// spawns the asynchronous submission, returning its join handle.
pub fn submit(
    self: &Arc<Self>,
    transaction: ConsensusTransaction,
    lock: Option<&RwLockReadGuard<ReconfigState>>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<JoinHandle<()>> {
    // Persist first: recovery depends on the pending table being written
    // before the submission task starts.
    epoch_store.insert_pending_consensus_transactions(&transaction, lock)?;
    let handle = self.submit_unchecked(transaction, epoch_store);
    Ok(handle)
}
/// Returns `true` while the adapter has capacity: the inflight-transaction
/// budget is not exhausted and a local submission permit is available.
pub fn check_limits(&self) -> bool {
    let inflight = self.num_inflight_transactions.load(Ordering::Relaxed) as usize;
    if inflight > self.max_pending_transactions {
        return false;
    }
    self.submit_semaphore.available_permits() > 0
}
/// Errors with `TooManyTransactionsPendingConsensus` when `check_limits`
/// reports the adapter is saturated.
pub(crate) fn check_consensus_overload(&self) -> SuiResult {
    fp_ensure!(
        self.check_limits(),
        SuiError::TooManyTransactionsPendingConsensus
    );
    Ok(())
}
/// Spawns the submission as a monitored background task without the
/// pending-table bookkeeping done by `submit`.
fn submit_unchecked(
    self: &Arc<Self>,
    transaction: ConsensusTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> JoinHandle<()> {
    // The caller may await the handle or drop it; the task runs either way.
    spawn_monitored_task!(self.clone().submit_and_wait(transaction, epoch_store.clone()))
}
/// Drives the submission to completion, bounded by the epoch's lifetime:
/// if the epoch ends first the inner future is dropped and its outcome
/// deliberately ignored.
async fn submit_and_wait(
    self: Arc<Self>,
    transaction: ConsensusTransaction,
    epoch_store: Arc<AuthorityPerEpochStore>,
) {
    let bounded =
        epoch_store.within_alive_epoch(self.submit_and_wait_inner(transaction, &epoch_store));
    bounded.await.ok();
}
#[allow(clippy::option_map_unit_fn)]
/// Core submission loop for one transaction: wait out the submission delay
/// (unless the transaction gets sequenced by someone else first), submit with
/// retries, wait until the transaction is observed as processed, then clean
/// up the pending table and possibly send EndOfPublish.
async fn submit_and_wait_inner(
self: Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to Narwhal");
epoch_store.record_epoch_pending_certs_process_time_metric();
}
let tx_type = classify(&transaction);
let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
// Future resolving once this transaction is seen processed via consensus,
// regardless of who submitted it.
let processed_waiter = epoch_store
.consensus_message_processed_notify(transaction_key)
.boxed();
pin_mut!(processed_waiter);
let (await_submit, position, positions_moved, preceding_disconnected) =
self.await_submit_delay(epoch_store.committee(), &transaction);
let mut guard = InflightDropGuard::acquire(&self, tx_type.to_string());
// Wait for the delay, but short-circuit if (a) another validator's
// submission already got processed — then we skip submitting entirely
// (None) — or (b) user certs closed, in which case submit right away.
let processed_waiter = tokio::select! {
_ = await_submit => Some(processed_waiter),
_ = epoch_store.user_certs_closed_notify() => {
warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
Some(processed_waiter)
}
processed = &mut processed_waiter => {
processed.expect("Storage error when waiting for consensus message processed");
None
}
};
let transaction_key = transaction.key();
// For critical system transactions, run a watchdog that logs every 30s
// until commit; aborted automatically when `_monitor` is dropped.
let _monitor = if matches!(
transaction.kind,
ConsensusTransactionKind::EndOfPublish(_)
| ConsensusTransactionKind::CapabilityNotification(_)
| ConsensusTransactionKind::RandomnessDkgMessage(_, _)
| ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
) {
let transaction_key = transaction_key.clone();
Some(CancelOnDrop(spawn_monitored_task!(async {
let mut i = 0u64;
loop {
i += 1;
const WARN_DELAY_S: u64 = 30;
tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
let total_wait = i * WARN_DELAY_S;
warn!(
"Still waiting {} seconds for transaction {:?} to commit in narwhal",
total_wait, transaction_key
);
}
})))
} else {
None
};
if let Some(processed_waiter) = processed_waiter {
debug!("Submitting {:?} to consensus", transaction_key);
guard.position = Some(position);
guard.positions_moved = Some(positions_moved);
guard.preceding_disconnected = Some(preceding_disconnected);
// Backpressure: bounded number of concurrent local submissions.
let _permit: SemaphorePermit = self
.submit_semaphore
.acquire()
.count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
.await
.expect("Consensus adapter does not close semaphore");
let _in_flight_submission_guard =
GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
// Retry submission until it is accepted; DKG messages retry fast
// (100ms) and log later, other kinds back off 10s between attempts.
let submit_inner = async {
let ack_start = Instant::now();
let mut retries: u32 = 0;
while let Err(e) = self
.consensus_client
.submit_to_consensus(&transaction, epoch_store)
.await
{
if retries > 30 || (retries > 3 && !transaction.kind.is_dkg()) {
warn!(
"Failed to submit transaction {transaction_key:?} to own narwhal worker: {e:?}. Retry #{retries}"
);
}
self.metrics
.sequencing_certificate_failures
.with_label_values(&[tx_type])
.inc();
retries += 1;
if transaction.kind.is_dkg() {
time::sleep(Duration::from_millis(100)).await;
} else {
time::sleep(Duration::from_secs(10)).await;
};
}
let bucket = match retries {
0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
21..=50 => "between_20_and_50".to_string(),
51..=100 => "between_50_and_100".to_string(),
_ => "over_100".to_string(),
};
self.metrics
.sequencing_acknowledge_latency
.with_label_values(&[&bucket, tx_type])
.report(ack_start.elapsed().as_millis() as u64);
};
// Race the submission against "already processed"; either way we end
// up waiting for the processed notification before continuing.
match select(processed_waiter, submit_inner.boxed()).await {
Either::Left((processed, _submit_inner)) => processed,
Either::Right(((), processed_waiter)) => {
debug!("Submitted {transaction_key:?} to consensus");
processed_waiter.await
}
}
.expect("Storage error when waiting for consensus message processed");
}
debug!("{transaction_key:?} processed by consensus");
epoch_store
.remove_pending_consensus_transaction(&transaction.key())
.expect("Storage error when removing consensus transaction");
// If this was the last pending user certificate of an epoch that is
// rejecting new user certs, it is now our turn to send EndOfPublish.
let send_end_of_publish = if let ConsensusTransactionKind::UserTransaction(_cert) =
&transaction.kind
{
if epoch_store
.get_reconfig_state_read_lock_guard()
.is_reject_user_certs()
{
let pending_count = epoch_store.pending_consensus_certificates_count();
debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
pending_count == 0 } else {
false
}
} else {
false
};
if send_end_of_publish {
if let Err(err) = self.submit(
ConsensusTransaction::new_end_of_publish(self.authority),
None,
epoch_store,
) {
warn!("Error when sending end of publish message: {:?}", err);
}
}
self.metrics
.sequencing_certificate_success
.with_label_values(&[tx_type])
.inc();
}
}
impl CheckConnection for ConnectionMonitorStatus {
    /// Looks up `authority`'s connection status via the peer-id mapping.
    /// Returns `None` for unknown peers; we are always "connected" to ourselves.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }
        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = if let Some(found) = mapping.get(authority) {
            found
        } else {
            warn!(
                "failed to find peer {:?} in connection monitor listener",
                authority
            );
            return None;
        };
        match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(entry) => Some(entry.value().clone()),
            TryResult::Absent => None,
            // Entry is locked (a status change is in flight): conservatively
            // report the peer as disconnected rather than blocking.
            TryResult::Locked => Some(ConnectionStatus::Disconnected),
        }
    }
    /// Installs the new epoch's authority-name → peer-id mapping.
    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        let fresh = Arc::new(authority_names_to_peer_ids);
        self.authority_names_to_peer_ids.swap(fresh);
    }
}
impl CheckConnection for ConnectionMonitorStatusForTests {
    /// Test stub: every peer is always reported connected.
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    /// Test stub: mapping updates are ignored.
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}
/// Returns the index of `search_authority` within `positions`.
///
/// Panics if the authority is absent — callers always pass an ordering that
/// includes every committee member, ourselves included.
pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .position(|authority| authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
}
/// Produces the stake-weighted submission order for `tx_digest`.
///
/// The RNG is seeded from the digest itself, so every validator computes the
/// identical ordering independently.
pub fn order_validators_for_submission(
    committee: &Committee,
    tx_digest: &TransactionDigest,
) -> Vec<AuthorityName> {
    let mut rng = StdRng::from_seed(tx_digest.into_inner());
    committee.shuffle_by_stake_with_rng(None, None, &mut rng)
}
impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
/// Closes user-certificate acceptance for the epoch and, when no
/// certificates remain pending, immediately submits our EndOfPublish.
fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
let send_end_of_publish = {
// Hold the reconfig write lock while deciding, so no new user certs
// can be inserted between the pending-count read and closing certs.
let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
if !reconfig_guard.should_accept_user_certs() {
// Already closing/closed; nothing to do.
return;
}
let pending_count = epoch_store.pending_consensus_certificates_count();
debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
let send_end_of_publish = pending_count == 0;
// Consumes the guard, releasing the lock.
epoch_store.close_user_certs(reconfig_guard);
send_end_of_publish
};
// If certificates are still pending, EndOfPublish will instead be sent
// once the last pending one is processed (see submit_and_wait_inner).
if send_end_of_publish {
if let Err(err) = self.submit(
ConsensusTransaction::new_end_of_publish(self.authority),
None,
epoch_store,
) {
warn!("Error when sending end of publish message: {:?}", err);
}
}
}
}
/// Join-handle wrapper that aborts the task when dropped; merely dropping a
/// `JoinHandle` would detach the task instead.
struct CancelOnDrop<T>(JoinHandle<T>);
impl<T> Deref for CancelOnDrop<T> {
type Target = JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> Drop for CancelOnDrop<T> {
fn drop(&mut self) {
// Cancel the background task when the guard goes out of scope.
self.0.abort();
}
}
/// Guard tracking one in-flight submission: `acquire` bumps the adapter's
/// inflight counter and attempt metrics, and `Drop` decrements the counter
/// and records latency/position metrics.
struct InflightDropGuard<'a> {
adapter: &'a ConsensusAdapter,
// When the submission attempt started; feeds the latency histogram on drop.
start: Instant,
// Filled in once the submission position is known; `None` means the
// transaction was processed before we submitted ("not_submitted" label).
position: Option<usize>,
positions_moved: Option<usize>,
preceding_disconnected: Option<usize>,
// Metric label for the transaction type.
tx_type: String,
}
impl<'a> InflightDropGuard<'a> {
    /// Registers a new in-flight submission: increments the adapter's inflight
    /// counter and the attempt/inflight metrics, then starts the latency clock.
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: String) -> Self {
        // fetch_add returns the value *before* the increment; the gauge below
        // mirrors the original behavior of publishing that prior value.
        let inflight = adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        let metrics = &adapter.metrics;
        metrics
            .sequencing_certificate_attempt
            .with_label_values(&[&tx_type])
            .inc();
        metrics
            .sequencing_certificate_inflight
            .with_label_values(&[&tx_type])
            .set(inflight as i64);
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            tx_type,
        }
    }
}
impl<'a> Drop for InflightDropGuard<'a> {
    /// Unregisters the submission and records position/latency metrics.
    fn drop(&mut self) {
        // fetch_sub returns the value *before* the decrement; published as-is,
        // matching the symmetric behavior of `acquire`.
        let inflight = self
            .adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        let metrics = &self.adapter.metrics;
        metrics
            .sequencing_certificate_inflight
            .with_label_values(&[&self.tx_type])
            .set(inflight as i64);
        let position_label = match self.position {
            Some(position) => {
                metrics
                    .sequencing_certificate_authority_position
                    .observe(position as f64);
                position.to_string()
            }
            // Another validator's submission was processed before we submitted.
            None => "not_submitted".to_string(),
        };
        if let Some(moved) = self.positions_moved {
            metrics
                .sequencing_certificate_positions_moved
                .observe(moved as f64);
        }
        if let Some(disconnected) = self.preceding_disconnected {
            metrics
                .sequencing_certificate_preceding_disconnected
                .observe(disconnected as f64);
        }
        let latency = self.start.elapsed();
        // Only position-0 submissions had zero submission delay, so only they
        // feed the consensus-latency estimator.
        if self.position == Some(0) {
            self.adapter.latency_observer.report(latency);
        }
        metrics
            .sequencing_certificate_latency
            .with_label_values(&[&position_label, &self.tx_type])
            .observe(latency.as_secs_f64());
    }
}
#[async_trait::async_trait]
impl SubmitToConsensus for Arc<ConsensusAdapter> {
    /// Fire-and-forget submission through the adapter: the spawned task's
    /// join handle is dropped and only synchronous submission errors surface.
    async fn submit_to_consensus(
        &self,
        transaction: &ConsensusTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _handle = self.submit(transaction.clone(), None, epoch_store)?;
        Ok(())
    }
}
/// Returns `ourselves`' index in the stake-weighted, digest-seeded
/// submission order for `tx_digest` (no connectivity/score filtering).
pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let ordering = order_validators_for_submission(committee, tx_digest);
    get_position_in_list(*ourselves, ordering)
}
#[cfg(test)]
mod adapter_tests {
use super::position_submit_certificate;
use crate::consensus_adapter::{
ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
LazyNarwhalClient,
};
use fastcrypto::traits::KeyPair;
use rand::Rng;
use rand::{rngs::StdRng, SeedableRng};
use std::sync::Arc;
use std::time::Duration;
use sui_types::{
base_types::TransactionDigest,
committee::Committee,
crypto::{get_key_pair_from_rng, AuthorityKeyPair, AuthorityPublicKeyBytes},
};
// Builds a deterministic test committee of `size` members with random
// (seeded) voting power.
fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
let authorities = (0..size)
.map(|_k| {
(
AuthorityPublicKeyBytes::from(
get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
),
rng.gen_range(0u64..10u64),
)
})
.collect::<Vec<_>>();
Committee::new_for_testing_with_normalized_voting_power(
0,
authorities.iter().cloned().collect(),
)
}
#[tokio::test]
async fn test_await_submit_delay_user_transaction() {
// grab a committee and a consensus adapter
let mut rng = StdRng::from_seed([0; 32]);
let committee = test_committee(&mut rng, 10);
// Adapter with overrides: position capped at 1, delay step fixed at 2s.
let consensus_adapter = ConsensusAdapter::new(
Arc::new(LazyNarwhalClient::new(
"/ip4/127.0.0.1/tcp/0/http".parse().unwrap(),
)),
*committee.authority_by_index(0).unwrap(),
Arc::new(ConnectionMonitorStatusForTests {}),
100_000,
100_000,
Some(1),
Some(Duration::from_secs(2)),
ConsensusAdapterMetrics::new_test(),
sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
);
let tx_digest = TransactionDigest::generate(&mut rng);
let (position, positions_moved, _) =
consensus_adapter.submission_position(&committee, &tx_digest);
assert_eq!(position, 7);
// NOTE(review): `!positions_moved` is a bitwise NOT on usize, so this
// assertion is vacuously true for any value except usize::MAX. Possibly
// `assert!(positions_moved == 0)` was intended — confirm. (Same pattern
// twice more below.)
assert!(!positions_moved > 0);
// With the overrides, position is capped at 1 and delay step is 2s,
// so total delay = 2s * 1.
let (delay_step, position, positions_moved, _) =
consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
assert_eq!(position, 1);
assert_eq!(delay_step, Duration::from_secs(2));
assert!(!positions_moved > 0);
// Adapter without overrides: falls back to the latency-derived step.
let consensus_adapter = ConsensusAdapter::new(
Arc::new(LazyNarwhalClient::new(
"/ip4/127.0.0.1/tcp/0/http".parse().unwrap(),
)),
*committee.authority_by_index(0).unwrap(),
Arc::new(ConnectionMonitorStatusForTests {}),
100_000,
100_000,
None,
None,
ConsensusAdapterMetrics::new_test(),
sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
);
// Default latency 3s, doubled to 6s, times position 7 = 42s total.
let (delay_step, position, positions_moved, _) =
consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
assert_eq!(position, 7);
assert_eq!(delay_step, Duration::from_secs(42));
assert!(!positions_moved > 0);
}
#[test]
fn test_position_submit_certificate() {
// grab a committee and check that positions are a permutation:
// in-range, and exactly one validator gets position 0 per digest.
let mut rng = StdRng::from_seed([0; 32]);
let committee = test_committee(&mut rng, 10);
const NUM_TEST_TRANSACTIONS: usize = 1000;
for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
let tx_digest = TransactionDigest::generate(&mut rng);
let mut zero_found = false;
for (name, _) in committee.members() {
let f = position_submit_certificate(&committee, name, &tx_digest);
assert!(f < committee.num_members());
if f == 0 {
// One and only one validator gets position 0.
assert!(!zero_found);
zero_found = true;
}
}
assert!(zero_found);
}
}
}