sui_core/
mysticeti_adapter.rs1use std::{sync::Arc, time::Duration};
5
6use arc_swap::{ArcSwapOption, Guard};
7use consensus_core::{ClientError, TransactionClient};
8use sui_types::{
9 error::{SuiErrorKind, SuiResult},
10 messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
11};
12use tap::prelude::*;
13use tokio::time::{Instant, sleep};
14use tracing::{error, info, warn};
15
16use crate::{
17 authority::authority_per_epoch_store::AuthorityPerEpochStore,
18 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
19 consensus_handler::SequencedConsensusTransactionKey,
20};
21
22#[derive(Default, Clone)]
27pub struct LazyMysticetiClient {
28 client: Arc<ArcSwapOption<TransactionClient>>,
29}
30
31impl LazyMysticetiClient {
32 pub fn new() -> Self {
33 Self {
34 client: Arc::new(ArcSwapOption::empty()),
35 }
36 }
37
38 async fn get(&self) -> Guard<Option<Arc<TransactionClient>>> {
39 let client = self.client.load();
40 if client.is_some() {
41 return client;
42 }
43
44 let mut count = 0;
49 let start = Instant::now();
50 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
51 loop {
52 let client = self.client.load();
53 if client.is_some() {
54 return client;
55 } else {
56 sleep(RETRY_INTERVAL).await;
57 count += 1;
58 if count % 100 == 0 {
59 warn!(
60 "Waiting for consensus to initialize after {:?}",
61 Instant::now() - start
62 );
63 }
64 }
65 }
66 }
67
68 pub fn set(&self, client: Arc<TransactionClient>) {
69 self.client.store(Some(client));
70 }
71
72 pub fn clear(&self) {
73 self.client.store(None);
74 }
75}
76
77#[async_trait::async_trait]
78impl ConsensusClient for LazyMysticetiClient {
79 async fn submit(
80 &self,
81 transactions: &[ConsensusTransaction],
82 _epoch_store: &Arc<AuthorityPerEpochStore>,
83 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
84 let client_guard = self.get().await;
88 let client = client_guard
89 .as_ref()
90 .expect("Client should always be returned");
91 let transactions_bytes = transactions
92 .iter()
93 .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
94 .collect::<Vec<_>>();
95 let (block_ref, tx_indices, status_waiter) = client
96 .submit(transactions_bytes)
97 .await
98 .tap_err(|err| {
99 let msg = format!("Transaction submission failed with: {:?}", err);
101 match err {
102 ClientError::ConsensusShuttingDown(_) => {
103 info!("{}", msg);
104 }
105 ClientError::OversizedTransaction(_, _)
106 | ClientError::OversizedTransactionBundleBytes(_, _)
107 | ClientError::OversizedTransactionBundleCount(_, _) => {
108 if cfg!(debug_assertions) {
109 panic!("{}", msg);
110 } else {
111 error!("{}", msg);
112 }
113 }
114 };
115 })
116 .map_err(|err| SuiErrorKind::FailedToSubmitToConsensus(err.to_string()))?;
117
118 let is_soft_bundle = transactions.len() > 1;
119 let is_ping = transactions.is_empty();
120
121 if !is_soft_bundle
122 && !is_ping
123 && matches!(
124 transactions[0].kind,
125 ConsensusTransactionKind::EndOfPublish(_)
126 | ConsensusTransactionKind::CapabilityNotification(_)
127 | ConsensusTransactionKind::CapabilityNotificationV2(_)
128 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
129 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
130 )
131 {
132 let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
133 tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
134 };
135
136 let mut consensus_positions = Vec::new();
138 for index in tx_indices {
139 consensus_positions.push(ConsensusPosition {
140 epoch: client.epoch(),
141 block: block_ref,
142 index,
143 });
144 }
145 Ok((consensus_positions, status_waiter))
146 }
147}