// sui_core/quorum_driver/reconfig_observer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::AuthorityAggregatorUpdatable;
use crate::{
    authority_aggregator::AuthAggMetrics,
    authority_client::{AuthorityAPI, NetworkAuthorityClient},
    epoch::committee_store::CommitteeStore,
    execution_cache::ObjectCacheRead,
    safe_client::SafeClientMetricsBase,
};
use async_trait::async_trait;
use std::sync::Arc;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};

20#[async_trait]
21pub trait ReconfigObserver<A: Clone> {
22    async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
23    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
24}
25
26/// A ReconfigObserver that subscribes to a reconfig channel of new committee.
27/// This is used in TransactionOrchestrator.
28pub struct OnsiteReconfigObserver {
29    reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
30    execution_cache: Arc<dyn ObjectCacheRead>,
31    committee_store: Arc<CommitteeStore>,
32    // TODO: Use Arc for both metrics.
33    safe_client_metrics_base: SafeClientMetricsBase,
34    auth_agg_metrics: AuthAggMetrics,
35}
36
37impl OnsiteReconfigObserver {
38    pub fn new(
39        reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
40        execution_cache: Arc<dyn ObjectCacheRead>,
41        committee_store: Arc<CommitteeStore>,
42        safe_client_metrics_base: SafeClientMetricsBase,
43        auth_agg_metrics: AuthAggMetrics,
44    ) -> Self {
45        Self {
46            reconfig_rx,
47            execution_cache,
48            committee_store,
49            safe_client_metrics_base,
50            auth_agg_metrics,
51        }
52    }
53}
54
55#[async_trait]
56impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
57    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
58        Box::new(Self {
59            reconfig_rx: self.reconfig_rx.resubscribe(),
60            execution_cache: self.execution_cache.clone(),
61            committee_store: self.committee_store.clone(),
62            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
63            auth_agg_metrics: self.auth_agg_metrics.clone(),
64        })
65    }
66
67    async fn run(
68        &mut self,
69        updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
70    ) {
71        loop {
72            match self.reconfig_rx.recv().await {
73                Ok(system_state) => {
74                    let epoch_start_state = system_state.into_epoch_start_state();
75                    let committee = epoch_start_state.get_sui_committee();
76                    info!("Got reconfig message. New committee: {}", committee);
77                    if committee.epoch() > updatable.epoch() {
78                        let new_auth_agg = updatable
79                            .authority_aggregator()
80                            .recreate_with_new_epoch_start_state(&epoch_start_state);
81                        updatable.update_authority_aggregator(Arc::new(new_auth_agg));
82                    } else {
83                        // This should only happen when the node just starts
84                        warn!("Epoch number decreased - ignoring committee: {}", committee);
85                    }
86                }
87                // It's ok to miss messages due to overflow here
88                Err(RecvError::Lagged(_)) => {
89                    continue;
90                }
91                Err(RecvError::Closed) => {
92                    // Closing the channel only happens in simtest when a node is shut down.
93                    if cfg!(msim) {
94                        return;
95                    } else {
96                        panic!("Do not expect the channel to be closed")
97                    }
98                }
99            }
100        }
101    }
102}
103/// A dummy ReconfigObserver for testing.
104pub struct DummyReconfigObserver;
105
106#[async_trait]
107impl<A> ReconfigObserver<A> for DummyReconfigObserver
108where
109    A: AuthorityAPI + Send + Sync + Clone + 'static,
110{
111    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
112        Box::new(Self {})
113    }
114
115    async fn run(&mut self, _quorum_driver: Arc<dyn AuthorityAggregatorUpdatable<A>>) {}
116}