sui_core/quorum_driver/
reconfig_observer.rs1use super::AuthorityAggregatorUpdatable;
5use crate::{
6 authority_aggregator::AuthAggMetrics,
7 authority_client::{AuthorityAPI, NetworkAuthorityClient},
8 epoch::committee_store::CommitteeStore,
9 execution_cache::ObjectCacheRead,
10 safe_client::SafeClientMetricsBase,
11};
12use async_trait::async_trait;
13use std::sync::Arc;
14use sui_types::sui_system_state::SuiSystemState;
15use sui_types::sui_system_state::SuiSystemStateTrait;
16use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
17use tokio::sync::broadcast::error::RecvError;
18use tracing::{info, warn};
19
20#[async_trait]
21pub trait ReconfigObserver<A: Clone> {
22 async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
23 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
24}
25
26pub struct OnsiteReconfigObserver {
29 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
30 execution_cache: Arc<dyn ObjectCacheRead>,
31 committee_store: Arc<CommitteeStore>,
32 safe_client_metrics_base: SafeClientMetricsBase,
34 auth_agg_metrics: AuthAggMetrics,
35}
36
37impl OnsiteReconfigObserver {
38 pub fn new(
39 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
40 execution_cache: Arc<dyn ObjectCacheRead>,
41 committee_store: Arc<CommitteeStore>,
42 safe_client_metrics_base: SafeClientMetricsBase,
43 auth_agg_metrics: AuthAggMetrics,
44 ) -> Self {
45 Self {
46 reconfig_rx,
47 execution_cache,
48 committee_store,
49 safe_client_metrics_base,
50 auth_agg_metrics,
51 }
52 }
53}
54
55#[async_trait]
56impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
57 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
58 Box::new(Self {
59 reconfig_rx: self.reconfig_rx.resubscribe(),
60 execution_cache: self.execution_cache.clone(),
61 committee_store: self.committee_store.clone(),
62 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
63 auth_agg_metrics: self.auth_agg_metrics.clone(),
64 })
65 }
66
67 async fn run(
68 &mut self,
69 updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
70 ) {
71 loop {
72 match self.reconfig_rx.recv().await {
73 Ok(system_state) => {
74 let epoch_start_state = system_state.into_epoch_start_state();
75 let committee = epoch_start_state.get_sui_committee();
76 info!("Got reconfig message. New committee: {}", committee);
77 if committee.epoch() > updatable.epoch() {
78 let new_auth_agg = updatable
79 .authority_aggregator()
80 .recreate_with_new_epoch_start_state(&epoch_start_state);
81 updatable.update_authority_aggregator(Arc::new(new_auth_agg));
82 } else {
83 warn!("Epoch number decreased - ignoring committee: {}", committee);
85 }
86 }
87 Err(RecvError::Lagged(_)) => {
89 continue;
90 }
91 Err(RecvError::Closed) => {
92 if cfg!(msim) {
94 return;
95 } else {
96 panic!("Do not expect the channel to be closed")
97 }
98 }
99 }
100 }
101 }
102}
103pub struct DummyReconfigObserver;
105
106#[async_trait]
107impl<A> ReconfigObserver<A> for DummyReconfigObserver
108where
109 A: AuthorityAPI + Send + Sync + Clone + 'static,
110{
111 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
112 Box::new(Self {})
113 }
114
115 async fn run(&mut self, _quorum_driver: Arc<dyn AuthorityAggregatorUpdatable<A>>) {}
116}