sui_indexer_alt_jsonrpc/data/system_package_task.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use diesel::{sql_query, sql_types::BigInt, QueryableByName};
use sui_types::SYSTEM_PACKAGE_ADDRESSES;
use tokio::{task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use crate::context::Context;
#[derive(clap::Args, Debug, Clone)]
pub struct SystemPackageTaskArgs {
/// How long to wait between checking for epoch changes.
#[clap(long, default_value_t = Self::default().epoch_polling_interval_ms)]
epoch_polling_interval_ms: u64,
}
/// Background task responsible for evicting system package from the package resolver's cache after
/// detecting an epoch boundary.
pub(crate) struct SystemPackageTask {
/// Access to the database and package resolver.
context: Context,
/// How long to wait between checks.
interval: Duration,
/// Signal to cancel the task.
cancel: CancellationToken,
}
impl SystemPackageTaskArgs {
pub fn epoch_polling_interval(&self) -> Duration {
Duration::from_millis(self.epoch_polling_interval_ms)
}
}
impl SystemPackageTask {
pub(crate) fn new(
context: Context,
args: SystemPackageTaskArgs,
cancel: CancellationToken,
) -> Self {
Self {
context,
interval: args.epoch_polling_interval(),
cancel,
}
}
/// Start a new task that regularly polls the database for the latest epoch and evicts system
/// packages if it detects that the epoch has changed (which means that a framework upgrade
/// could have happened).
///
/// This operation consumes the `self` and returns a handle to the spawned tokio task. The task
/// will continue to run until its cancellation token is triggered.
pub(crate) fn run(self) -> JoinHandle<()> {
tokio::spawn(async move {
let Self {
context,
interval,
cancel,
} = self;
let mut last_epoch: i64 = 0;
let mut interval = time::interval(interval);
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!("Shutdown signal received, terminating system package eviction task");
break;
}
_ = interval.tick() => {
let mut conn = match context.pg_reader().connect().await {
Ok(conn) => conn,
Err(e) => {
error!("Failed to connect to database: {:?}", e);
continue;
}
};
#[derive(QueryableByName, Copy, Clone)]
struct Watermark {
#[diesel(sql_type = BigInt)]
epoch_hi_inclusive: i64,
}
let query = sql_query(r#"
SELECT epoch_hi_inclusive FROM watermarks WHERE pipeline = 'sum_packages'
"#);
let Watermark { epoch_hi_inclusive: next_epoch } = match conn
.results(query)
.await
.as_deref()
{
Ok([epoch]) => *epoch,
Ok([]) => {
info!("Package index isn't populated yet, no epoch information");
continue;
}
Ok(_) => {
error!("Expected exactly one row from the watermarks table");
continue;
},
Err(e) => {
error!("Failed to fetch latest epoch: {e}");
continue;
}
};
if next_epoch > last_epoch {
info!(last_epoch, next_epoch, "Detected epoch boundary, evicting system packages from cache");
last_epoch = next_epoch;
context.package_resolver().package_store().evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied())
}
}
}
}
})
}
}
impl Default for SystemPackageTaskArgs {
fn default() -> Self {
Self {
epoch_polling_interval_ms: 10_000,
}
}
}