sui_swarm/memory/
node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use anyhow::anyhow;
6use std::sync::Mutex;
7use std::sync::MutexGuard;
8use sui_config::NodeConfig;
9use sui_node::SuiNodeHandle;
10use sui_types::base_types::AuthorityName;
11use sui_types::base_types::ConciseableName;
12use sui_types::crypto::KeypairTraits;
13use tap::TapFallible;
14use tracing::{error, info};
15
16use super::container::Container;
17
18/// A handle to an in-memory Sui Node.
19///
20/// Each Node is attempted to run in isolation from each other by running them in their own tokio
21/// runtime in a separate thread. By doing this we can ensure that all asynchronous tasks
22/// associated with a Node are able to be stopped when desired (either when a Node is dropped or
23/// explicitly stopped by calling [`Node::stop`]) by simply dropping that Node's runtime.
24#[derive(Debug)]
25pub struct Node {
26    container: Mutex<Option<Container>>,
27    config: Mutex<NodeConfig>,
28    runtime_type: RuntimeType,
29}
30
31impl Node {
32    /// Create a new Node from the provided `NodeConfig`.
33    ///
34    /// The Node is returned without being started. See [`Node::spawn`] or [`Node::start`] for how to
35    /// start the node.
36    ///
37    /// [`NodeConfig`]: sui_config::NodeConfig
38    pub fn new(config: NodeConfig) -> Self {
39        Self {
40            container: Default::default(),
41            config: config.into(),
42            runtime_type: RuntimeType::SingleThreaded,
43        }
44    }
45
46    /// Return the `name` of this Node
47    pub fn name(&self) -> AuthorityName {
48        self.config().protocol_public_key()
49    }
50
51    pub fn config(&self) -> MutexGuard<'_, NodeConfig> {
52        self.config.lock().unwrap()
53    }
54
55    pub fn json_rpc_address(&self) -> std::net::SocketAddr {
56        self.config().json_rpc_address
57    }
58
59    /// Start this Node
60    pub async fn spawn(&self) -> Result<()> {
61        info!(name =% self.name().concise(), "starting in-memory node");
62        let config = self.config().clone();
63        *self.container.lock().unwrap() = Some(Container::spawn(config, self.runtime_type).await);
64        Ok(())
65    }
66
67    /// Start this Node, waiting until its completely started up.
68    pub async fn start(&self) -> Result<()> {
69        self.spawn().await
70    }
71
72    /// Stop this Node
73    pub fn stop(&self) {
74        info!(name =% self.name().concise(), "stopping in-memory node");
75        *self.container.lock().unwrap() = None;
76        info!(name =% self.name().concise(), "node stopped");
77    }
78
79    /// If this Node is currently running
80    pub fn is_running(&self) -> bool {
81        self.container
82            .lock()
83            .unwrap()
84            .as_ref()
85            .is_some_and(|c| c.is_alive())
86    }
87
88    pub fn get_node_handle(&self) -> Option<SuiNodeHandle> {
89        self.container
90            .lock()
91            .unwrap()
92            .as_ref()
93            .and_then(|c| c.get_node_handle())
94    }
95
96    /// Perform a health check on this Node by:
97    /// * Checking that the node is running
98    /// * Calling the Node's gRPC Health service if it's a validator.
99    pub async fn health_check(&self, is_validator: bool) -> Result<(), HealthCheckError> {
100        {
101            let lock = self.container.lock().unwrap();
102            let container = lock.as_ref().ok_or(HealthCheckError::NotRunning)?;
103            if !container.is_alive() {
104                return Err(HealthCheckError::NotRunning);
105            }
106        }
107
108        if is_validator {
109            let network_address = self
110                .config()
111                .network_address()
112                .clone()
113                .rewrite_http_to_https();
114            let tls_config = sui_tls::create_rustls_client_config(
115                self.config().network_key_pair().public().to_owned(),
116                sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
117                None,
118            );
119            let channel = mysten_network::client::connect(&network_address, tls_config)
120                .await
121                .map_err(|err| anyhow!(err.to_string()))
122                .map_err(HealthCheckError::Failure)
123                .tap_err(|e| error!("error connecting to {}: {e}", self.name().concise()))?;
124
125            let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
126            client
127                .check(tonic_health::pb::HealthCheckRequest::default())
128                .await
129                .map_err(|e| HealthCheckError::Failure(e.into()))
130                .tap_err(|e| {
131                    error!(
132                        "error performing health check on {}: {e}",
133                        self.name().concise()
134                    )
135                })?;
136        }
137
138        Ok(())
139    }
140}
141
142#[derive(Debug)]
143pub enum HealthCheckError {
144    NotRunning,
145    Failure(anyhow::Error),
146    Unknown(anyhow::Error),
147}
148
149impl std::fmt::Display for HealthCheckError {
150    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
151        write!(f, "{:?}", self)
152    }
153}
154
155impl std::error::Error for HealthCheckError {}
156
/// Selects which kind of tokio runtime a particular Node should be run on.
#[derive(Clone, Copy, Debug)]
pub enum RuntimeType {
    /// Use a single-threaded runtime.
    SingleThreaded,
    /// Use a multi-threaded runtime.
    MultiThreaded,
}
163
#[cfg(test)]
mod test {
    use crate::memory::Swarm;

    /// A validator passes its health check while running, fails it once stopped, and passes
    /// it again after being restarted.
    #[tokio::test]
    async fn start_and_stop() {
        telemetry_subscribers::init_for_testing();

        let swarm = Swarm::builder().build();
        let node = swarm.validator_nodes().next().unwrap();

        // First run: the node comes up and reports healthy.
        node.start().await.unwrap();
        node.health_check(true).await.unwrap();

        // Once stopped, the health check has to fail.
        node.stop();
        node.health_check(true).await.unwrap_err();

        // Second run: the same node can be started again.
        node.start().await.unwrap();
        node.health_check(true).await.unwrap();
    }
}