1use std::net::SocketAddr;
5use std::sync::Arc;
6
7use axum::body::Body;
8use axum::http;
9use hyper::Request;
10use jsonrpsee::RpcModule;
11use metrics::Metrics;
12use metrics::MetricsLayer;
13use prometheus::Registry;
14use std::time::Duration;
15use sui_core::traffic_controller::TrafficController;
16use sui_types::traffic_control::PolicyConfig;
17use tokio::runtime::Handle;
18use tokio_util::sync::CancellationToken;
19use tower::ServiceBuilder;
20use tower_http::trace::TraceLayer;
21use tracing::info;
22
23pub use balance_changes::*;
24pub use object_changes::*;
25pub use sui_config::node::ServerType;
26use sui_open_rpc::{Module, Project};
27use traffic_control::TrafficControllerService;
28
29use crate::error::Error;
30
31pub mod authority_state;
32mod balance_changes;
33pub mod bridge_api;
34pub mod coin_api;
35pub mod error;
36pub mod governance_api;
37pub mod indexer_api;
38pub mod logger;
39mod metrics;
40pub mod move_utils;
41mod object_changes;
42pub mod read_api;
43mod traffic_control;
44pub mod transaction_builder_api;
45pub mod transaction_execution_api;
46
/// HTTP header name `app-name`.
///
/// NOTE(review): presumably sent by clients to identify the calling application;
/// confirm against the code that reads this header.
pub const APP_NAME_HEADER: &str = "app-name";

/// Request size limit of 2^31 (presumably bytes, i.e. 2 GiB — TODO confirm the unit).
///
/// Not referenced in the visible part of this file.
pub const MAX_REQUEST_SIZE: u32 = 2 * 1024 * 1024 * 1024;
50
51pub struct JsonRpcServerBuilder {
52 module: RpcModule<()>,
53 rpc_doc: Project,
54 registry: Registry,
55 traffic_controller: Option<Arc<TrafficController>>,
56 policy_config: Option<PolicyConfig>,
57}
58
59pub fn sui_rpc_doc(version: &str) -> Project {
60 Project::new(
61 version,
62 "Sui JSON-RPC",
63 "Sui JSON-RPC API for interaction with Sui Full node. Make RPC calls using https://fullnode.NETWORK.sui.io:443, where NETWORK is the network you want to use (testnet, devnet, mainnet). By default, local networks use port 9000.",
64 "Mysten Labs",
65 "https://mystenlabs.com",
66 "build@mystenlabs.com",
67 "Apache-2.0",
68 "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
69 )
70}
71
72impl JsonRpcServerBuilder {
73 pub fn new(
74 version: &str,
75 prometheus_registry: &Registry,
76 traffic_controller: Option<Arc<TrafficController>>,
77 policy_config: Option<PolicyConfig>,
78 ) -> Self {
79 Self {
80 module: RpcModule::new(()),
81 rpc_doc: sui_rpc_doc(version),
82 registry: prometheus_registry.clone(),
83 traffic_controller,
84 policy_config,
85 }
86 }
87
88 pub fn register_module<T: SuiRpcModule>(&mut self, module: T) -> Result<(), Error> {
89 self.rpc_doc.add_module(T::rpc_doc_module());
90 Ok(self.module.merge(module.rpc())?)
91 }
92
93 fn trace_layer() -> TraceLayer<
94 tower_http::classify::SharedClassifier<tower_http::classify::ServerErrorsAsFailures>,
95 impl tower_http::trace::MakeSpan<Body> + Clone,
96 (),
97 (),
98 (),
99 (),
100 (),
101 > {
102 TraceLayer::new_for_http()
103 .make_span_with(|request: &Request<Body>| {
104 let request_id = request
105 .headers()
106 .get("x-req-id")
107 .and_then(|v| v.to_str().ok())
108 .map(tracing::field::display);
109
110 let origin = request
111 .headers()
112 .get("origin")
113 .and_then(|v| v.to_str().ok())
114 .map(tracing::field::display);
115
116 tracing::info_span!(
117 "json-rpc-request",
118 "x-req-id" = request_id,
119 "origin" = origin
120 )
121 })
122 .on_request(())
123 .on_response(())
124 .on_body_chunk(())
125 .on_eos(())
126 .on_failure(())
127 }
128
129 pub async fn to_router(&self, server_type: ServerType) -> Result<axum::Router, Error> {
130 let rpc_docs = self.rpc_doc.clone();
131 let mut module = self.module.clone();
132 module.register_method("rpc.discover", move |_, _, _| {
133 Ok::<_, jsonrpsee::types::ErrorObjectOwned>(rpc_docs.clone())
134 })?;
135 let methods_names = module.method_names().collect::<Vec<_>>();
136
137 let metrics = Arc::new(Metrics::new(&self.registry, &methods_names));
138 let client_id_source = self
139 .policy_config
140 .clone()
141 .map(|policy| policy.client_id_source);
142
143 let metrics_clone = metrics.clone();
144 let middleware = ServiceBuilder::new()
145 .layer(Self::trace_layer())
146 .map_request(move |mut request: http::Request<_>| {
147 metrics_clone.on_http_request(request.headers());
148 if let Some(client_id_source) = client_id_source.clone() {
149 traffic_control::determine_client_ip(client_id_source, &mut request);
150 }
151 request
152 });
153
154 let (stop_handle, server_handle) = jsonrpsee::server::stop_channel();
155 std::mem::forget(server_handle);
156
157 let timeout = std::env::var("JSON_RPC_TIMEOUT")
158 .ok()
159 .and_then(|value| value.parse::<u64>().ok())
160 .unwrap_or(60);
161
162 let traffic_controller = self.traffic_controller.clone();
163 let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new()
164 .layer_fn(move |s| TimeoutLayer::new(s, Duration::from_secs(timeout)))
165 .layer_fn(move |s| MetricsLayer::new(s, metrics.clone()))
166 .layer_fn({
167 let traffic_controller = traffic_controller.clone();
168 move |s| TrafficControllerService::new(s, traffic_controller.clone())
169 });
170 let service_builder = jsonrpsee::server::ServerBuilder::new()
171 .max_connections(u32::MAX)
176 .set_batch_request_config(jsonrpsee::server::BatchRequestConfig::Disabled)
178 .max_response_body_size(u32::MAX)
180 .set_rpc_middleware(rpc_middleware);
181
182 let mut router = axum::Router::new();
183 match server_type {
184 ServerType::WebSocket => {
185 let service = JsonRpcService(
186 service_builder
187 .ws_only()
188 .to_service_builder()
189 .build(module, stop_handle),
190 );
191 router = router
192 .route("/", axum::routing::get_service(service.clone()))
193 .route("/subscribe", axum::routing::get_service(service));
194 }
195 ServerType::Http => {
196 let service = JsonRpcService(
197 service_builder
198 .http_only()
199 .to_service_builder()
200 .build(module, stop_handle),
201 );
202 router = router
203 .route("/", axum::routing::post_service(service.clone()))
204 .route("/json-rpc", axum::routing::post_service(service.clone()))
205 .route("/public", axum::routing::post_service(service));
206 }
207 ServerType::Both => {
208 let service = JsonRpcService(
209 service_builder
210 .to_service_builder()
211 .build(module, stop_handle),
212 );
213 router = router
214 .route("/", axum::routing::post_service(service.clone()))
215 .route("/", axum::routing::get_service(service.clone()))
216 .route("/subscribe", axum::routing::get_service(service.clone()))
217 .route("/json-rpc", axum::routing::post_service(service.clone()))
218 .route("/public", axum::routing::post_service(service));
219 }
220 }
221
222 let app = router.layer(middleware);
223
224 info!("Available JSON-RPC methods : {:?}", methods_names);
225
226 Ok(app)
227 }
228
229 pub async fn start(
230 self,
231 listen_address: SocketAddr,
232 _custom_runtime: Option<Handle>,
233 server_type: ServerType,
234 cancel: Option<CancellationToken>,
235 ) -> Result<ServerHandle, Error> {
236 let app = self.to_router(server_type).await?;
237
238 let listener = tokio::net::TcpListener::bind(&listen_address)
239 .await
240 .unwrap();
241 let addr = listener.local_addr().unwrap();
242
243 let handle = tokio::spawn(async move {
244 axum::serve(
245 listener,
246 app.into_make_service_with_connect_info::<SocketAddr>(),
247 )
248 .await
249 .unwrap();
250 if let Some(cancel) = cancel {
251 cancel.cancel();
253 }
254 });
255
256 let handle = ServerHandle {
257 handle: ServerHandleInner::Axum(handle),
258 };
259 info!(local_addr =? addr, "Sui JSON-RPC server listening on {addr}");
260 Ok(handle)
261 }
262}
263
264pub struct ServerHandle {
265 handle: ServerHandleInner,
266}
267
268impl ServerHandle {
269 pub async fn stopped(self) {
270 match self.handle {
271 ServerHandleInner::Axum(handle) => handle.await.unwrap(),
272 }
273 }
274}
275
276enum ServerHandleInner {
277 Axum(tokio::task::JoinHandle<()>),
278}
279
280pub trait SuiRpcModule
281where
282 Self: Sized,
283{
284 fn rpc(self) -> RpcModule<Self>;
285 fn rpc_doc_module() -> Module;
286}
287
288use crate::metrics::TimeoutLayer;
289use jsonrpsee::core::BoxError;
290
/// Wrapper around an inner service `S` that turns the inner service's errors
/// into HTTP 500 responses (see its `tower::Service` implementation).
#[derive(Clone)]
struct JsonRpcService<S>(S);
293
294impl<S, RequestBody> tower::Service<http::Request<RequestBody>> for JsonRpcService<S>
295where
296 S: tower::Service<
297 http::Request<RequestBody>,
298 Error = BoxError,
299 Response = http::Response<jsonrpsee::server::HttpBody>,
300 Future: Send + 'static,
301 >,
302{
303 type Response = http::Response<jsonrpsee::server::HttpBody>;
304 type Error = std::convert::Infallible;
305 type Future = std::pin::Pin<
306 Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
307 >;
308
309 fn poll_ready(
310 &mut self,
311 _cx: &mut std::task::Context<'_>,
312 ) -> std::task::Poll<Result<(), Self::Error>> {
313 std::task::Poll::Ready(Ok(()))
314 }
315
316 fn call(&mut self, request: http::Request<RequestBody>) -> Self::Future {
317 let fut = self.0.call(request);
318 Box::pin(async move {
319 match fut.await {
320 Ok(response) => Ok(response),
321 Err(e) => Ok(http::Response::builder()
322 .status(http::status::StatusCode::INTERNAL_SERVER_ERROR)
323 .body(jsonrpsee::server::HttpBody::from(e.to_string()))
324 .unwrap()),
325 }
326 })
327 }
328}