1use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use jsonrpsee::server::BatchRequestConfig;
10use jsonrpsee::server::RpcServiceBuilder;
11use jsonrpsee::server::ServerBuilder;
12use prometheus::Registry;
13use serde_json::json;
14use sui_futures::service::Service;
15use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
16use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
17use sui_indexer_alt_reader::pg_reader::db::DbArgs;
18use sui_indexer_alt_reader::system_package_task::SystemPackageTask;
19use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
20use sui_open_rpc::Project;
21use tower_http::catch_panic;
22use tower_layer::Identity;
23use tracing::info;
24use tracing::warn;
25use url::Url;
26
27use crate::api::checkpoints::Checkpoints;
28use crate::api::coin::Coins;
29use crate::api::dynamic_fields::DynamicFields;
30use crate::api::governance::DelegationGovernance;
31use crate::api::governance::Governance;
32use crate::api::move_utils::MoveUtils;
33use crate::api::name_service::NameService;
34use crate::api::objects::Objects;
35use crate::api::objects::QueryObjects;
36use crate::api::rpc_module::RpcModule;
37use crate::api::transactions::QueryTransactions;
38use crate::api::transactions::Transactions;
39use crate::api::write::Write;
40use crate::config::RpcConfig;
41use crate::context::Context;
42use crate::error::PanicHandler;
43use crate::metrics::RpcMetrics;
44use crate::metrics::middleware::MetricsLayer;
45use crate::timeout::TimeoutLayer;
46
47pub mod api;
48pub mod args;
49pub mod config;
50mod context;
51pub mod data;
52mod error;
53mod metrics;
54mod paginate;
55mod timeout;
56
57#[derive(clap::Args, Debug, Clone)]
58pub struct RpcArgs {
59 #[clap(long, default_value_t = Self::default().rpc_listen_address)]
61 pub rpc_listen_address: SocketAddr,
62
63 #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
66 pub max_in_flight_requests: u32,
67
68 #[clap(long, default_value_t = Self::default().request_timeout_ms)]
71 pub request_timeout_ms: u64,
72
73 #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
76 pub slow_request_threshold_ms: u64,
77}
78
79pub struct RpcService {
80 rpc_listen_address: SocketAddr,
82
83 server: ServerBuilder<Identity, Identity>,
85
86 metrics: Arc<RpcMetrics>,
88
89 request_timeout: Duration,
91
92 slow_request_threshold: Duration,
94
95 modules: jsonrpsee::RpcModule<()>,
97
98 schema: Project,
100}
101
102impl RpcArgs {
103 fn request_timeout(&self) -> Duration {
105 Duration::from_millis(self.request_timeout_ms)
106 }
107
108 fn slow_request_threshold(&self) -> Duration {
111 Duration::from_millis(self.slow_request_threshold_ms)
112 }
113}
114
115impl RpcService {
116 pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
119 let metrics = RpcMetrics::new(registry);
120
121 let server = ServerBuilder::new()
122 .http_only()
123 .max_connections(rpc_args.max_in_flight_requests)
126 .max_response_body_size(u32::MAX)
127 .set_batch_request_config(BatchRequestConfig::Disabled);
128
129 let schema = Project::new(
130 env!("CARGO_PKG_VERSION"),
131 "Sui JSON-RPC",
132 "A JSON-RPC API for interacting with the Sui blockchain.",
133 "Mysten Labs",
134 "https://mystenlabs.com",
135 "build@mystenlabs.com",
136 "Apache-2.0",
137 "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
138 );
139
140 Ok(Self {
141 rpc_listen_address: rpc_args.rpc_listen_address,
142 server,
143 metrics,
144 request_timeout: rpc_args.request_timeout(),
145 slow_request_threshold: rpc_args.slow_request_threshold(),
146 modules: jsonrpsee::RpcModule::new(()),
147 schema,
148 })
149 }
150
151 pub fn metrics(&self) -> Arc<RpcMetrics> {
153 self.metrics.clone()
154 }
155
156 pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
159 self.schema.add_module(module.schema());
160 self.modules
161 .merge(module.into_impl().remove_context())
162 .context("Failed to add module because of a name conflict")
163 }
164
165 pub async fn run(self) -> anyhow::Result<Service> {
168 let Self {
169 rpc_listen_address,
170 server,
171 metrics,
172 request_timeout,
173 slow_request_threshold,
174 mut modules,
175 schema,
176 } = self;
177
178 info!("Starting JSON-RPC service on {rpc_listen_address}",);
179 info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
180
181 modules
183 .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
184 .context("Failed to add schema discovery method")?;
185
186 let middleware = RpcServiceBuilder::new()
187 .layer(TimeoutLayer::new(request_timeout))
188 .layer(MetricsLayer::new(
189 metrics.clone(),
190 modules.method_names().map(|n| n.to_owned()).collect(),
191 slow_request_threshold,
192 ));
193
194 let handle = server
195 .set_rpc_middleware(middleware)
196 .set_http_middleware(
197 tower::builder::ServiceBuilder::new()
198 .layer(
199 tower_http::cors::CorsLayer::new()
200 .allow_methods([http::Method::GET, http::Method::POST])
201 .allow_origin(tower_http::cors::Any)
202 .allow_headers(tower_http::cors::Any),
203 )
204 .layer(catch_panic::CatchPanicLayer::custom(PanicHandler::new(
205 metrics,
206 ))),
207 )
208 .build(rpc_listen_address)
209 .await
210 .context("Failed to bind JSON-RPC service")?
211 .start(modules);
212
213 let signal = handle.clone();
214 Ok(Service::new()
215 .with_shutdown_signal(async move {
216 let _ = signal.stop();
217 })
218 .spawn(async move {
219 handle.stopped().await;
220 Ok(())
221 }))
222 }
223}
224
225impl Default for RpcArgs {
226 fn default() -> Self {
227 Self {
228 rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
229 max_in_flight_requests: 2000,
230 request_timeout_ms: 60_000,
231 slow_request_threshold_ms: 15_000,
232 }
233 }
234}
235
236#[derive(clap::Args, Debug, Clone, Default)]
237pub struct NodeArgs {
238 #[arg(long)]
241 pub fullnode_rpc_url: Option<url::Url>,
242}
243
244pub async fn start_rpc(
261 database_url: Option<Url>,
262 bigtable_instance: Option<String>,
263 db_args: DbArgs,
264 bigtable_args: BigtableArgs,
265 consistent_reader_args: ConsistentReaderArgs,
266 rpc_args: RpcArgs,
267 node_args: NodeArgs,
268 system_package_task_args: SystemPackageTaskArgs,
269 rpc_config: RpcConfig,
270 registry: &Registry,
271) -> anyhow::Result<Service> {
272 let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
273
274 let context = Context::new(
275 database_url,
276 bigtable_instance,
277 db_args,
278 bigtable_args,
279 consistent_reader_args,
280 rpc_config,
281 rpc.metrics(),
282 registry,
283 )
284 .await?;
285
286 let system_package_task = SystemPackageTask::new(
287 system_package_task_args,
288 context.pg_reader().clone(),
289 context.package_resolver().package_store().clone(),
290 );
291
292 rpc.add_module(Checkpoints(context.clone()))?;
293 rpc.add_module(Coins(context.clone()))?;
294 rpc.add_module(DynamicFields(context.clone()))?;
295 rpc.add_module(Governance(context.clone()))?;
296 rpc.add_module(MoveUtils(context.clone()))?;
297 rpc.add_module(NameService(context.clone()))?;
298 rpc.add_module(Objects(context.clone()))?;
299 rpc.add_module(QueryObjects(context.clone()))?;
300 rpc.add_module(QueryTransactions(context.clone()))?;
301 rpc.add_module(Transactions(context.clone()))?;
302
303 if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
304 let client = context.config().node.client(fullnode_rpc_url)?;
305 rpc.add_module(DelegationGovernance::new(client.clone()))?;
306 rpc.add_module(Write::new(client))?;
307 } else {
308 warn!(
309 "No fullnode rpc url provided, DelegationGovernance and Write modules will not be added."
310 );
311 }
312
313 let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
314 let s_system_package_task = system_package_task.run();
315
316 Ok(s_rpc.attach(s_system_package_task))
317}
318
319#[cfg(test)]
320mod tests {
321 use std::collections::BTreeSet;
322 use std::net::IpAddr;
323 use std::net::Ipv4Addr;
324 use std::net::SocketAddr;
325 use std::time::Duration;
326
327 use jsonrpsee::core::RpcResult;
328 use jsonrpsee::proc_macros::rpc;
329 use jsonrpsee::types::error::INTERNAL_ERROR_CODE;
330 use jsonrpsee::types::error::METHOD_NOT_FOUND_CODE;
331 use reqwest::Client;
332 use serde_json::Value;
333 use serde_json::json;
334 use sui_open_rpc::Module;
335 use sui_open_rpc_macros::open_rpc;
336 use sui_pg_db::temp::get_available_port;
337
338 use super::*;
339
340 #[tokio::test]
341 async fn test_add_module() {
342 let mut rpc = test_service().await;
343
344 rpc.add_module(Foo).unwrap();
345
346 assert_eq!(
347 BTreeSet::from_iter(rpc.modules.method_names()),
348 BTreeSet::from_iter(["test_bar"]),
349 )
350 }
351
352 #[tokio::test]
353 async fn test_add_module_multiple_methods() {
354 let mut rpc = test_service().await;
355
356 rpc.add_module(Bar).unwrap();
357
358 assert_eq!(
359 BTreeSet::from_iter(rpc.modules.method_names()),
360 BTreeSet::from_iter(["test_bar", "test_baz"]),
361 )
362 }
363
364 #[tokio::test]
365 async fn test_add_multiple_modules() {
366 let mut rpc = test_service().await;
367
368 rpc.add_module(Foo).unwrap();
369 rpc.add_module(Baz).unwrap();
370
371 assert_eq!(
372 BTreeSet::from_iter(rpc.modules.method_names()),
373 BTreeSet::from_iter(["test_bar", "test_baz"]),
374 )
375 }
376
377 #[tokio::test]
378 async fn test_add_module_conflict() {
379 let mut rpc = test_service().await;
380
381 rpc.add_module(Foo).unwrap();
382 assert!(rpc.add_module(Bar).is_err(),)
383 }
384
385 #[tokio::test]
386 async fn test_graceful_shutdown() {
387 let rpc = test_service().await;
388 let svc = rpc.run().await.unwrap();
389
390 tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
391 .await
392 .expect("Shutdown should not timeout")
393 .expect("Shutdown should succeed");
394 }
395
396 #[tokio::test]
397 async fn test_rpc_discovery() {
398 let rpc_listen_address = test_listen_address();
399 let mut rpc = RpcService::new(
400 RpcArgs {
401 rpc_listen_address,
402 ..Default::default()
403 },
404 &Registry::new(),
405 )
406 .unwrap();
407
408 rpc.add_module(Foo).unwrap();
409 rpc.add_module(Baz).unwrap();
410
411 let svc = rpc.run().await.unwrap();
412
413 let url = format!("http://{rpc_listen_address}/");
414 let client = Client::new();
415
416 let resp: Value = client
417 .post(&url)
418 .json(&json!({
419 "jsonrpc": "2.0",
420 "method": "rpc.discover",
421 "id": 1,
422 }))
423 .send()
424 .await
425 .expect("Request should succeed")
426 .json()
427 .await
428 .expect("Deserialization should succeed");
429
430 assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
431 assert_eq!(
432 resp["result"]["methods"],
433 json!([
434 {
435 "name": "test_bar",
436 "tags": [{
437 "name": "Test API"
438 }],
439 "params": [],
440 "result": {
441 "name": "u64",
442 "required": true,
443 "schema": {
444 "type": "integer",
445 "format": "uint64",
446 "minimum": 0.0
447 }
448 }
449 },
450 {
451 "name": "test_baz",
452 "tags": [{
453 "name": "Test API"
454 }],
455 "params": [],
456 "result": {
457 "name": "u64",
458 "required": true,
459 "schema": {
460 "type": "integer",
461 "format": "uint64",
462 "minimum": 0.0
463 }
464 }
465 }
466 ])
467 );
468
469 tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
470 .await
471 .expect("Shutdown should not timeout")
472 .expect("Shutdown should succeed");
473 }
474
475 #[tokio::test]
476 async fn test_request_metrics() {
477 let rpc_listen_address = test_listen_address();
478 let mut rpc = RpcService::new(
479 RpcArgs {
480 rpc_listen_address,
481 ..Default::default()
482 },
483 &Registry::new(),
484 )
485 .unwrap();
486
487 rpc.add_module(Foo).unwrap();
488
489 let metrics = rpc.metrics();
490 let svc = rpc.run().await.unwrap();
491
492 let url = format!("http://{rpc_listen_address}/");
493 let client = Client::new();
494
495 client
496 .post(&url)
497 .json(&json!({
498 "jsonrpc": "2.0",
499 "method": "test_bar",
500 "id": 1,
501 }))
502 .send()
503 .await
504 .expect("Request should succeed");
505
506 client
507 .post(&url)
508 .json(&json!({
509 "jsonrpc": "2.0",
510 "method": "test_baz",
511 "id": 1,
512 }))
513 .send()
514 .await
515 .expect("Request should succeed");
516
517 assert_eq!(
518 metrics
519 .requests_received
520 .with_label_values(&["test_bar"])
521 .get(),
522 1
523 );
524
525 assert_eq!(
526 metrics
527 .requests_succeeded
528 .with_label_values(&["test_bar"])
529 .get(),
530 1
531 );
532
533 assert_eq!(
534 metrics
535 .requests_received
536 .with_label_values(&["<UNKNOWN>"])
537 .get(),
538 1
539 );
540
541 assert_eq!(
542 metrics
543 .requests_succeeded
544 .with_label_values(&["<UNKNOWN>"])
545 .get(),
546 0
547 );
548
549 assert_eq!(
550 metrics
551 .requests_failed
552 .with_label_values(&["<UNKNOWN>", &format!("{METHOD_NOT_FOUND_CODE}")])
553 .get(),
554 1
555 );
556
557 tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
558 .await
559 .expect("Shutdown should not timeout")
560 .expect("Shutdown should succeed");
561 }
562
563 #[tokio::test]
564 async fn test_panic_handling() {
565 let rpc_listen_address = test_listen_address();
566 let mut rpc = RpcService::new(
567 RpcArgs {
568 rpc_listen_address,
569 ..Default::default()
570 },
571 &Registry::new(),
572 )
573 .unwrap();
574
575 rpc.add_module(Panic).unwrap();
576
577 let metrics = rpc.metrics();
578 let svc = rpc.run().await.unwrap();
579
580 let url = format!("http://{rpc_listen_address}/");
581 let client = Client::new();
582
583 let resp = client
584 .post(&url)
585 .json(&json!({
586 "jsonrpc": "2.0",
587 "method": "test_panic",
588 "id": 1,
589 }))
590 .send()
591 .await
592 .expect("Request should succeed");
593
594 let body: Value = resp.json().await.expect("Response should be JSON");
595
596 assert_eq!(body["jsonrpc"], "2.0");
598 assert_eq!(body["error"]["code"], INTERNAL_ERROR_CODE);
599 assert!(body["error"]["message"].as_str().unwrap().contains("Boom!"));
600
601 assert_eq!(metrics.requests_panicked.get(), 1);
603
604 tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
605 .await
606 .expect("Shutdown should not timeout")
607 .expect("Shutdown should succeed");
608 }
609
610 #[open_rpc(namespace = "test", tag = "Test API")]
613 #[rpc(server, namespace = "test")]
614 trait FooApi {
615 #[method(name = "bar")]
616 fn bar(&self) -> RpcResult<u64>;
617 }
618
619 #[open_rpc(namespace = "test", tag = "Test API")]
620 #[rpc(server, namespace = "test")]
621 trait BarApi {
622 #[method(name = "bar")]
623 fn bar(&self) -> RpcResult<u64>;
624
625 #[method(name = "baz")]
626 fn baz(&self) -> RpcResult<u64>;
627 }
628
629 #[open_rpc(namespace = "test", tag = "Test API")]
630 #[rpc(server, namespace = "test")]
631 trait BazApi {
632 #[method(name = "baz")]
633 fn baz(&self) -> RpcResult<u64>;
634 }
635
636 #[open_rpc(namespace = "test", tag = "Test API")]
637 #[rpc(server, namespace = "test")]
638 trait PanicApi {
639 #[method(name = "panic")]
640 fn panic(&self) -> RpcResult<u64>;
641 }
642
643 struct Foo;
644 struct Bar;
645 struct Baz;
646 struct Panic;
647
648 impl FooApiServer for Foo {
649 fn bar(&self) -> RpcResult<u64> {
650 Ok(42)
651 }
652 }
653
654 impl BarApiServer for Bar {
655 fn bar(&self) -> RpcResult<u64> {
656 Ok(43)
657 }
658
659 fn baz(&self) -> RpcResult<u64> {
660 Ok(44)
661 }
662 }
663
664 impl BazApiServer for Baz {
665 fn baz(&self) -> RpcResult<u64> {
666 Ok(45)
667 }
668 }
669
670 impl PanicApiServer for Panic {
671 fn panic(&self) -> RpcResult<u64> {
672 panic!("Boom!");
673 }
674 }
675
676 impl RpcModule for Foo {
677 fn schema(&self) -> Module {
678 FooApiOpenRpc::module_doc()
679 }
680
681 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
682 self.into_rpc()
683 }
684 }
685
686 impl RpcModule for Bar {
687 fn schema(&self) -> Module {
688 BarApiOpenRpc::module_doc()
689 }
690
691 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
692 self.into_rpc()
693 }
694 }
695
696 impl RpcModule for Baz {
697 fn schema(&self) -> Module {
698 BazApiOpenRpc::module_doc()
699 }
700
701 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
702 self.into_rpc()
703 }
704 }
705
706 impl RpcModule for Panic {
707 fn schema(&self) -> Module {
708 PanicApiOpenRpc::module_doc()
709 }
710
711 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
712 self.into_rpc()
713 }
714 }
715
716 fn test_listen_address() -> SocketAddr {
717 let port = get_available_port();
718 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
719 }
720
721 async fn test_service() -> RpcService {
722 RpcService::new(
723 RpcArgs {
724 rpc_listen_address: test_listen_address(),
725 ..Default::default()
726 },
727 &Registry::new(),
728 )
729 .expect("Failed to create test JSON-RPC service")
730 }
731}