1use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use api::checkpoints::Checkpoints;
10use api::coin::{Coins, DelegationCoins};
11use api::dynamic_fields::DynamicFields;
12use api::move_utils::MoveUtils;
13use api::name_service::NameService;
14use api::objects::{Objects, QueryObjects};
15use api::rpc_module::RpcModule;
16use api::transactions::{QueryTransactions, Transactions};
17use api::write::Write;
18use config::RpcConfig;
19use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
20use metrics::RpcMetrics;
21use metrics::middleware::MetricsLayer;
22use prometheus::Registry;
23use serde_json::json;
24use sui_futures::service::Service;
25use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
26use sui_indexer_alt_reader::pg_reader::db::DbArgs;
27use sui_indexer_alt_reader::system_package_task::{SystemPackageTask, SystemPackageTaskArgs};
28use sui_open_rpc::Project;
29use timeout::TimeoutLayer;
30use tower_layer::Identity;
31use tracing::{info, warn};
32use url::Url;
33
34use crate::api::governance::{DelegationGovernance, Governance};
35use crate::context::Context;
36
37pub mod api;
38pub mod args;
39pub mod config;
40mod context;
41pub mod data;
42mod error;
43mod metrics;
44mod paginate;
45mod timeout;
46
47#[derive(clap::Args, Debug, Clone)]
48pub struct RpcArgs {
49 #[clap(long, default_value_t = Self::default().rpc_listen_address)]
51 pub rpc_listen_address: SocketAddr,
52
53 #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
56 pub max_in_flight_requests: u32,
57
58 #[clap(long, default_value_t = Self::default().request_timeout_ms)]
61 pub request_timeout_ms: u64,
62
63 #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
66 pub slow_request_threshold_ms: u64,
67}
68
69pub struct RpcService {
70 rpc_listen_address: SocketAddr,
72
73 server: ServerBuilder<Identity, Identity>,
75
76 metrics: Arc<RpcMetrics>,
78
79 request_timeout: Duration,
81
82 slow_request_threshold: Duration,
84
85 modules: jsonrpsee::RpcModule<()>,
87
88 schema: Project,
90}
91
92impl RpcArgs {
93 fn request_timeout(&self) -> Duration {
95 Duration::from_millis(self.request_timeout_ms)
96 }
97
98 fn slow_request_threshold(&self) -> Duration {
101 Duration::from_millis(self.slow_request_threshold_ms)
102 }
103}
104
105impl RpcService {
106 pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
109 let metrics = RpcMetrics::new(registry);
110
111 let server = ServerBuilder::new()
112 .http_only()
113 .max_connections(rpc_args.max_in_flight_requests)
116 .max_response_body_size(u32::MAX)
117 .set_batch_request_config(BatchRequestConfig::Disabled);
118
119 let schema = Project::new(
120 env!("CARGO_PKG_VERSION"),
121 "Sui JSON-RPC",
122 "A JSON-RPC API for interacting with the Sui blockchain.",
123 "Mysten Labs",
124 "https://mystenlabs.com",
125 "build@mystenlabs.com",
126 "Apache-2.0",
127 "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
128 );
129
130 Ok(Self {
131 rpc_listen_address: rpc_args.rpc_listen_address,
132 server,
133 metrics,
134 request_timeout: rpc_args.request_timeout(),
135 slow_request_threshold: rpc_args.slow_request_threshold(),
136 modules: jsonrpsee::RpcModule::new(()),
137 schema,
138 })
139 }
140
141 pub fn metrics(&self) -> Arc<RpcMetrics> {
143 self.metrics.clone()
144 }
145
146 pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
149 self.schema.add_module(module.schema());
150 self.modules
151 .merge(module.into_impl().remove_context())
152 .context("Failed to add module because of a name conflict")
153 }
154
155 pub async fn run(self) -> anyhow::Result<Service> {
158 let Self {
159 rpc_listen_address,
160 server,
161 metrics,
162 request_timeout,
163 slow_request_threshold,
164 mut modules,
165 schema,
166 } = self;
167
168 info!("Starting JSON-RPC service on {rpc_listen_address}",);
169 info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
170
171 modules
173 .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
174 .context("Failed to add schema discovery method")?;
175
176 let middleware = RpcServiceBuilder::new()
177 .layer(TimeoutLayer::new(request_timeout))
178 .layer(MetricsLayer::new(
179 metrics,
180 modules.method_names().map(|n| n.to_owned()).collect(),
181 slow_request_threshold,
182 ));
183
184 let handle = server
185 .set_rpc_middleware(middleware)
186 .set_http_middleware(
187 tower::builder::ServiceBuilder::new().layer(
188 tower_http::cors::CorsLayer::new()
189 .allow_methods([http::Method::GET, http::Method::POST])
190 .allow_origin(tower_http::cors::Any)
191 .allow_headers(tower_http::cors::Any),
192 ),
193 )
194 .build(rpc_listen_address)
195 .await
196 .context("Failed to bind JSON-RPC service")?
197 .start(modules);
198
199 let signal = handle.clone();
200 Ok(Service::new()
201 .with_shutdown_signal(async move {
202 let _ = signal.stop();
203 })
204 .spawn(async move {
205 handle.stopped().await;
206 Ok(())
207 }))
208 }
209}
210
211impl Default for RpcArgs {
212 fn default() -> Self {
213 Self {
214 rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
215 max_in_flight_requests: 2000,
216 request_timeout_ms: 60_000,
217 slow_request_threshold_ms: 15_000,
218 }
219 }
220}
221
222#[derive(clap::Args, Debug, Clone, Default)]
223pub struct NodeArgs {
224 #[arg(long)]
227 pub fullnode_rpc_url: Option<url::Url>,
228}
229
230pub async fn start_rpc(
247 database_url: Option<Url>,
248 bigtable_instance: Option<String>,
249 db_args: DbArgs,
250 bigtable_args: BigtableArgs,
251 rpc_args: RpcArgs,
252 node_args: NodeArgs,
253 system_package_task_args: SystemPackageTaskArgs,
254 rpc_config: RpcConfig,
255 registry: &Registry,
256) -> anyhow::Result<Service> {
257 let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
258
259 let context = Context::new(
260 database_url,
261 bigtable_instance,
262 db_args,
263 bigtable_args,
264 rpc_config,
265 rpc.metrics(),
266 registry,
267 )
268 .await?;
269
270 let system_package_task = SystemPackageTask::new(
271 system_package_task_args,
272 context.pg_reader().clone(),
273 context.package_resolver().package_store().clone(),
274 );
275
276 rpc.add_module(Checkpoints(context.clone()))?;
277 rpc.add_module(Coins(context.clone()))?;
278 rpc.add_module(DynamicFields(context.clone()))?;
279 rpc.add_module(Governance(context.clone()))?;
280 rpc.add_module(MoveUtils(context.clone()))?;
281 rpc.add_module(NameService(context.clone()))?;
282 rpc.add_module(Objects(context.clone()))?;
283 rpc.add_module(QueryObjects(context.clone()))?;
284 rpc.add_module(QueryTransactions(context.clone()))?;
285 rpc.add_module(Transactions(context.clone()))?;
286
287 if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
288 rpc.add_module(DelegationCoins::new(
289 fullnode_rpc_url.clone(),
290 context.config().node.clone(),
291 )?)?;
292 rpc.add_module(DelegationGovernance::new(
293 fullnode_rpc_url.clone(),
294 context.config().node.clone(),
295 )?)?;
296 rpc.add_module(Write::new(fullnode_rpc_url, context.config().node.clone())?)?;
297 } else {
298 warn!(
299 "No fullnode rpc url provided, DelegationCoins, DelegationGovernance, and Write modules will not be added."
300 );
301 }
302
303 let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
304 let s_system_package_task = system_package_task.run();
305
306 Ok(s_rpc.attach(s_system_package_task))
307}
308
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::METHOD_NOT_FOUND_CODE};
    use reqwest::Client;
    use serde_json::{Value, json};
    use sui_open_rpc::Module;
    use sui_open_rpc_macros::open_rpc;
    use sui_pg_db::temp::get_available_port;

    use super::*;

    /// Adding a module with one method registers exactly that method.
    #[tokio::test]
    async fn test_add_module() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();

        let registered = rpc.modules.method_names().collect::<BTreeSet<_>>();
        assert_eq!(registered, BTreeSet::from(["test_bar"]));
    }

    /// Adding a module with two methods registers both of them.
    #[tokio::test]
    async fn test_add_module_multiple_methods() {
        let mut rpc = test_service().await;

        rpc.add_module(Bar).unwrap();

        let registered = rpc.modules.method_names().collect::<BTreeSet<_>>();
        assert_eq!(registered, BTreeSet::from(["test_bar", "test_baz"]));
    }

    /// Two modules with disjoint methods can both be added.
    #[tokio::test]
    async fn test_add_multiple_modules() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();
        rpc.add_module(Baz).unwrap();

        let registered = rpc.modules.method_names().collect::<BTreeSet<_>>();
        assert_eq!(registered, BTreeSet::from(["test_bar", "test_baz"]));
    }

    /// A module whose method name is already taken is rejected.
    #[tokio::test]
    async fn test_add_module_conflict() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();

        let conflicting = rpc.add_module(Bar);
        assert!(conflicting.is_err());
    }

    /// A running service shuts down promptly when asked to.
    #[tokio::test]
    async fn test_graceful_shutdown() {
        let svc = test_service().await.run().await.unwrap();

        shut_down(svc).await;
    }

    /// `rpc.discover` reports the project title and the schema of every added module.
    #[tokio::test]
    async fn test_rpc_discovery() {
        let rpc_listen_address = test_listen_address();
        let mut rpc = service_at(rpc_listen_address).unwrap();

        rpc.add_module(Foo).unwrap();
        rpc.add_module(Baz).unwrap();

        let svc = rpc.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        let resp: Value = call(&client, &url, "rpc.discover")
            .await
            .json()
            .await
            .expect("Deserialization should succeed");

        // Both test methods take no parameters and return a `u64`, so their schema entries
        // only differ by name.
        let u64_method = |name: &str| {
            json!({
                "name": name,
                "tags": [{
                    "name": "Test API"
                }],
                "params": [],
                "result": {
                    "name": "u64",
                    "required": true,
                    "schema": {
                        "type": "integer",
                        "format": "uint64",
                        "minimum": 0.0
                    }
                }
            })
        };

        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
        assert_eq!(
            resp["result"]["methods"],
            json!([u64_method("test_bar"), u64_method("test_baz")]),
        );

        shut_down(svc).await;
    }

    /// Requests are counted per method, and calls to unregistered methods are counted under
    /// the `<UNKNOWN>` label as failures with the "method not found" code.
    #[tokio::test]
    async fn test_request_metrics() {
        let rpc_listen_address = test_listen_address();
        let mut rpc = service_at(rpc_listen_address).unwrap();

        rpc.add_module(Foo).unwrap();

        let metrics = rpc.metrics();
        let svc = rpc.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        // `test_bar` is served by `Foo`; `test_baz` is not registered on this service.
        call(&client, &url, "test_bar").await;
        call(&client, &url, "test_baz").await;

        let unknown = "<UNKNOWN>";
        let not_found = METHOD_NOT_FOUND_CODE.to_string();

        let bar_received = metrics
            .requests_received
            .with_label_values(&["test_bar"])
            .get();
        assert_eq!(bar_received, 1);

        let bar_succeeded = metrics
            .requests_succeeded
            .with_label_values(&["test_bar"])
            .get();
        assert_eq!(bar_succeeded, 1);

        let unknown_received = metrics
            .requests_received
            .with_label_values(&[unknown])
            .get();
        assert_eq!(unknown_received, 1);

        let unknown_succeeded = metrics
            .requests_succeeded
            .with_label_values(&[unknown])
            .get();
        assert_eq!(unknown_succeeded, 0);

        let unknown_failed = metrics
            .requests_failed
            .with_label_values(&[unknown, not_found.as_str()])
            .get();
        assert_eq!(unknown_failed, 1);

        shut_down(svc).await;
    }

    // Test fixtures: three APIs in the `test` namespace with overlapping method names.
    // `Foo` exposes `test_bar`, `Baz` exposes `test_baz`, and `Bar` exposes both, so `Bar`
    // conflicts with either of the other two.

    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait FooApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;
    }

    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BarApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;

        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BazApi {
        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    struct Foo;
    struct Bar;
    struct Baz;

    impl FooApiServer for Foo {
        fn bar(&self) -> RpcResult<u64> {
            Ok(42)
        }
    }

    impl BarApiServer for Bar {
        fn bar(&self) -> RpcResult<u64> {
            Ok(43)
        }

        fn baz(&self) -> RpcResult<u64> {
            Ok(44)
        }
    }

    impl BazApiServer for Baz {
        fn baz(&self) -> RpcResult<u64> {
            Ok(45)
        }
    }

    impl RpcModule for Foo {
        fn schema(&self) -> Module {
            FooApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Bar {
        fn schema(&self) -> Module {
            BarApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Baz {
        fn schema(&self) -> Module {
            BazApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    /// A loopback address on a port reported as available.
    fn test_listen_address() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port())
    }

    /// A service with default arguments except for its listen address, backed by a fresh
    /// metrics registry.
    fn service_at(rpc_listen_address: SocketAddr) -> anyhow::Result<RpcService> {
        let rpc_args = RpcArgs {
            rpc_listen_address,
            ..Default::default()
        };

        RpcService::new(rpc_args, &Registry::new())
    }

    /// A service listening on a fresh test address, with no modules added.
    async fn test_service() -> RpcService {
        service_at(test_listen_address()).expect("Failed to create test JSON-RPC service")
    }

    /// POST a parameterless JSON-RPC 2.0 request for `method` to `url`.
    async fn call(client: &Client, url: &str, method: &str) -> reqwest::Response {
        let request = json!({
            "jsonrpc": "2.0",
            "method": method,
            "id": 1,
        });

        client
            .post(url)
            .json(&request)
            .send()
            .await
            .expect("Request should succeed")
    }

    /// Shut `svc` down, failing the test if that errors or takes longer than 500ms.
    async fn shut_down(svc: Service) {
        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
            .await
            .expect("Shutdown should not timeout")
            .expect("Shutdown should succeed");
    }
}