sui_core/transaction_driver/reconfig_observer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

4use super::AuthorityAggregatorUpdatable;
5use crate::{
6    authority_client::NetworkAuthorityClient, epoch::committee_store::CommitteeStore,
7    execution_cache::ObjectCacheRead, safe_client::SafeClientMetricsBase,
8};
9use async_trait::async_trait;
10use std::sync::Arc;
11use sui_types::sui_system_state::SuiSystemState;
12use sui_types::sui_system_state::SuiSystemStateTrait;
13use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
14use tokio::sync::broadcast::error::RecvError;
15use tracing::{info, warn};
16
17#[async_trait]
18pub trait ReconfigObserver<A: Clone> {
19    async fn run(&mut self, epoch_updatable: Arc<dyn AuthorityAggregatorUpdatable<A>>);
20    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
21}
22
23/// A ReconfigObserver that subscribes to a reconfig channel of new committee.
24/// This is used in TransactionOrchestrator.
25pub struct OnsiteReconfigObserver {
26    reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
27    execution_cache: Arc<dyn ObjectCacheRead>,
28    committee_store: Arc<CommitteeStore>,
29    safe_client_metrics_base: SafeClientMetricsBase,
30}
31
32impl OnsiteReconfigObserver {
33    pub fn new(
34        reconfig_rx: tokio::sync::broadcast::Receiver<SuiSystemState>,
35        execution_cache: Arc<dyn ObjectCacheRead>,
36        committee_store: Arc<CommitteeStore>,
37        safe_client_metrics_base: SafeClientMetricsBase,
38    ) -> Self {
39        Self {
40            reconfig_rx,
41            execution_cache,
42            committee_store,
43            safe_client_metrics_base,
44        }
45    }
46}
47
48#[async_trait]
49impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
50    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
51        Box::new(Self {
52            reconfig_rx: self.reconfig_rx.resubscribe(),
53            execution_cache: self.execution_cache.clone(),
54            committee_store: self.committee_store.clone(),
55            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
56        })
57    }
58
59    async fn run(
60        &mut self,
61        updatable: Arc<dyn AuthorityAggregatorUpdatable<NetworkAuthorityClient>>,
62    ) {
63        loop {
64            match self.reconfig_rx.recv().await {
65                Ok(system_state) => {
66                    let epoch_start_state = system_state.into_epoch_start_state();
67                    let committee = epoch_start_state.get_sui_committee();
68                    info!("Got reconfig message. New committee: {}", committee);
69                    if committee.epoch() > updatable.epoch() {
70                        let new_auth_agg = updatable
71                            .authority_aggregator()
72                            .recreate_with_new_epoch_start_state(&epoch_start_state);
73                        updatable.update_authority_aggregator(Arc::new(new_auth_agg));
74                    } else {
75                        // This should only happen when the node just starts
76                        warn!("Epoch number decreased - ignoring committee: {}", committee);
77                    }
78                }
79                // It's ok to miss messages due to overflow here
80                Err(RecvError::Lagged(_)) => {
81                    continue;
82                }
83                Err(RecvError::Closed) => {
84                    // Closing the channel only happens in simtest when a node is shut down.
85                    if cfg!(msim) {
86                        return;
87                    } else {
88                        panic!("Do not expect the channel to be closed")
89                    }
90                }
91            }
92        }
93    }
94}