sui_indexer_alt_jsonrpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use api::checkpoints::Checkpoints;
10use api::coin::{Coins, DelegationCoins};
11use api::dynamic_fields::DynamicFields;
12use api::move_utils::MoveUtils;
13use api::name_service::NameService;
14use api::objects::{Objects, QueryObjects};
15use api::rpc_module::RpcModule;
16use api::transactions::{QueryTransactions, Transactions};
17use api::write::Write;
18use config::RpcConfig;
19use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
20use metrics::RpcMetrics;
21use metrics::middleware::MetricsLayer;
22use prometheus::Registry;
23use serde_json::json;
24use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
25use sui_indexer_alt_reader::pg_reader::db::DbArgs;
26use sui_indexer_alt_reader::system_package_task::{SystemPackageTask, SystemPackageTaskArgs};
27use sui_open_rpc::Project;
28use timeout::TimeoutLayer;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tower_layer::Identity;
32use tracing::{info, warn};
33use url::Url;
34
35use crate::api::governance::{DelegationGovernance, Governance};
36use crate::context::Context;
37
38pub mod api;
39pub mod args;
40pub mod config;
41mod context;
42pub mod data;
43mod error;
44mod metrics;
45mod paginate;
46mod timeout;
47
48#[derive(clap::Args, Debug, Clone)]
49pub struct RpcArgs {
50    /// Address to listen to for incoming JSON-RPC connections.
51    #[clap(long, default_value_t = Self::default().rpc_listen_address)]
52    pub rpc_listen_address: SocketAddr,
53
54    /// The maximum number of concurrent requests to accept. If the service receives more than this
55    /// many requests, it will start responding with 429.
56    #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
57    pub max_in_flight_requests: u32,
58
59    /// Requests that take longer than this (in milliseconds) to respond to will be terminated, and
60    /// the query itself will be logged as a warning.
61    #[clap(long, default_value_t = Self::default().request_timeout_ms)]
62    pub request_timeout_ms: u64,
63
64    /// Requests that take longer than this (in milliseconds) will be logged even if they succeed.
65    /// This should be shorter than `request_timeout_ms`.
66    #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
67    pub slow_request_threshold_ms: u64,
68}
69
70pub struct RpcService {
71    /// The address that the server will start listening for requests on, when it is run.
72    rpc_listen_address: SocketAddr,
73
74    /// A partially built/configured JSON-RPC server.
75    server: ServerBuilder<Identity, Identity>,
76
77    /// Metrics for the RPC service.
78    metrics: Arc<RpcMetrics>,
79
80    /// Maximum time a request can take to complete.
81    request_timeout: Duration,
82
83    /// Threshold for logging slow requests.
84    slow_request_threshold: Duration,
85
86    /// All the methods added to the server so far.
87    modules: jsonrpsee::RpcModule<()>,
88
89    /// Description of the schema served by this service.
90    schema: Project,
91
92    /// Cancellation token controlling all services.
93    cancel: CancellationToken,
94}
95
96impl RpcArgs {
97    /// Requests that take longer than this are terminated and logged for debugging.
98    fn request_timeout(&self) -> Duration {
99        Duration::from_millis(self.request_timeout_ms)
100    }
101
102    /// Requests that take longer than this are logged for debugging even if they succeed.
103    /// This threshold should be lower than the request timeout threshold.
104    fn slow_request_threshold(&self) -> Duration {
105        Duration::from_millis(self.slow_request_threshold_ms)
106    }
107}
108
109impl RpcService {
110    /// Create a new instance of the JSON-RPC service, configured by `rpc_args`. The service will
111    /// not accept connections until [Self::run] is called.
112    pub fn new(
113        rpc_args: RpcArgs,
114        registry: &Registry,
115        cancel: CancellationToken,
116    ) -> anyhow::Result<Self> {
117        let metrics = RpcMetrics::new(registry);
118
119        let server = ServerBuilder::new()
120            .http_only()
121            // `jsonrpsee` calls this a limit on connections, but it is implemented as a limit on
122            // requests.
123            .max_connections(rpc_args.max_in_flight_requests)
124            .max_response_body_size(u32::MAX)
125            .set_batch_request_config(BatchRequestConfig::Disabled);
126
127        let schema = Project::new(
128            env!("CARGO_PKG_VERSION"),
129            "Sui JSON-RPC",
130            "A JSON-RPC API for interacting with the Sui blockchain.",
131            "Mysten Labs",
132            "https://mystenlabs.com",
133            "build@mystenlabs.com",
134            "Apache-2.0",
135            "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
136        );
137
138        Ok(Self {
139            rpc_listen_address: rpc_args.rpc_listen_address,
140            server,
141            metrics,
142            request_timeout: rpc_args.request_timeout(),
143            slow_request_threshold: rpc_args.slow_request_threshold(),
144            modules: jsonrpsee::RpcModule::new(()),
145            schema,
146            cancel,
147        })
148    }
149
150    /// Return a copy of the metrics.
151    pub fn metrics(&self) -> Arc<RpcMetrics> {
152        self.metrics.clone()
153    }
154
155    /// Add an `RpcModule` to the service. The module's methods are combined with the existing
156    /// methods registered on the service, and the operation will fail if there is any overlap.
157    pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
158        self.schema.add_module(module.schema());
159        self.modules
160            .merge(module.into_impl().remove_context())
161            .context("Failed to add module because of a name conflict")
162    }
163
164    /// Start the service (it will accept connections) and return a handle that will resolve when
165    /// the service stops.
166    pub async fn run(self) -> anyhow::Result<JoinHandle<()>> {
167        let Self {
168            rpc_listen_address,
169            server,
170            metrics,
171            request_timeout,
172            slow_request_threshold,
173            mut modules,
174            schema,
175            cancel,
176        } = self;
177
178        info!("Starting JSON-RPC service on {rpc_listen_address}",);
179        info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
180
181        // Add a method to serve the schema to clients.
182        modules
183            .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
184            .context("Failed to add schema discovery method")?;
185
186        let middleware = RpcServiceBuilder::new()
187            .layer(TimeoutLayer::new(request_timeout))
188            .layer(MetricsLayer::new(
189                metrics,
190                modules.method_names().map(|n| n.to_owned()).collect(),
191                slow_request_threshold,
192            ));
193
194        let handle = server
195            .set_rpc_middleware(middleware)
196            .set_http_middleware(
197                tower::builder::ServiceBuilder::new().layer(
198                    tower_http::cors::CorsLayer::new()
199                        .allow_methods([http::Method::GET, http::Method::POST])
200                        .allow_origin(tower_http::cors::Any)
201                        .allow_headers(tower_http::cors::Any),
202                ),
203            )
204            .build(rpc_listen_address)
205            .await
206            .context("Failed to bind JSON-RPC service")?
207            .start(modules);
208
209        // Set-up a helper task that will tear down the RPC service when the cancellation token is
210        // triggered.
211        let cancel_handle = handle.clone();
212        let cancel_cancel = cancel.clone();
213        let h_cancel = tokio::spawn(async move {
214            cancel_cancel.cancelled().await;
215            cancel_handle.stop()
216        });
217
218        Ok(tokio::spawn(async move {
219            handle.stopped().await;
220            cancel.cancel();
221            let _ = h_cancel.await;
222        }))
223    }
224}
225
226impl Default for RpcArgs {
227    fn default() -> Self {
228        Self {
229            rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
230            max_in_flight_requests: 2000,
231            request_timeout_ms: 60_000,
232            slow_request_threshold_ms: 15_000,
233        }
234    }
235}
236
237#[derive(clap::Args, Debug, Clone, Default)]
238pub struct NodeArgs {
239    /// The URL of the fullnode RPC we connect to for transaction execution,
240    /// dry-running, and delegation coin queries etc.
241    #[arg(long)]
242    pub fullnode_rpc_url: Option<url::Url>,
243}
244
245/// Set-up and run the RPC service, using the provided arguments (expected to be extracted from the
246/// command-line). The service will continue to run until the cancellation token is triggered, and
247/// will signal cancellation on the token when it is shutting down.
248///
249/// Access to most reads is controlled by the `database_url` -- if it is `None`, reads will not work.
250/// The only exceptions are the `DelegationCoins` and `DelegationGovernance` modules, which are controlled
251/// by `node_args.fullnode_rpc_url`, which can be omitted to disable reads from this RPC.
252///
253/// KV queries can optionally be served by a Bigtable instance, if `bigtable_instance` is provided.
254/// Otherwise these requests are served by the database. If a `bigtable_instance` is provided, the
255/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable must point to the credentials JSON file.
256///
257/// Access to writes (executing and dry-running transactions) is controlled by `node_args.fullnode_rpc_url`,
258/// which can be omitted to disable writes from this RPC.
259///
260/// The service may spin up auxiliary services (such as the system package task) to support itself,
261/// and will clean these up on shutdown as well.
262pub async fn start_rpc(
263    database_url: Option<Url>,
264    bigtable_instance: Option<String>,
265    db_args: DbArgs,
266    bigtable_args: BigtableArgs,
267    rpc_args: RpcArgs,
268    node_args: NodeArgs,
269    system_package_task_args: SystemPackageTaskArgs,
270    rpc_config: RpcConfig,
271    registry: &Registry,
272    cancel: CancellationToken,
273) -> anyhow::Result<JoinHandle<()>> {
274    let mut rpc = RpcService::new(rpc_args, registry, cancel.child_token())
275        .context("Failed to create RPC service")?;
276
277    let context = Context::new(
278        database_url,
279        bigtable_instance,
280        db_args,
281        bigtable_args,
282        rpc_config,
283        rpc.metrics(),
284        registry,
285        cancel.child_token(),
286    )
287    .await?;
288
289    let system_package_task = SystemPackageTask::new(
290        system_package_task_args,
291        context.pg_reader().clone(),
292        context.package_resolver().package_store().clone(),
293        cancel.child_token(),
294    );
295
296    rpc.add_module(Checkpoints(context.clone()))?;
297    rpc.add_module(Coins(context.clone()))?;
298    rpc.add_module(DynamicFields(context.clone()))?;
299    rpc.add_module(Governance(context.clone()))?;
300    rpc.add_module(MoveUtils(context.clone()))?;
301    rpc.add_module(NameService(context.clone()))?;
302    rpc.add_module(Objects(context.clone()))?;
303    rpc.add_module(QueryObjects(context.clone()))?;
304    rpc.add_module(QueryTransactions(context.clone()))?;
305    rpc.add_module(Transactions(context.clone()))?;
306
307    if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
308        rpc.add_module(DelegationCoins::new(
309            fullnode_rpc_url.clone(),
310            context.config().node.clone(),
311        )?)?;
312        rpc.add_module(DelegationGovernance::new(
313            fullnode_rpc_url.clone(),
314            context.config().node.clone(),
315        )?)?;
316        rpc.add_module(Write::new(fullnode_rpc_url, context.config().node.clone())?)?;
317    } else {
318        warn!(
319            "No fullnode rpc url provided, DelegationCoins, DelegationGovernance, and Write modules will not be added."
320        );
321    }
322
323    let h_rpc = rpc.run().await.context("Failed to start RPC service")?;
324    let h_system_package_task = system_package_task.run();
325
326    Ok(tokio::spawn(async move {
327        let _ = h_rpc.await;
328        cancel.cancel();
329        let _ = h_system_package_task.await;
330    }))
331}
332
333#[cfg(test)]
334mod tests {
335    use std::{
336        collections::BTreeSet,
337        net::{IpAddr, Ipv4Addr, SocketAddr},
338        time::Duration,
339    };
340
341    use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::METHOD_NOT_FOUND_CODE};
342    use reqwest::Client;
343    use serde_json::{Value, json};
344    use sui_open_rpc::Module;
345    use sui_open_rpc_macros::open_rpc;
346    use sui_pg_db::temp::get_available_port;
347
348    use super::*;
349
350    #[tokio::test]
351    async fn test_add_module() {
352        let mut rpc = test_service().await;
353
354        rpc.add_module(Foo).unwrap();
355
356        assert_eq!(
357            BTreeSet::from_iter(rpc.modules.method_names()),
358            BTreeSet::from_iter(["test_bar"]),
359        )
360    }
361
362    #[tokio::test]
363    async fn test_add_module_multiple_methods() {
364        let mut rpc = test_service().await;
365
366        rpc.add_module(Bar).unwrap();
367
368        assert_eq!(
369            BTreeSet::from_iter(rpc.modules.method_names()),
370            BTreeSet::from_iter(["test_bar", "test_baz"]),
371        )
372    }
373
374    #[tokio::test]
375    async fn test_add_multiple_modules() {
376        let mut rpc = test_service().await;
377
378        rpc.add_module(Foo).unwrap();
379        rpc.add_module(Baz).unwrap();
380
381        assert_eq!(
382            BTreeSet::from_iter(rpc.modules.method_names()),
383            BTreeSet::from_iter(["test_bar", "test_baz"]),
384        )
385    }
386
387    #[tokio::test]
388    async fn test_add_module_conflict() {
389        let mut rpc = test_service().await;
390
391        rpc.add_module(Foo).unwrap();
392        assert!(rpc.add_module(Bar).is_err(),)
393    }
394
395    #[tokio::test]
396    async fn test_graceful_shutdown() {
397        let cancel = CancellationToken::new();
398        let rpc = RpcService::new(
399            RpcArgs {
400                rpc_listen_address: test_listen_address(),
401                ..Default::default()
402            },
403            &Registry::new(),
404            cancel.clone(),
405        )
406        .unwrap();
407
408        let handle = rpc.run().await.unwrap();
409
410        cancel.cancel();
411        tokio::time::timeout(Duration::from_millis(500), handle)
412            .await
413            .expect("Shutdown should not timeout")
414            .expect("Shutdown should succeed");
415    }
416
417    #[tokio::test]
418    async fn test_rpc_discovery() {
419        let cancel = CancellationToken::new();
420        let rpc_listen_address = test_listen_address();
421
422        let mut rpc = RpcService::new(
423            RpcArgs {
424                rpc_listen_address,
425                ..Default::default()
426            },
427            &Registry::new(),
428            cancel.clone(),
429        )
430        .unwrap();
431
432        rpc.add_module(Foo).unwrap();
433        rpc.add_module(Baz).unwrap();
434
435        let handle = rpc.run().await.unwrap();
436
437        let url = format!("http://{}/", rpc_listen_address);
438        let client = Client::new();
439
440        let resp: Value = client
441            .post(&url)
442            .json(&json!({
443                "jsonrpc": "2.0",
444                "method": "rpc.discover",
445                "id": 1,
446            }))
447            .send()
448            .await
449            .expect("Request should succeed")
450            .json()
451            .await
452            .expect("Deserialization should succeed");
453
454        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
455        assert_eq!(
456            resp["result"]["methods"],
457            json!([
458                {
459                    "name": "test_bar",
460                    "tags": [{
461                        "name": "Test API"
462                    }],
463                    "params": [],
464                    "result": {
465                        "name": "u64",
466                        "required": true,
467                        "schema": {
468                            "type": "integer",
469                            "format": "uint64",
470                            "minimum": 0.0
471                        }
472                    }
473                },
474                {
475                    "name": "test_baz",
476                    "tags": [{
477                        "name": "Test API"
478                    }],
479                    "params": [],
480                    "result": {
481                        "name": "u64",
482                        "required": true,
483                        "schema": {
484                            "type": "integer",
485                            "format": "uint64",
486                            "minimum": 0.0
487                        }
488                    }
489                }
490            ])
491        );
492
493        cancel.cancel();
494        tokio::time::timeout(Duration::from_millis(500), handle)
495            .await
496            .expect("Shutdown should not timeout")
497            .expect("Shutdown should succeed");
498    }
499
500    #[tokio::test]
501    async fn test_request_metrics() {
502        let cancel = CancellationToken::new();
503        let rpc_listen_address = test_listen_address();
504
505        let mut rpc = RpcService::new(
506            RpcArgs {
507                rpc_listen_address,
508                ..Default::default()
509            },
510            &Registry::new(),
511            cancel.clone(),
512        )
513        .unwrap();
514
515        rpc.add_module(Foo).unwrap();
516
517        let metrics = rpc.metrics();
518        let handle = rpc.run().await.unwrap();
519
520        let url = format!("http://{}/", rpc_listen_address);
521        let client = Client::new();
522
523        client
524            .post(&url)
525            .json(&json!({
526                "jsonrpc": "2.0",
527                "method": "test_bar",
528                "id": 1,
529            }))
530            .send()
531            .await
532            .expect("Request should succeed");
533
534        client
535            .post(&url)
536            .json(&json!({
537                "jsonrpc": "2.0",
538                "method": "test_baz",
539                "id": 1,
540            }))
541            .send()
542            .await
543            .expect("Request should succeed");
544
545        assert_eq!(
546            metrics
547                .requests_received
548                .with_label_values(&["test_bar"])
549                .get(),
550            1
551        );
552
553        assert_eq!(
554            metrics
555                .requests_succeeded
556                .with_label_values(&["test_bar"])
557                .get(),
558            1
559        );
560
561        assert_eq!(
562            metrics
563                .requests_received
564                .with_label_values(&["<UNKNOWN>"])
565                .get(),
566            1
567        );
568
569        assert_eq!(
570            metrics
571                .requests_succeeded
572                .with_label_values(&["<UNKNOWN>"])
573                .get(),
574            0
575        );
576
577        assert_eq!(
578            metrics
579                .requests_failed
580                .with_label_values(&["<UNKNOWN>", &format!("{METHOD_NOT_FOUND_CODE}")])
581                .get(),
582            1
583        );
584
585        cancel.cancel();
586        tokio::time::timeout(Duration::from_millis(500), handle)
587            .await
588            .expect("Shutdown should not timeout")
589            .expect("Shutdown should succeed");
590    }
591
592    // Test Helpers
593
594    #[open_rpc(namespace = "test", tag = "Test API")]
595    #[rpc(server, namespace = "test")]
596    trait FooApi {
597        #[method(name = "bar")]
598        fn bar(&self) -> RpcResult<u64>;
599    }
600
601    #[open_rpc(namespace = "test", tag = "Test API")]
602    #[rpc(server, namespace = "test")]
603    trait BarApi {
604        #[method(name = "bar")]
605        fn bar(&self) -> RpcResult<u64>;
606
607        #[method(name = "baz")]
608        fn baz(&self) -> RpcResult<u64>;
609    }
610
611    #[open_rpc(namespace = "test", tag = "Test API")]
612    #[rpc(server, namespace = "test")]
613    trait BazApi {
614        #[method(name = "baz")]
615        fn baz(&self) -> RpcResult<u64>;
616    }
617
618    struct Foo;
619    struct Bar;
620    struct Baz;
621
622    impl FooApiServer for Foo {
623        fn bar(&self) -> RpcResult<u64> {
624            Ok(42)
625        }
626    }
627
628    impl BarApiServer for Bar {
629        fn bar(&self) -> RpcResult<u64> {
630            Ok(43)
631        }
632
633        fn baz(&self) -> RpcResult<u64> {
634            Ok(44)
635        }
636    }
637
638    impl BazApiServer for Baz {
639        fn baz(&self) -> RpcResult<u64> {
640            Ok(45)
641        }
642    }
643
644    impl RpcModule for Foo {
645        fn schema(&self) -> Module {
646            FooApiOpenRpc::module_doc()
647        }
648
649        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
650            self.into_rpc()
651        }
652    }
653
654    impl RpcModule for Bar {
655        fn schema(&self) -> Module {
656            BarApiOpenRpc::module_doc()
657        }
658
659        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
660            self.into_rpc()
661        }
662    }
663
664    impl RpcModule for Baz {
665        fn schema(&self) -> Module {
666            BazApiOpenRpc::module_doc()
667        }
668
669        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
670            self.into_rpc()
671        }
672    }
673
674    fn test_listen_address() -> SocketAddr {
675        let port = get_available_port();
676        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
677    }
678
679    async fn test_service() -> RpcService {
680        let cancel = CancellationToken::new();
681        RpcService::new(
682            RpcArgs {
683                rpc_listen_address: test_listen_address(),
684                ..Default::default()
685            },
686            &Registry::new(),
687            cancel,
688        )
689        .expect("Failed to create test JSON-RPC service")
690    }
691}