sui_json_rpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use axum::body::Body;
8use axum::http;
9use hyper::Request;
10use jsonrpsee::RpcModule;
11use metrics::Metrics;
12use metrics::MetricsLayer;
13use prometheus::Registry;
14use std::time::Duration;
15use sui_core::traffic_controller::TrafficController;
16use sui_types::traffic_control::PolicyConfig;
17use tokio::runtime::Handle;
18use tokio_util::sync::CancellationToken;
19use tower::ServiceBuilder;
20use tower_http::trace::TraceLayer;
21use tracing::info;
22
23pub use balance_changes::*;
24pub use object_changes::*;
25pub use sui_config::node::ServerType;
26use sui_open_rpc::{Module, Project};
27use traffic_control::TrafficControllerService;
28
29use crate::error::Error;
30
31pub mod authority_state;
32mod balance_changes;
33pub mod bridge_api;
34pub mod coin_api;
35pub mod error;
36pub mod governance_api;
37pub mod indexer_api;
38pub mod logger;
39mod metrics;
40pub mod move_utils;
41mod object_changes;
42pub mod read_api;
43mod traffic_control;
44pub mod transaction_builder_api;
45pub mod transaction_execution_api;
46
47pub const APP_NAME_HEADER: &str = "app-name";
48
49pub const MAX_REQUEST_SIZE: u32 = 2 << 30;
50
51pub struct JsonRpcServerBuilder {
52    module: RpcModule<()>,
53    rpc_doc: Project,
54    registry: Registry,
55    traffic_controller: Option<Arc<TrafficController>>,
56    policy_config: Option<PolicyConfig>,
57}
58
59pub fn sui_rpc_doc(version: &str) -> Project {
60    Project::new(
61        version,
62        "Sui JSON-RPC",
63        "Sui JSON-RPC API for interaction with Sui Full node. Make RPC calls using https://fullnode.NETWORK.sui.io:443, where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000.",
64        "Mysten Labs",
65        "https://mystenlabs.com",
66        "build@mystenlabs.com",
67        "Apache-2.0",
68        "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
69    )
70}
71
72impl JsonRpcServerBuilder {
73    pub fn new(
74        version: &str,
75        prometheus_registry: &Registry,
76        traffic_controller: Option<Arc<TrafficController>>,
77        policy_config: Option<PolicyConfig>,
78    ) -> Self {
79        Self {
80            module: RpcModule::new(()),
81            rpc_doc: sui_rpc_doc(version),
82            registry: prometheus_registry.clone(),
83            traffic_controller,
84            policy_config,
85        }
86    }
87
88    pub fn register_module<T: SuiRpcModule>(&mut self, module: T) -> Result<(), Error> {
89        self.rpc_doc.add_module(T::rpc_doc_module());
90        Ok(self.module.merge(module.rpc())?)
91    }
92
93    fn trace_layer() -> TraceLayer<
94        tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
95        impl tower_http::trace::MakeSpan<Body> + Clone,
96        (),
97        (),
98        (),
99        (),
100        (),
101    > {
102        TraceLayer::new_for_http()
103            .make_span_with(|request: &Request<Body>| {
104                let request_id = request
105                    .headers()
106                    .get("x-req-id")
107                    .and_then(|v| v.to_str().ok())
108                    .map(tracing::field::display);
109
110                let origin = request
111                    .headers()
112                    .get("origin")
113                    .and_then(|v| v.to_str().ok())
114                    .map(tracing::field::display);
115
116                tracing::info_span!(
117                    "json-rpc-request",
118                    "x-req-id" = request_id,
119                    "origin" = origin
120                )
121            })
122            .on_request(())
123            .on_response(())
124            .on_body_chunk(())
125            .on_eos(())
126            .on_failure(())
127    }
128
129    pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
130        let rpc_docs = self.rpc_doc.clone();
131        let mut module = self.module.clone();
132        module.register_method("rpc.discover", move |_, _, _| {
133            Ok::<_, jsonrpsee::types::ErrorObjectOwned>(rpc_docs.clone())
134        })?;
135        let methods_names = module.method_names().collect::<Vec<_>>();
136
137        let metrics = Arc::new(Metrics::new(&self.registry, &methods_names));
138        let client_id_source = self
139            .policy_config
140            .clone()
141            .map(|policy| policy.client_id_source);
142
143        let metrics_clone = metrics.clone();
144        let middleware = ServiceBuilder::new()
145            .layer(Self::trace_layer())
146            .map_request(move |mut request: http::Request<_>| {
147                metrics_clone.on_http_request(request.headers());
148                if let Some(client_id_source) = client_id_source.clone() {
149                    traffic_control::determine_client_ip(client_id_source, &mut request);
150                }
151                request
152            });
153
154        let (stop_handle, server_handle) = jsonrpsee::server::stop_channel();
155        std::mem::forget(server_handle);
156
157        let timeout = std::env::var("JSON_RPC_TIMEOUT")
158            .ok()
159            .and_then(|value| value.parse::<u64>().ok())
160            .unwrap_or(60);
161
162        let traffic_controller = self.traffic_controller.clone();
163        let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new()
164            .layer_fn(move |s| TimeoutLayer::new(s, Duration::from_secs(timeout)))
165            .layer_fn(move |s| MetricsLayer::new(s, metrics.clone()))
166            .layer_fn({
167                let traffic_controller = traffic_controller.clone();
168                move |s| TrafficControllerService::new(s, traffic_controller.clone())
169            });
170        let service_builder = jsonrpsee::server::ServerBuilder::new()
171            // Since we're not using jsonrpsee's server to actually handle connections this value
172            // is instead limiting the number of concurrent requests and has no impact on the
173            // number of connections. As such, for now we can just set this to a very high value to
174            // disable it artificially limiting us to ~100 conncurrent requests.
175            .max_connections(u32::MAX)
176            // Before we updated jsonrpsee, batches were disabled so lets keep them disabled.
177            .set_batch_request_config(jsonrpsee::server::BatchRequestConfig::Disabled)
178            // We don't limit response body sizes.
179            .max_response_body_size(u32::MAX)
180            .set_rpc_middleware(rpc_middleware);
181
182        let mut router = axum::Router::new();
183        match server_type {
184            ServerType::WebSocket => {
185                let service = JsonRpcService(
186                    service_builder
187                        .ws_only()
188                        .to_service_builder()
189                        .build(module, stop_handle),
190                );
191                router = router
192                    .route("/", axum::routing::get_service(service.clone()))
193                    .route("/subscribe", axum::routing::get_service(service));
194            }
195            ServerType::Http => {
196                let service = JsonRpcService(
197                    service_builder
198                        .http_only()
199                        .to_service_builder()
200                        .build(module, stop_handle),
201                );
202                router = router
203                    .route("/", axum::routing::post_service(service.clone()))
204                    .route("/json-rpc", axum::routing::post_service(service.clone()))
205                    .route("/public", axum::routing::post_service(service));
206            }
207            ServerType::Both => {
208                let service = JsonRpcService(
209                    service_builder
210                        .to_service_builder()
211                        .build(module, stop_handle),
212                );
213                router = router
214                    .route("/", axum::routing::post_service(service.clone()))
215                    .route("/", axum::routing::get_service(service.clone()))
216                    .route("/subscribe", axum::routing::get_service(service.clone()))
217                    .route("/json-rpc", axum::routing::post_service(service.clone()))
218                    .route("/public", axum::routing::post_service(service));
219            }
220        }
221
222        let app = router.layer(middleware);
223
224        info!("Available JSON-RPC methods : {:?}", methods_names);
225
226        Ok(app)
227    }
228
229    pub async fn start(
230        self,
231        listen_address: SocketAddr,
232        _custom_runtime: Option<Handle>,
233        server_type: ServerType,
234        cancel: Option<CancellationToken>,
235    ) -> Result<ServerHandle, Error> {
236        let app = self.to_router(server_type).await?;
237
238        let listener = tokio::net::TcpListener::bind(&listen_address)
239            .await
240            .unwrap();
241        let addr = listener.local_addr().unwrap();
242
243        let handle = tokio::spawn(async move {
244            axum::serve(
245                listener,
246                app.into_make_service_with_connect_info::<SocketAddr>(),
247            )
248            .await
249            .unwrap();
250            if let Some(cancel) = cancel {
251                // Signal that the server is shutting down, so other tasks can clean-up.
252                cancel.cancel();
253            }
254        });
255
256        let handle = ServerHandle {
257            handle: ServerHandleInner::Axum(handle),
258        };
259        info!(local_addr =? addr, "Sui JSON-RPC server listening on {addr}");
260        Ok(handle)
261    }
262}
263
264pub struct ServerHandle {
265    handle: ServerHandleInner,
266}
267
268impl ServerHandle {
269    pub async fn stopped(self) {
270        match self.handle {
271            ServerHandleInner::Axum(handle) => handle.await.unwrap(),
272        }
273    }
274}
275
276enum ServerHandleInner {
277    Axum(tokio::task::JoinHandle<()>),
278}
279
280pub trait SuiRpcModule
281where
282    Self: Sized,
283{
284    fn rpc(self) -> RpcModule<Self>;
285    fn rpc_doc_module() -> Module;
286}
287
288use crate::metrics::TimeoutLayer;
289use jsonrpsee::core::BoxError;
290
291#[derive(Clone)]
292struct JsonRpcService<S>(S);
293
294impl<S, RequestBody> tower::Service<http::Request<RequestBody>> for JsonRpcService<S>
295where
296    S: tower::Service<
297            http::Request<RequestBody>,
298            Error = BoxError,
299            Response = http::Response<jsonrpsee::server::HttpBody>,
300            Future: Send + 'static,
301        >,
302{
303    type Response = http::Response<jsonrpsee::server::HttpBody>;
304    type Error = std::convert::Infallible;
305    type Future = std::pin::Pin<
306        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
307    >;
308
309    fn poll_ready(
310        &mut self,
311        _cx: &mut std::task::Context<'_>,
312    ) -> std::task::Poll<Result<(), Self::Error>> {
313        std::task::Poll::Ready(Ok(()))
314    }
315
316    fn call(&mut self, request: http::Request<RequestBody>) -> Self::Future {
317        let fut = self.0.call(request);
318        Box::pin(async move {
319            match fut.await {
320                Ok(response) => Ok(response),
321                Err(e) => Ok(http::Response::builder()
322                    .status(http::status::StatusCode::INTERNAL_SERVER_ERROR)
323                    .body(jsonrpsee::server::HttpBody::from(e.to_string()))
324                    .unwrap()),
325            }
326        })
327    }
328}