sui_core/authority/authority_store_migrations.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::authority::AuthorityStore;
use std::sync::Arc;
use sui_types::effects::TransactionEffectsAPI;
use typed_store::traits::Map;
// Temporary migration task that can be removed in a release or two once we've removed the old
// events table and are sure we don't need to revert to using the old events table
pub async fn migrate_events(store: Arc<AuthorityStore>) {
tracing::info!("Starting events table migration");
let result = tokio::task::spawn_blocking(move || {
let mut batch = store.perpetual_tables.events_2.batch();
for entry in store.perpetual_tables.executed_effects.safe_iter() {
let (txn_digest, effects_digest) = entry?;
// If there's already an entry for this transaction in the new table we can skip it
if store.perpetual_tables.events_2.contains_key(&txn_digest)? {
continue;
}
let effects = if let Some(effects) = store.get_effects(&effects_digest)? {
effects
} else {
// Skip this one if we can't find the effects
continue;
};
let events_digest = if let Some(events_digest) = effects.events_digest() {
events_digest
} else {
// There are no events so we can continue to the next entry
continue;
};
let events = if let Some(events) = store.get_events_by_events_digest(events_digest)? {
// Check that the events we're loading do match the expected events digest for this
// transaction
let fetched_events_digest = events.digest();
if &fetched_events_digest != events_digest {
tracing::warn!(
expected_events_digest =? events_digest,
fetched_events_digest =? fetched_events_digest,
"fetched events don't matched expected digest; skipping",
);
continue;
}
events
} else {
// Skip this one if we can't find the events. This means they were liked already
// pruned
continue;
};
batch.insert_batch(&store.perpetual_tables.events_2, [(&txn_digest, &events)])?;
// If the batch size grows to greater that 128MB then write out to the DB so that the
// data we need to hold in memory doesn't grown unbounded.
if batch.size_in_bytes() >= 1 << 27 {
std::mem::replace(&mut batch, store.perpetual_tables.events_2.batch()).write()?;
}
}
batch.write()?;
Ok::<_, anyhow::Error>(())
})
.await
.unwrap();
if let Err(e) = result {
tracing::warn!("Error encountered while Finished events table migration: {e}");
}
tracing::info!("Finished events table migration");
}