sui_core/transaction_driver/
reconfig_observer.rs1use super::AuthorityAggregatorUpdatable;
5use crate::{
6 authority_client::NetworkAuthorityClient, epoch::committee_store::CommitteeStore,
7 execution_cache::ObjectCacheRead, safe_client::SafeClientMetricsBase,
8};
9use async_trait::async_trait;
10use std::sync::Arc;
11use sui_types::sui_system_state::SuiSystemState;
12use sui_types::sui_system_state::SuiSystemStateTrait;
13use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
14use tokio::sync::broadcast::error::RecvError;
15use tracing::{info, warn};
16
17#[async_trait]
18pub trait ReconfigObserver<A: Clone> {
19 async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
20 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
21}
22
23pub struct OnsiteReconfigObserver {
26 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
27 execution_cache: Arc<dyn ObjectCacheRead>,
28 committee_store: Arc<CommitteeStore>,
29 safe_client_metrics_base: SafeClientMetricsBase,
30}
31
32impl OnsiteReconfigObserver {
33 pub fn new(
34 reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
35 execution_cache: Arc<dyn ObjectCacheRead>,
36 committee_store: Arc<CommitteeStore>,
37 safe_client_metrics_base: SafeClientMetricsBase,
38 ) -> Self {
39 Self {
40 reconfig_rx,
41 execution_cache,
42 committee_store,
43 safe_client_metrics_base,
44 }
45 }
46}
47
48#[async_trait]
49impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
50 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
51 Box::new(Self {
52 reconfig_rx: self.reconfig_rx.resubscribe(),
53 execution_cache: self.execution_cache.clone(),
54 committee_store: self.committee_store.clone(),
55 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
56 })
57 }
58
59 async fn run(
60 &mut self,
61 updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
62 ) {
63 loop {
64 match self.reconfig_rx.recv().await {
65 Ok(system_state) => {
66 let epoch_start_state = system_state.into_epoch_start_state();
67 let committee = epoch_start_state.get_sui_committee();
68 info!("Got reconfig message. New committee: {}", committee);
69 if committee.epoch() > updatable.epoch() {
70 let new_auth_agg = updatable
71 .authority_aggregator()
72 .recreate_with_new_epoch_start_state(&epoch_start_state);
73 updatable.update_authority_aggregator(Arc::new(new_auth_agg));
74 } else {
75 warn!("Epoch number decreased - ignoring committee: {}", committee);
77 }
78 }
79 Err(RecvError::Lagged(_)) => {
81 continue;
82 }
83 Err(RecvError::Closed) => {
84 if cfg!(msim) {
86 return;
87 } else {
88 panic!("Do not expect the channel to be closed")
89 }
90 }
91 }
92 }
93 }
94}