sui_core/transaction_driver/
reconfig_observer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::AuthorityAggregatorUpdatable;
5use crate::{
6    authority_client::{AuthorityAPI, NetworkAuthorityClient},
7    epoch::committee_store::CommitteeStore,
8    execution_cache::ObjectCacheRead,
9    safe_client::SafeClientMetricsBase,
10};
11use async_trait::async_trait;
12use std::sync::Arc;
13use sui_types::sui_system_state::SuiSystemState;
14use sui_types::sui_system_state::SuiSystemStateTrait;
15use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
16use tokio::sync::broadcast::error::RecvError;
17use tracing::{info, warn};
18
19#[async_trait]
20pub trait ReconfigObserver<A: Clone> {
21    async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
22    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
23}
24
25/// A ReconfigObserver that subscribes to a reconfig channel of new committee.
26/// This is used in TransactionOrchestrator.
27pub struct OnsiteReconfigObserver {
28    reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
29    execution_cache: Arc<dyn ObjectCacheRead>,
30    committee_store: Arc<CommitteeStore>,
31    safe_client_metrics_base: SafeClientMetricsBase,
32}
33
34impl OnsiteReconfigObserver {
35    pub fn new(
36        reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
37        execution_cache: Arc<dyn ObjectCacheRead>,
38        committee_store: Arc<CommitteeStore>,
39        safe_client_metrics_base: SafeClientMetricsBase,
40    ) -> Self {
41        Self {
42            reconfig_rx,
43            execution_cache,
44            committee_store,
45            safe_client_metrics_base,
46        }
47    }
48}
49
50#[async_trait]
51impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
52    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
53        Box::new(Self {
54            reconfig_rx: self.reconfig_rx.resubscribe(),
55            execution_cache: self.execution_cache.clone(),
56            committee_store: self.committee_store.clone(),
57            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
58        })
59    }
60
61    async fn run(
62        &mut self,
63        updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
64    ) {
65        loop {
66            match self.reconfig_rx.recv().await {
67                Ok(system_state) => {
68                    let epoch_start_state = system_state.into_epoch_start_state();
69                    let committee = epoch_start_state.get_sui_committee();
70                    info!("Got reconfig message. New committee: {}", committee);
71                    if committee.epoch() > updatable.epoch() {
72                        let new_auth_agg = updatable
73                            .authority_aggregator()
74                            .recreate_with_new_epoch_start_state(&epoch_start_state);
75                        updatable.update_authority_aggregator(Arc::new(new_auth_agg));
76                    } else {
77                        // This should only happen when the node just starts
78                        warn!("Epoch number decreased - ignoring committee: {}", committee);
79                    }
80                }
81                // It's ok to miss messages due to overflow here
82                Err(RecvError::Lagged(_)) => {
83                    continue;
84                }
85                Err(RecvError::Closed) => {
86                    // Closing the channel only happens in simtest when a node is shut down.
87                    if cfg!(msim) {
88                        return;
89                    } else {
90                        panic!("Do not expect the channel to be closed")
91                    }
92                }
93            }
94        }
95    }
96}
97/// A dummy ReconfigObserver for testing.
98pub struct DummyReconfigObserver;
99
100#[async_trait]
101impl<A> ReconfigObserver<A> for DummyReconfigObserver
102where
103    A: AuthorityAPI + Send + Sync + Clone + 'static,
104{
105    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
106        Box::new(Self {})
107    }
108
109    async fn run(&mut self, _quorum_driver: Arc<dyn AuthorityAggregatorUpdatable<A>>) {}
110}