sui_core/
verify_indexes.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Weak;
use std::{collections::BTreeMap, sync::Arc};

use crate::jsonrpc_index::{CoinIndexKey2, CoinInfo, IndexStore};
use anyhow::{anyhow, bail, Result};
use sui_types::{base_types::ObjectInfo, object::Owner};
use tracing::info;
use typed_store::traits::Map;

use crate::authority::AuthorityState;
use crate::{authority::authority_store_tables::LiveObject, state_accumulator::AccumulatorStore};

/// Verifies the owner and coin secondary indexes against the live object set.
///
/// This is a very expensive function: it iterates through the whole live object set,
/// recalculates the expected secondary index entries in memory, and then compares them
/// entry by entry with what is stored in `indexes`.
///
/// Only `LiveObject::Normal` objects whose owner is `Owner::AddressOwner` contribute to the
/// recalculated indexes; every other live object is skipped.
///
/// # Errors
///
/// Returns an error when:
/// - reading an entry from either index table fails,
/// - a stored entry has no recalculated counterpart (an extra, unexpected entry),
/// - a stored entry differs from its recalculated counterpart, or
/// - recalculated entries remain unmatched once a table has been fully scanned
///   (the stored index is missing entries).
///
/// # Panics
///
/// Panics if `CoinInfo::from_object` fails for an object for which `coin_type_maybe()`
/// returned a type tag.
pub fn verify_indexes(store: &dyn AccumulatorStore, indexes: Arc<IndexStore>) -> Result<()> {
    info!("Begin running index verification checks");

    // Expected index contents, rebuilt from scratch from the live object set.
    let mut owner_index = BTreeMap::new();
    let mut coin_index = BTreeMap::new();

    info!("Reading live objects set");
    // NOTE(review): the meaning of the `false` flag is defined by `iter_live_object_set`,
    // which is not visible here — confirm it is the intended setting.
    for object in store.iter_live_object_set(false) {
        let LiveObject::Normal(object) = object else {
            continue;
        };
        let Owner::AddressOwner(owner) = object.owner else {
            continue;
        };

        // Owner Index Calculation
        let owner_index_key = (owner, object.id());
        let object_info = ObjectInfo::new(&object.compute_object_reference(), &object);

        owner_index.insert(owner_index_key, object_info);

        // Coin Index Calculation
        if let Some(type_tag) = object.coin_type_maybe() {
            let info =
                CoinInfo::from_object(&object).expect("already checked that this is a coin type");
            let key = CoinIndexKey2::new(owner, type_tag.to_string(), info.balance, object.id());

            coin_index.insert(key, info);
        }
    }

    info!("Live objects set is prepared, about to verify indexes");

    // Verify Owner Index
    //
    // Every stored entry is removed from the recalculated map as it is matched, so whatever
    // remains in the map afterwards is missing from the stored index.
    for item in indexes.tables().owner_index().safe_iter() {
        let (key, info) = item?;
        let calculated_info = owner_index.remove(&key).ok_or_else(|| {
            anyhow!(
                "owner_index: found extra, unexpected entry {:?}",
                (&key, &info)
            )
        })?;

        if calculated_info != info {
            bail!("owner_index: entry {key:?} is different: expected {calculated_info:?} found {info:?}");
        }
    }

    if !owner_index.is_empty() {
        bail!("owner_index: is missing entries: {owner_index:?}");
    }
    info!("Owner index is good");

    // Verify Coin Index (same matching scheme as the owner index above)
    for item in indexes.tables().coin_index().safe_iter() {
        let (key, info) = item?;
        let calculated_info = coin_index.remove(&key).ok_or_else(|| {
            anyhow!(
                "coin_index: found extra, unexpected entry {:?}",
                (&key, &info)
            )
        })?;

        if calculated_info != info {
            bail!("coin_index: entry {key:?} is different: expected {calculated_info:?} found {info:?}");
        }
    }

    // The success message must only be logged once the missing-entries check has passed.
    if !coin_index.is_empty() {
        bail!("coin_index: is missing entries: {coin_index:?}");
    }
    info!("Coin index is good");

    info!("Finished running index verification checks");

    Ok(())
}

/// Temporary code to repair the coin index. This should be removed in the next release.
///
/// Works in two passes:
/// 1. On a blocking task, scan the whole coin index without taking any locks and collect
///    every key that looks stale. Some of these candidates are benign.
/// 2. Process the candidates in chunks of 100: acquire the locks for the owners in the
///    chunk, re-check each candidate, and delete the keys that are still stale.
///
/// If the `AuthorityState` can no longer be upgraded, or it has no index store, the
/// corresponding pass does nothing.
///
/// # Errors
///
/// Returns an error if the blocking task fails to join, if reading a coin index entry
/// fails, or if building or writing a delete batch fails.
pub async fn fix_indexes(authority_state: Weak<AuthorityState>) -> Result<()> {
    /// Returns `true` when the coin index entry `key` does not match the object store:
    /// either the object cannot be found, or its owner is not an address/object owner
    /// equal to the owner recorded in the key.
    fn is_stale(key: &CoinIndexKey2, state: &Arc<AuthorityState>) -> bool {
        match state.get_object_store().get_object(&key.object_id) {
            Some(object) => !matches!(
                object.owner,
                Owner::AddressOwner(real_owner) | Owner::ObjectOwner(real_owner)
                    if key.owner == real_owner
            ),
            None => true,
        }
    }

    tracing::info!("Starting fixing coin index");

    // Pass 1: populate the candidate list without locking. Some entries are benign.
    let weak_state = authority_state.clone();
    let candidates = tokio::task::spawn_blocking(move || -> Result<Vec<CoinIndexKey2>> {
        let Some(authority) = weak_state.upgrade() else {
            return Ok(Vec::new());
        };

        let mut suspects = Vec::new();
        if let Some(indexes) = &authority.indexes {
            for entry in indexes.tables().coin_index().safe_iter() {
                let (key, _) = entry?;
                if is_stale(&key, &authority) {
                    suspects.push(key);
                }
            }
        }
        Ok(suspects)
    })
    .await??;

    // Pass 2: re-validate each candidate while holding the owner locks, then delete.
    if let Some(authority) = authority_state.upgrade() {
        if let Some(indexes) = &authority.indexes {
            for chunk in candidates.chunks(100) {
                // The guards are kept until the end of this iteration, i.e. across both
                // the re-check and the write.
                let owners = chunk.iter().map(|key| key.owner);
                let _locks = indexes.caches.locks.acquire_locks(owners);

                let confirmed: Vec<&CoinIndexKey2> = chunk
                    .iter()
                    .filter(|&key| is_stale(key, &authority))
                    .collect();

                let mut wb = indexes.tables().coin_index().batch();
                wb.delete_batch(indexes.tables().coin_index(), confirmed)?;
                wb.write()?;
            }
        }
    }

    tracing::info!("Finished fix for the coin index");
    Ok(())
}