sui_core/epoch/
committee_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use parking_lot::RwLock;
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use sui_types::base_types::ObjectID;
9use sui_types::committee::{Committee, EpochId};
10use sui_types::error::{SuiErrorKind, SuiResult};
11use typed_store::rocks::{DBMap, DBOptions, MetricConf, default_db_options};
12use typed_store::rocksdb::Options;
13
14use typed_store::DBMapUtils;
15use typed_store::Map;
16
17use sui_macros::nondeterministic;
18
19pub struct CommitteeStore {
20    tables: CommitteeStoreTables,
21    cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
22}
23
24#[derive(DBMapUtils)]
25pub struct CommitteeStoreTables {
26    /// Map from each epoch ID to the committee information.
27    #[default_options_override_fn = "committee_table_default_config"]
28    committee_map: DBMap<EpochId, Committee>,
29}
30
31// These functions are used to initialize the DB tables
32fn committee_table_default_config() -> DBOptions {
33    default_db_options().optimize_for_point_lookup(64)
34}
35
36impl CommitteeStore {
37    pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
38        let tables = CommitteeStoreTables::open_tables_read_write(
39            path,
40            MetricConf::new("committee"),
41            db_options,
42            None,
43        );
44        let store = Self {
45            tables,
46            cache: RwLock::new(HashMap::new()),
47        };
48        if store
49            .database_is_empty()
50            .expect("CommitteeStore initialization failed")
51        {
52            store
53                .init_genesis_committee(genesis_committee.clone())
54                .expect("Init genesis committee data must not fail");
55        }
56        store
57    }
58
59    pub fn new_for_testing(genesis_committee: &Committee) -> Self {
60        let dir = std::env::temp_dir();
61        let path = dir.join(format!("DB_{:?}", nondeterministic!(ObjectID::random())));
62        Self::new(path, genesis_committee, None)
63    }
64
65    pub fn init_genesis_committee(&self, genesis_committee: Committee) -> SuiResult {
66        assert_eq!(genesis_committee.epoch, 0);
67        self.tables.committee_map.insert(&0, &genesis_committee)?;
68        self.cache.write().insert(0, Arc::new(genesis_committee));
69        Ok(())
70    }
71
72    pub fn insert_new_committee(&self, new_committee: &Committee) -> SuiResult {
73        if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
74            // If somehow we already have this committee in the store, they must be the same.
75            assert_eq!(&*old_committee, new_committee);
76        } else {
77            self.tables
78                .committee_map
79                .insert(&new_committee.epoch, new_committee)?;
80            self.cache
81                .write()
82                .insert(new_committee.epoch, Arc::new(new_committee.clone()));
83        }
84        Ok(())
85    }
86
87    pub fn get_committee(&self, epoch_id: &EpochId) -> SuiResult<Option<Arc<Committee>>> {
88        if let Some(committee) = self.cache.read().get(epoch_id) {
89            return Ok(Some(committee.clone()));
90        }
91        let committee = self.tables.committee_map.get(epoch_id)?;
92        let committee = committee.map(Arc::new);
93        if let Some(committee) = committee.as_ref() {
94            self.cache.write().insert(*epoch_id, committee.clone());
95        }
96        Ok(committee)
97    }
98
99    // todo - make use of cache or remove this method
100    pub fn get_latest_committee(&self) -> SuiResult<Committee> {
101        Ok(self
102            .tables
103            .committee_map
104            .reversed_safe_iter_with_bounds(None, None)?
105            .next()
106            .transpose()?
107            // unwrap safe because we guarantee there is at least a genesis epoch
108            // when initializing the store.
109            .unwrap()
110            .1)
111    }
112    /// Return the committee specified by `epoch`. If `epoch` is `None`, return the latest committee.
113    // todo - make use of cache or remove this method
114    pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> SuiResult<Committee> {
115        Ok(match epoch {
116            Some(epoch) => self
117                .get_committee(&epoch)?
118                .ok_or(SuiErrorKind::MissingCommitteeAtEpoch(epoch))
119                .map(|c| Committee::clone(&*c))?,
120            None => self.get_latest_committee()?,
121        })
122    }
123
124    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
125        self.tables
126            .committee_map
127            .checkpoint_db(path)
128            .map_err(Into::into)
129    }
130
131    fn database_is_empty(&self) -> SuiResult<bool> {
132        Ok(self
133            .tables
134            .committee_map
135            .safe_iter()
136            .next()
137            .transpose()?
138            .is_none())
139    }
140}