// sui_swarm/memory/container.rs
use super::node::RuntimeType;
use futures::FutureExt;
use std::sync::{Arc, Weak};
use std::thread;
use sui_config::NodeConfig;
use sui_node::{SuiNode, SuiNodeHandle};
use sui_types::base_types::ConciseableName;
use sui_types::crypto::{AuthorityPublicKeyBytes, KeypairTraits};
use telemetry_subscribers::get_global_telemetry_config;
use tracing::{info, trace};
/// Owner of a single node that runs on its own OS thread.
///
/// A `Container` is produced by `Container::spawn`. Dropping it signals the
/// node thread to stop and then joins that thread.
#[derive(Debug)]
pub(crate) struct Container {
/// Join handle of the thread the node runs on. Wrapped in `Option` so that
/// `Drop` can move it out of `&mut self` in order to join it.
join_handle: Option<thread::JoinHandle<()>>,
/// Sending half of the cancellation channel; the receiving half is awaited
/// by the node thread. Wrapped in `Option` so that `Drop` can move it out
/// and consume it with `send`.
cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
/// Weak reference to the node, sent back by the node thread once the node
/// has started. Being weak, it does not keep the node alive by itself.
node: Weak<SuiNode>,
}
impl Drop for Container {
fn drop(&mut self) {
trace!("dropping Container");
let thread = self.join_handle.take().unwrap();
let cancel_handle = self.cancel_sender.take().unwrap();
let _ = cancel_handle.send(());
thread.join().unwrap();
trace!("finished dropping Container");
}
}
impl Container {
/// Starts a node on a dedicated OS thread and waits until it is running.
///
/// The new thread builds its own tokio runtime of the requested `runtime`
/// flavour, starts the Prometheus metrics server and the `SuiNode` described
/// by `config`, sends a weak reference to the node back to the caller, and
/// then parks until the cancellation channel fires. The returned `Container`
/// holds the thread's join handle, the cancellation sender and that weak
/// reference.
///
/// # Panics
///
/// Panics if the thread cannot be spawned, or if the startup channel is
/// closed before a node reference arrives. The latter happens when the node
/// thread panics first, e.g. on one of its own `unwrap`s for building the
/// runtime or starting the node.
pub async fn spawn(config: NodeConfig, runtime: RuntimeType) -> Self {
// Carries the weak node reference from the node thread back to this function.
let (startup_sender, startup_receiver) = tokio::sync::oneshot::channel();
// Lets the `Container` ask the node thread to shut down.
let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
// The thread is named after the concise form of the node's protocol public key.
let name = AuthorityPublicKeyBytes::from(config.protocol_key_pair().public())
.concise()
.to_string();
let thread = thread::Builder::new().name(name).spawn(move || {
// No per-node span is created when the global telemetry config enables OTLP
// tracing; otherwise an INFO-level "node" span tagged with the concise key
// is used.
// NOTE(review): the reason for skipping the span under OTLP is not visible
// here — confirm.
let span = if get_global_telemetry_config()
.map(|c| c.enable_otlp_tracing)
.unwrap_or(false)
{
None
} else {
Some(tracing::span!(
tracing::Level::INFO,
"node",
name =% AuthorityPublicKeyBytes::from(config.protocol_key_pair().public()).concise(),
))
};
// Enter the span (if any) on this thread for the rest of the closure.
let _guard = span.as_ref().map(|span| span.enter());
let mut builder = match runtime {
RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
RuntimeType::MultiThreaded => {
// The runtime's own threads do not run this closure, so `_guard` above
// does not cover them. Each runtime thread therefore enters a clone of
// the span on start and keeps the entered guard in a thread-local until
// the thread stops.
thread_local! {
static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
const { std::cell::RefCell::new(None) };
}
let mut builder = tokio::runtime::Builder::new_multi_thread();
let span = span.clone();
builder
.on_thread_start(move || {
SPAN.with(|maybe_entered_span| {
if let Some(span) = &span {
*maybe_entered_span.borrow_mut() = Some(span.clone().entered());
}
});
})
.on_thread_stop(|| {
// Taking the guard out of the thread-local drops it.
SPAN.with(|maybe_entered_span| {
maybe_entered_span.borrow_mut().take();
});
});
builder
}
};
let runtime = builder.enable_all().build().unwrap();
runtime.block_on(async move {
let registry_service = mysten_metrics::start_prometheus_server(config.metrics_address);
info!(
"Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics",
config.metrics_address
);
// `server` stays bound until the end of this async block, i.e. until
// cancellation; only a weak reference is handed to the caller.
let server = SuiNode::start(config, registry_service, None).await.unwrap();
// A failed send is ignored.
let _ = startup_sender.send(Arc::downgrade(&server));
// Park until the cancellation channel resolves; its result is discarded, so
// any outcome of the receiver ends the wait.
cancel_receiver.map(|_| ()).await;
trace!("cancellation received; shutting down thread");
});
}).unwrap();
// Wait for the node thread to report a started node.
let node = startup_receiver.await.unwrap();
Self {
join_handle: Some(thread),
cancel_sender: Some(cancel_sender),
node,
}
}
/// Returns a handle to the node, or `None` when the weak node reference can
/// no longer be upgraded.
pub fn get_node_handle(&self) -> Option<SuiNodeHandle> {
    let strong_node = self.node.upgrade();
    strong_node.map(SuiNodeHandle::new)
}
/// Reports whether the node thread still appears to be running.
///
/// Returns `true` only while the cancellation sender is present and its
/// channel has not been closed; returns `false` otherwise.
pub fn is_alive(&self) -> bool {
    self.cancel_sender
        .as_ref()
        .is_some_and(|sender| !sender.is_closed())
}
}