use std::{sync::Arc, time::Duration};
use arc_swap::{ArcSwapOption, Guard};
use consensus_core::TransactionClient;
use sui_types::{
error::{SuiError, SuiResult},
messages_consensus::ConsensusTransaction,
};
use tap::prelude::*;
use tokio::time::{sleep, timeout};
use tracing::warn;
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
consensus_adapter::SubmitToConsensus,
};
#[derive(Default, Clone)]
pub struct LazyMysticetiClient {
client: Arc<ArcSwapOption<TransactionClient>>,
}
impl LazyMysticetiClient {
pub fn new() -> Self {
Self {
client: Arc::new(ArcSwapOption::empty()),
}
}
async fn get(&self) -> Guard<Option<Arc<TransactionClient>>> {
let client = self.client.load();
if client.is_some() {
return client;
}
const MYSTICETI_START_TIMEOUT: Duration = Duration::from_secs(30);
const LOAD_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
if let Ok(client) = timeout(MYSTICETI_START_TIMEOUT, async {
loop {
let client = self.client.load();
if client.is_some() {
return client;
} else {
sleep(LOAD_RETRY_TIMEOUT).await;
}
}
})
.await
{
return client;
}
panic!(
"Timed out after {:?} waiting for Mysticeti to start!",
MYSTICETI_START_TIMEOUT,
);
}
pub fn set(&self, client: Arc<TransactionClient>) {
self.client.store(Some(client));
}
}
#[async_trait::async_trait]
impl SubmitToConsensus for LazyMysticetiClient {
async fn submit_to_consensus(
&self,
transaction: &ConsensusTransaction,
_epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let client = self.get().await;
let tx_bytes = bcs::to_bytes(&transaction).expect("Serialization should not fail.");
client
.as_ref()
.expect("Client should always be returned")
.submit(tx_bytes)
.await
.tap_err(|r| {
warn!("Submit transaction failed with: {:?}", r);
})
.map_err(|err| SuiError::FailedToSubmitToConsensus(err.to_string()))?;
Ok(())
}
}