1use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use api::checkpoints::Checkpoints;
10use api::coin::{Coins, DelegationCoins};
11use api::dynamic_fields::DynamicFields;
12use api::move_utils::MoveUtils;
13use api::name_service::NameService;
14use api::objects::{Objects, QueryObjects};
15use api::rpc_module::RpcModule;
16use api::transactions::{QueryTransactions, Transactions};
17use api::write::Write;
18use config::RpcConfig;
19use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
20use metrics::RpcMetrics;
21use metrics::middleware::MetricsLayer;
22use prometheus::Registry;
23use serde_json::json;
24use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
25use sui_indexer_alt_reader::pg_reader::db::DbArgs;
26use sui_indexer_alt_reader::system_package_task::{SystemPackageTask, SystemPackageTaskArgs};
27use sui_open_rpc::Project;
28use timeout::TimeoutLayer;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tower_layer::Identity;
32use tracing::{info, warn};
33use url::Url;
34
35use crate::api::governance::{DelegationGovernance, Governance};
36use crate::context::Context;
37
38pub mod api;
39pub mod args;
40pub mod config;
41mod context;
42pub mod data;
43mod error;
44mod metrics;
45mod paginate;
46mod timeout;
47
48#[derive(clap::Args, Debug, Clone)]
49pub struct RpcArgs {
50 #[clap(long, default_value_t = Self::default().rpc_listen_address)]
52 pub rpc_listen_address: SocketAddr,
53
54 #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
57 pub max_in_flight_requests: u32,
58
59 #[clap(long, default_value_t = Self::default().request_timeout_ms)]
62 pub request_timeout_ms: u64,
63
64 #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
67 pub slow_request_threshold_ms: u64,
68}
69
70pub struct RpcService {
71 rpc_listen_address: SocketAddr,
73
74 server: ServerBuilder<Identity, Identity>,
76
77 metrics: Arc<RpcMetrics>,
79
80 request_timeout: Duration,
82
83 slow_request_threshold: Duration,
85
86 modules: jsonrpsee::RpcModule<()>,
88
89 schema: Project,
91
92 cancel: CancellationToken,
94}
95
96impl RpcArgs {
97 fn request_timeout(&self) -> Duration {
99 Duration::from_millis(self.request_timeout_ms)
100 }
101
102 fn slow_request_threshold(&self) -> Duration {
105 Duration::from_millis(self.slow_request_threshold_ms)
106 }
107}
108
109impl RpcService {
110 pub fn new(
113 rpc_args: RpcArgs,
114 registry: &Registry,
115 cancel: CancellationToken,
116 ) -> anyhow::Result<Self> {
117 let metrics = RpcMetrics::new(registry);
118
119 let server = ServerBuilder::new()
120 .http_only()
121 .max_connections(rpc_args.max_in_flight_requests)
124 .max_response_body_size(u32::MAX)
125 .set_batch_request_config(BatchRequestConfig::Disabled);
126
127 let schema = Project::new(
128 env!("CARGO_PKG_VERSION"),
129 "Sui JSON-RPC",
130 "A JSON-RPC API for interacting with the Sui blockchain.",
131 "Mysten Labs",
132 "https://mystenlabs.com",
133 "build@mystenlabs.com",
134 "Apache-2.0",
135 "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
136 );
137
138 Ok(Self {
139 rpc_listen_address: rpc_args.rpc_listen_address,
140 server,
141 metrics,
142 request_timeout: rpc_args.request_timeout(),
143 slow_request_threshold: rpc_args.slow_request_threshold(),
144 modules: jsonrpsee::RpcModule::new(()),
145 schema,
146 cancel,
147 })
148 }
149
150 pub fn metrics(&self) -> Arc<RpcMetrics> {
152 self.metrics.clone()
153 }
154
155 pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
158 self.schema.add_module(module.schema());
159 self.modules
160 .merge(module.into_impl().remove_context())
161 .context("Failed to add module because of a name conflict")
162 }
163
164 pub async fn run(self) -> anyhow::Result<JoinHandle<()>> {
167 let Self {
168 rpc_listen_address,
169 server,
170 metrics,
171 request_timeout,
172 slow_request_threshold,
173 mut modules,
174 schema,
175 cancel,
176 } = self;
177
178 info!("Starting JSON-RPC service on {rpc_listen_address}",);
179 info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
180
181 modules
183 .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
184 .context("Failed to add schema discovery method")?;
185
186 let middleware = RpcServiceBuilder::new()
187 .layer(TimeoutLayer::new(request_timeout))
188 .layer(MetricsLayer::new(
189 metrics,
190 modules.method_names().map(|n| n.to_owned()).collect(),
191 slow_request_threshold,
192 ));
193
194 let handle = server
195 .set_rpc_middleware(middleware)
196 .set_http_middleware(
197 tower::builder::ServiceBuilder::new().layer(
198 tower_http::cors::CorsLayer::new()
199 .allow_methods([http::Method::GET, http::Method::POST])
200 .allow_origin(tower_http::cors::Any)
201 .allow_headers(tower_http::cors::Any),
202 ),
203 )
204 .build(rpc_listen_address)
205 .await
206 .context("Failed to bind JSON-RPC service")?
207 .start(modules);
208
209 let cancel_handle = handle.clone();
212 let cancel_cancel = cancel.clone();
213 let h_cancel = tokio::spawn(async move {
214 cancel_cancel.cancelled().await;
215 cancel_handle.stop()
216 });
217
218 Ok(tokio::spawn(async move {
219 handle.stopped().await;
220 cancel.cancel();
221 let _ = h_cancel.await;
222 }))
223 }
224}
225
226impl Default for RpcArgs {
227 fn default() -> Self {
228 Self {
229 rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
230 max_in_flight_requests: 2000,
231 request_timeout_ms: 60_000,
232 slow_request_threshold_ms: 15_000,
233 }
234 }
235}
236
237#[derive(clap::Args, Debug, Clone, Default)]
238pub struct NodeArgs {
239 #[arg(long)]
242 pub fullnode_rpc_url: Option<url::Url>,
243}
244
245pub async fn start_rpc(
263 database_url: Option<Url>,
264 bigtable_instance: Option<String>,
265 db_args: DbArgs,
266 bigtable_args: BigtableArgs,
267 rpc_args: RpcArgs,
268 node_args: NodeArgs,
269 system_package_task_args: SystemPackageTaskArgs,
270 rpc_config: RpcConfig,
271 registry: &Registry,
272 cancel: CancellationToken,
273) -> anyhow::Result<JoinHandle<()>> {
274 let mut rpc = RpcService::new(rpc_args, registry, cancel.child_token())
275 .context("Failed to create RPC service")?;
276
277 let context = Context::new(
278 database_url,
279 bigtable_instance,
280 db_args,
281 bigtable_args,
282 rpc_config,
283 rpc.metrics(),
284 registry,
285 cancel.child_token(),
286 )
287 .await?;
288
289 let system_package_task = SystemPackageTask::new(
290 system_package_task_args,
291 context.pg_reader().clone(),
292 context.package_resolver().package_store().clone(),
293 cancel.child_token(),
294 );
295
296 rpc.add_module(Checkpoints(context.clone()))?;
297 rpc.add_module(Coins(context.clone()))?;
298 rpc.add_module(DynamicFields(context.clone()))?;
299 rpc.add_module(Governance(context.clone()))?;
300 rpc.add_module(MoveUtils(context.clone()))?;
301 rpc.add_module(NameService(context.clone()))?;
302 rpc.add_module(Objects(context.clone()))?;
303 rpc.add_module(QueryObjects(context.clone()))?;
304 rpc.add_module(QueryTransactions(context.clone()))?;
305 rpc.add_module(Transactions(context.clone()))?;
306
307 if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
308 rpc.add_module(DelegationCoins::new(
309 fullnode_rpc_url.clone(),
310 context.config().node.clone(),
311 )?)?;
312 rpc.add_module(DelegationGovernance::new(
313 fullnode_rpc_url.clone(),
314 context.config().node.clone(),
315 )?)?;
316 rpc.add_module(Write::new(fullnode_rpc_url, context.config().node.clone())?)?;
317 } else {
318 warn!(
319 "No fullnode rpc url provided, DelegationCoins, DelegationGovernance, and Write modules will not be added."
320 );
321 }
322
323 let h_rpc = rpc.run().await.context("Failed to start RPC service")?;
324 let h_system_package_task = system_package_task.run();
325
326 Ok(tokio::spawn(async move {
327 let _ = h_rpc.await;
328 cancel.cancel();
329 let _ = h_system_package_task.await;
330 }))
331}
332
333#[cfg(test)]
334mod tests {
335 use std::{
336 collections::BTreeSet,
337 net::{IpAddr, Ipv4Addr, SocketAddr},
338 time::Duration,
339 };
340
341 use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::METHOD_NOT_FOUND_CODE};
342 use reqwest::Client;
343 use serde_json::{Value, json};
344 use sui_open_rpc::Module;
345 use sui_open_rpc_macros::open_rpc;
346 use sui_pg_db::temp::get_available_port;
347
348 use super::*;
349
350 #[tokio::test]
351 async fn test_add_module() {
352 let mut rpc = test_service().await;
353
354 rpc.add_module(Foo).unwrap();
355
356 assert_eq!(
357 BTreeSet::from_iter(rpc.modules.method_names()),
358 BTreeSet::from_iter(["test_bar"]),
359 )
360 }
361
362 #[tokio::test]
363 async fn test_add_module_multiple_methods() {
364 let mut rpc = test_service().await;
365
366 rpc.add_module(Bar).unwrap();
367
368 assert_eq!(
369 BTreeSet::from_iter(rpc.modules.method_names()),
370 BTreeSet::from_iter(["test_bar", "test_baz"]),
371 )
372 }
373
374 #[tokio::test]
375 async fn test_add_multiple_modules() {
376 let mut rpc = test_service().await;
377
378 rpc.add_module(Foo).unwrap();
379 rpc.add_module(Baz).unwrap();
380
381 assert_eq!(
382 BTreeSet::from_iter(rpc.modules.method_names()),
383 BTreeSet::from_iter(["test_bar", "test_baz"]),
384 )
385 }
386
387 #[tokio::test]
388 async fn test_add_module_conflict() {
389 let mut rpc = test_service().await;
390
391 rpc.add_module(Foo).unwrap();
392 assert!(rpc.add_module(Bar).is_err(),)
393 }
394
395 #[tokio::test]
396 async fn test_graceful_shutdown() {
397 let cancel = CancellationToken::new();
398 let rpc = RpcService::new(
399 RpcArgs {
400 rpc_listen_address: test_listen_address(),
401 ..Default::default()
402 },
403 &Registry::new(),
404 cancel.clone(),
405 )
406 .unwrap();
407
408 let handle = rpc.run().await.unwrap();
409
410 cancel.cancel();
411 tokio::time::timeout(Duration::from_millis(500), handle)
412 .await
413 .expect("Shutdown should not timeout")
414 .expect("Shutdown should succeed");
415 }
416
417 #[tokio::test]
418 async fn test_rpc_discovery() {
419 let cancel = CancellationToken::new();
420 let rpc_listen_address = test_listen_address();
421
422 let mut rpc = RpcService::new(
423 RpcArgs {
424 rpc_listen_address,
425 ..Default::default()
426 },
427 &Registry::new(),
428 cancel.clone(),
429 )
430 .unwrap();
431
432 rpc.add_module(Foo).unwrap();
433 rpc.add_module(Baz).unwrap();
434
435 let handle = rpc.run().await.unwrap();
436
437 let url = format!("http://{}/", rpc_listen_address);
438 let client = Client::new();
439
440 let resp: Value = client
441 .post(&url)
442 .json(&json!({
443 "jsonrpc": "2.0",
444 "method": "rpc.discover",
445 "id": 1,
446 }))
447 .send()
448 .await
449 .expect("Request should succeed")
450 .json()
451 .await
452 .expect("Deserialization should succeed");
453
454 assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
455 assert_eq!(
456 resp["result"]["methods"],
457 json!([
458 {
459 "name": "test_bar",
460 "tags": [{
461 "name": "Test API"
462 }],
463 "params": [],
464 "result": {
465 "name": "u64",
466 "required": true,
467 "schema": {
468 "type": "integer",
469 "format": "uint64",
470 "minimum": 0.0
471 }
472 }
473 },
474 {
475 "name": "test_baz",
476 "tags": [{
477 "name": "Test API"
478 }],
479 "params": [],
480 "result": {
481 "name": "u64",
482 "required": true,
483 "schema": {
484 "type": "integer",
485 "format": "uint64",
486 "minimum": 0.0
487 }
488 }
489 }
490 ])
491 );
492
493 cancel.cancel();
494 tokio::time::timeout(Duration::from_millis(500), handle)
495 .await
496 .expect("Shutdown should not timeout")
497 .expect("Shutdown should succeed");
498 }
499
500 #[tokio::test]
501 async fn test_request_metrics() {
502 let cancel = CancellationToken::new();
503 let rpc_listen_address = test_listen_address();
504
505 let mut rpc = RpcService::new(
506 RpcArgs {
507 rpc_listen_address,
508 ..Default::default()
509 },
510 &Registry::new(),
511 cancel.clone(),
512 )
513 .unwrap();
514
515 rpc.add_module(Foo).unwrap();
516
517 let metrics = rpc.metrics();
518 let handle = rpc.run().await.unwrap();
519
520 let url = format!("http://{}/", rpc_listen_address);
521 let client = Client::new();
522
523 client
524 .post(&url)
525 .json(&json!({
526 "jsonrpc": "2.0",
527 "method": "test_bar",
528 "id": 1,
529 }))
530 .send()
531 .await
532 .expect("Request should succeed");
533
534 client
535 .post(&url)
536 .json(&json!({
537 "jsonrpc": "2.0",
538 "method": "test_baz",
539 "id": 1,
540 }))
541 .send()
542 .await
543 .expect("Request should succeed");
544
545 assert_eq!(
546 metrics
547 .requests_received
548 .with_label_values(&["test_bar"])
549 .get(),
550 1
551 );
552
553 assert_eq!(
554 metrics
555 .requests_succeeded
556 .with_label_values(&["test_bar"])
557 .get(),
558 1
559 );
560
561 assert_eq!(
562 metrics
563 .requests_received
564 .with_label_values(&["<UNKNOWN>"])
565 .get(),
566 1
567 );
568
569 assert_eq!(
570 metrics
571 .requests_succeeded
572 .with_label_values(&["<UNKNOWN>"])
573 .get(),
574 0
575 );
576
577 assert_eq!(
578 metrics
579 .requests_failed
580 .with_label_values(&["<UNKNOWN>", &format!("{METHOD_NOT_FOUND_CODE}")])
581 .get(),
582 1
583 );
584
585 cancel.cancel();
586 tokio::time::timeout(Duration::from_millis(500), handle)
587 .await
588 .expect("Shutdown should not timeout")
589 .expect("Shutdown should succeed");
590 }
591
592 #[open_rpc(namespace = "test", tag = "Test API")]
595 #[rpc(server, namespace = "test")]
596 trait FooApi {
597 #[method(name = "bar")]
598 fn bar(&self) -> RpcResult<u64>;
599 }
600
601 #[open_rpc(namespace = "test", tag = "Test API")]
602 #[rpc(server, namespace = "test")]
603 trait BarApi {
604 #[method(name = "bar")]
605 fn bar(&self) -> RpcResult<u64>;
606
607 #[method(name = "baz")]
608 fn baz(&self) -> RpcResult<u64>;
609 }
610
611 #[open_rpc(namespace = "test", tag = "Test API")]
612 #[rpc(server, namespace = "test")]
613 trait BazApi {
614 #[method(name = "baz")]
615 fn baz(&self) -> RpcResult<u64>;
616 }
617
618 struct Foo;
619 struct Bar;
620 struct Baz;
621
622 impl FooApiServer for Foo {
623 fn bar(&self) -> RpcResult<u64> {
624 Ok(42)
625 }
626 }
627
628 impl BarApiServer for Bar {
629 fn bar(&self) -> RpcResult<u64> {
630 Ok(43)
631 }
632
633 fn baz(&self) -> RpcResult<u64> {
634 Ok(44)
635 }
636 }
637
638 impl BazApiServer for Baz {
639 fn baz(&self) -> RpcResult<u64> {
640 Ok(45)
641 }
642 }
643
644 impl RpcModule for Foo {
645 fn schema(&self) -> Module {
646 FooApiOpenRpc::module_doc()
647 }
648
649 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
650 self.into_rpc()
651 }
652 }
653
654 impl RpcModule for Bar {
655 fn schema(&self) -> Module {
656 BarApiOpenRpc::module_doc()
657 }
658
659 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
660 self.into_rpc()
661 }
662 }
663
664 impl RpcModule for Baz {
665 fn schema(&self) -> Module {
666 BazApiOpenRpc::module_doc()
667 }
668
669 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
670 self.into_rpc()
671 }
672 }
673
674 fn test_listen_address() -> SocketAddr {
675 let port = get_available_port();
676 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
677 }
678
679 async fn test_service() -> RpcService {
680 let cancel = CancellationToken::new();
681 RpcService::new(
682 RpcArgs {
683 rpc_listen_address: test_listen_address(),
684 ..Default::default()
685 },
686 &Registry::new(),
687 cancel,
688 )
689 .expect("Failed to create test JSON-RPC service")
690 }
691}