sui_swarm/memory/
container.rs1use super::node::RuntimeType;
5use futures::FutureExt;
6use std::sync::{Arc, Weak};
7use std::thread;
8use sui_config::NodeConfig;
9use sui_node::{SuiNode, SuiNodeHandle};
10use sui_types::base_types::ConciseableName;
11use sui_types::crypto::{AuthorityPublicKeyBytes, KeypairTraits};
12use telemetry_subscribers::get_global_telemetry_config;
13use tracing::{info, trace};
14
15#[derive(Debug)]
16pub(crate) struct Container {
17 join_handle: Option<thread::JoinHandle<()>>,
18 cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
19 node: Weak<SuiNode>,
20}
21
22impl Drop for Container {
24 fn drop(&mut self) {
25 trace!("dropping Container");
26
27 let thread = self.join_handle.take().unwrap();
28
29 let cancel_handle = self.cancel_sender.take().unwrap();
30
31 let _ = cancel_handle.send(());
33
34 thread.join().unwrap();
36
37 trace!("finished dropping Container");
38 }
39}
40
41impl Container {
42 pub async fn spawn(config: NodeConfig, runtime: RuntimeType) -> Self {
44 let (startup_sender, startup_receiver) = tokio::sync::oneshot::channel();
45 let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
46 let name = AuthorityPublicKeyBytes::from(config.protocol_key_pair().public())
47 .concise()
48 .to_string();
49
50 let thread = thread::Builder::new().name(name).spawn(move || {
51 let span = if get_global_telemetry_config()
52 .map(|c| c.enable_otlp_tracing)
53 .unwrap_or(false)
54 {
55 None
57 } else {
58 Some(tracing::span!(
59 tracing::Level::INFO,
60 "node",
61 name =% AuthorityPublicKeyBytes::from(config.protocol_key_pair().public()).concise(),
62 ))
63 };
64
65 let _guard = span.as_ref().map(|span| span.enter());
66
67 let mut builder = match runtime {
68 RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
69 RuntimeType::MultiThreaded => {
70 thread_local! {
71 static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
72 const { std::cell::RefCell::new(None) };
73 }
74 let mut builder = tokio::runtime::Builder::new_multi_thread();
75 let span = span.clone();
76 builder
77 .on_thread_start(move || {
78 SPAN.with(|maybe_entered_span| {
79 if let Some(span) = &span {
80 *maybe_entered_span.borrow_mut() = Some(span.clone().entered());
81 }
82 });
83 })
84 .on_thread_stop(|| {
85 SPAN.with(|maybe_entered_span| {
86 maybe_entered_span.borrow_mut().take();
87 });
88 });
89
90 builder
91 }
92 };
93 let runtime = builder.enable_all().build().unwrap();
94
95 runtime.block_on(async move {
96 let registry_service = mysten_metrics::start_prometheus_server(config.metrics_address);
97 info!(
98 "Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics",
99 config.metrics_address
100 );
101 let admin_interface_port = config.admin_interface_port;
102 let server = SuiNode::start(config, registry_service).await.unwrap();
103 let admin_node = server.clone();
104 tokio::spawn(async move {
105 sui_node::admin::run_admin_server(admin_node, admin_interface_port, None).await;
106 });
107 let _ = startup_sender.send(Arc::downgrade(&server));
109 cancel_receiver.map(|_| ()).await;
111
112 trace!("cancellation received; shutting down thread");
113 });
114 }).unwrap();
115
116 let node = startup_receiver.await.unwrap();
117
118 Self {
119 join_handle: Some(thread),
120 cancel_sender: Some(cancel_sender),
121 node,
122 }
123 }
124
125 pub fn get_node_handle(&self) -> Option<SuiNodeHandle> {
127 Some(SuiNodeHandle::new(self.node.upgrade()?))
128 }
129
130 pub fn is_alive(&self) -> bool {
137 if let Some(cancel_sender) = &self.cancel_sender {
138 !cancel_sender.is_closed()
139 } else {
140 false
141 }
142 }
143}