sui_core/
verify_indexes.rs1use std::sync::Weak;
5use std::{collections::BTreeMap, sync::Arc};
6
7use crate::jsonrpc_index::{CoinIndexKey2, CoinInfo, IndexStore};
8use anyhow::{Result, anyhow, bail};
9use sui_types::{base_types::ObjectInfo, object::Owner};
10use tracing::info;
11use typed_store::traits::Map;
12
13use crate::authority::AuthorityState;
14use crate::{
15 authority::authority_store_tables::LiveObject, global_state_hasher::GlobalStateHashStore,
16};
17
18pub fn verify_indexes(store: &dyn GlobalStateHashStore, indexes: Arc<IndexStore>) -> Result<()> {
21 info!("Begin running index verification checks");
22
23 let mut owner_index = BTreeMap::new();
24 let mut coin_index = BTreeMap::new();
25
26 tracing::info!("Reading live objects set");
27 for object in store.iter_live_object_set(false) {
28 let LiveObject::Normal(object) = object else {
29 continue;
30 };
31 let Owner::AddressOwner(owner) = object.owner else {
32 continue;
33 };
34
35 let owner_index_key = (owner, object.id());
37 let object_info = ObjectInfo::new(&object.compute_object_reference(), &object);
38
39 owner_index.insert(owner_index_key, object_info);
40
41 if let Some(type_tag) = object.coin_type_maybe() {
43 let info =
44 CoinInfo::from_object(&object).expect("already checked that this is a coin type");
45 let key = CoinIndexKey2::new(owner, type_tag.to_string(), info.balance, object.id());
46
47 coin_index.insert(key, info);
48 }
49 }
50
51 tracing::info!("Live objects set is prepared, about to verify indexes");
52
53 for item in indexes.tables().owner_index().safe_iter() {
55 let (key, info) = item?;
56 let calculated_info = owner_index.remove(&key).ok_or_else(|| {
57 anyhow!(
58 "owner_index: found extra, unexpected entry {:?}",
59 (&key, &info)
60 )
61 })?;
62
63 if calculated_info != info {
64 bail!(
65 "owner_index: entry {key:?} is different: expected {calculated_info:?} found {info:?}"
66 );
67 }
68 }
69
70 if !owner_index.is_empty() {
71 bail!("owner_index: is missing entries: {owner_index:?}");
72 }
73 tracing::info!("Owner index is good");
74
75 for item in indexes.tables().coin_index().safe_iter() {
77 let (key, info) = item?;
78 let calculated_info = coin_index.remove(&key).ok_or_else(|| {
79 anyhow!(
80 "coin_index: found extra, unexpected entry {:?}",
81 (&key, &info)
82 )
83 })?;
84
85 if calculated_info != info {
86 bail!(
87 "coin_index: entry {key:?} is different: expected {calculated_info:?} found {info:?}"
88 );
89 }
90 }
91 tracing::info!("Coin index is good");
92
93 if !coin_index.is_empty() {
94 bail!("coin_index: is missing entries: {coin_index:?}");
95 }
96
97 info!("Finished running index verification checks");
98
99 Ok(())
100}
101
102pub async fn fix_indexes(authority_state: Weak<AuthorityState>) -> Result<()> {
104 let is_violation = |coin_index_key: &CoinIndexKey2, state: &Arc<AuthorityState>| -> bool {
105 if let Some(object) = state
106 .get_object_store()
107 .get_object(&coin_index_key.object_id)
108 && matches!(object.owner, Owner::AddressOwner(real_owner_id) | Owner::ObjectOwner(real_owner_id) if coin_index_key.owner == real_owner_id)
109 {
110 return false;
111 }
112 true
113 };
114
115 tracing::info!("Starting fixing coin index");
116 let authority_state_clone = authority_state.clone();
118 let candidates = tokio::task::spawn_blocking(move || {
119 if let Some(authority) = authority_state_clone.upgrade() {
120 let mut batch = vec![];
121 if let Some(indexes) = &authority.indexes {
122 for entry in indexes.tables().coin_index().safe_iter() {
123 let (coin_index_key, _) = entry?;
124 if is_violation(&coin_index_key, &authority) {
125 batch.push(coin_index_key);
126 }
127 }
128 }
129 return Ok::<Vec<_>, anyhow::Error>(batch);
130 }
131 Ok(vec![])
132 })
133 .await??;
134
135 if let Some(authority) = authority_state.upgrade()
136 && let Some(indexes) = &authority.indexes
137 {
138 for chunk in candidates.chunks(100) {
139 let _locks = indexes
140 .caches
141 .locks
142 .acquire_locks(chunk.iter().map(|key| key.owner));
143 let mut batch = vec![];
144 for key in chunk {
145 if is_violation(key, &authority) {
146 batch.push(key);
147 }
148 }
149 let mut wb = indexes.tables().coin_index().batch();
150 wb.delete_batch(indexes.tables().coin_index(), batch)?;
151 wb.write()?;
152 }
153 }
154 tracing::info!("Finished fix for the coin index");
155 Ok(())
156}