// sui_swarm/memory/container.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

4use super::node::RuntimeType;
5use futures::FutureExt;
6use std::sync::{Arc, Weak};
7use std::thread;
8use sui_config::NodeConfig;
9use sui_node::{SuiNode, SuiNodeHandle};
10use sui_types::base_types::ConciseableName;
11use sui_types::crypto::{AuthorityPublicKeyBytes, KeypairTraits};
12use telemetry_subscribers::get_global_telemetry_config;
13use tracing::{info, trace};
14
15#[derive(Debug)]
16pub(crate) struct Container {
17    join_handle: Option<thread::JoinHandle<()>>,
18    cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
19    node: Weak<SuiNode>,
20}
21
22/// When dropped, stop and wait for the node running in this Container to completely shutdown.
23impl Drop for Container {
24    fn drop(&mut self) {
25        trace!("dropping Container");
26
27        let thread = self.join_handle.take().unwrap();
28
29        let cancel_handle = self.cancel_sender.take().unwrap();
30
31        // Notify the thread to shutdown
32        let _ = cancel_handle.send(());
33
34        // Wait for the thread to join
35        thread.join().unwrap();
36
37        trace!("finished dropping Container");
38    }
39}
40
41impl Container {
42    /// Spawn a new Node.
43    pub async fn spawn(config: NodeConfig, runtime: RuntimeType) -> Self {
44        let (startup_sender, startup_receiver) = tokio::sync::oneshot::channel();
45        let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
46        let name = AuthorityPublicKeyBytes::from(config.protocol_key_pair().public())
47            .concise()
48            .to_string();
49
50        let thread = thread::Builder::new().name(name).spawn(move || {
51            let span = if get_global_telemetry_config()
52                .map(|c| c.enable_otlp_tracing)
53                .unwrap_or(false)
54            {
55                // we cannot have long-lived root spans when exporting trace data to otlp
56                None
57            } else {
58                Some(tracing::span!(
59                    tracing::Level::INFO,
60                    "node",
61                    name =% AuthorityPublicKeyBytes::from(config.protocol_key_pair().public()).concise(),
62                ))
63            };
64
65            let _guard = span.as_ref().map(|span| span.enter());
66
67            let mut builder = match runtime {
68                RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
69                RuntimeType::MultiThreaded => {
70                    thread_local! {
71                        static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
72                            const { std::cell::RefCell::new(None) };
73                    }
74                    let mut builder = tokio::runtime::Builder::new_multi_thread();
75                    let span = span.clone();
76                    builder
77                        .on_thread_start(move || {
78                            SPAN.with(|maybe_entered_span| {
79                                if let Some(span) = &span {
80                                    *maybe_entered_span.borrow_mut() = Some(span.clone().entered());
81                                }
82                            });
83                        })
84                        .on_thread_stop(|| {
85                            SPAN.with(|maybe_entered_span| {
86                                maybe_entered_span.borrow_mut().take();
87                            });
88                        });
89
90                    builder
91                }
92            };
93            let runtime = builder.enable_all().build().unwrap();
94
95            runtime.block_on(async move {
96                let registry_service = mysten_metrics::start_prometheus_server(config.metrics_address);
97                info!(
98                    "Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics",
99                    config.metrics_address
100                );
101                let admin_interface_port = config.admin_interface_port;
102                let server = SuiNode::start(config, registry_service).await.unwrap();
103                let admin_node = server.clone();
104                tokio::spawn(async move {
105                    sui_node::admin::run_admin_server(admin_node, admin_interface_port, None).await;
106                });
107                // Notify that we've successfully started the node
108                let _ = startup_sender.send(Arc::downgrade(&server));
109                // run until canceled
110                cancel_receiver.map(|_| ()).await;
111
112                trace!("cancellation received; shutting down thread");
113            });
114        }).unwrap();
115
116        let node = startup_receiver.await.unwrap();
117
118        Self {
119            join_handle: Some(thread),
120            cancel_sender: Some(cancel_sender),
121            node,
122        }
123    }
124
125    /// Get a SuiNodeHandle to the node owned by the container.
126    pub fn get_node_handle(&self) -> Option<SuiNodeHandle> {
127        Some(SuiNodeHandle::new(self.node.upgrade()?))
128    }
129
130    /// Check to see that the Node is still alive by checking if the receiving side of the
131    /// `cancel_sender` has been dropped.
132    ///
133    //TODO When we move to rust 1.61 we should also use
134    // https://doc.rust-lang.org/stable/std/thread/struct.JoinHandle.html#method.is_finished
135    // in order to check if the thread has finished.
136    pub fn is_alive(&self) -> bool {
137        if let Some(cancel_sender) = &self.cancel_sender {
138            !cancel_sender.is_closed()
139        } else {
140            false
141        }
142    }
143}