sui_core/transaction_driver/
reconfig_observer.rs1use super::AuthorityAggregatorUpdatable;
5use crate::{
6 authority_client::{AuthorityAPI, NetworkAuthorityClient},
7 epoch::committee_store::CommitteeStore,
8 execution_cache::ObjectCacheRead,
9 safe_client::SafeClientMetricsBase,
10};
11use async_trait::async_trait;
12use std::sync::Arc;
13use sui_types::sui_system_state::SuiSystemState;
14use sui_types::sui_system_state::SuiSystemStateTrait;
15use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
16use tokio::sync::broadcast::error::RecvError;
17use tracing::{info, warn};
18
19#[async_trait]
20pub trait ReconfigObserver<A: Clone> {
21 async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
22 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
23}
24
25pub struct OnsiteReconfigObserver {
28 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
29 execution_cache: Arc<dyn ObjectCacheRead>,
30 committee_store: Arc<CommitteeStore>,
31 safe_client_metrics_base: SafeClientMetricsBase,
32}
33
34impl OnsiteReconfigObserver {
35 pub fn new(
36 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
37 execution_cache: Arc<dyn ObjectCacheRead>,
38 committee_store: Arc<CommitteeStore>,
39 safe_client_metrics_base: SafeClientMetricsBase,
40 ) -> Self {
41 Self {
42 reconfig_rx,
43 execution_cache,
44 committee_store,
45 safe_client_metrics_base,
46 }
47 }
48}
49
50#[async_trait]
51impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
52 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
53 Box::new(Self {
54 reconfig_rx: self.reconfig_rx.resubscribe(),
55 execution_cache: self.execution_cache.clone(),
56 committee_store: self.committee_store.clone(),
57 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
58 })
59 }
60
61 async fn run(
62 &mut self,
63 updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
64 ) {
65 loop {
66 match self.reconfig_rx.recv().await {
67 Ok(system_state) => {
68 let epoch_start_state = system_state.into_epoch_start_state();
69 let committee = epoch_start_state.get_sui_committee();
70 info!("Got reconfig message. New committee: {}", committee);
71 if committee.epoch() > updatable.epoch() {
72 let new_auth_agg = updatable
73 .authority_aggregator()
74 .recreate_with_new_epoch_start_state(&epoch_start_state);
75 updatable.update_authority_aggregator(Arc::new(new_auth_agg));
76 } else {
77 warn!("Epoch number decreased - ignoring committee: {}", committee);
79 }
80 }
81 Err(RecvError::Lagged(_)) => {
83 continue;
84 }
85 Err(RecvError::Closed) => {
86 if cfg!(msim) {
88 return;
89 } else {
90 panic!("Do not expect the channel to be closed")
91 }
92 }
93 }
94 }
95 }
96}
97pub struct DummyReconfigObserver;
99
100#[async_trait]
101impl<A> ReconfigObserver<A> for DummyReconfigObserver
102where
103 A: AuthorityAPI + Send + Sync + Clone + 'static,
104{
105 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
106 Box::new(Self {})
107 }
108
109 async fn run(&mut self, _quorum_driver: Arc<dyn AuthorityAggregatorUpdatable<A>>) {}
110}