sui_indexer_alt_jsonrpc/
lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use api::checkpoints::Checkpoints;
10use api::coin::{Coins, DelegationCoins};
11use api::dynamic_fields::DynamicFields;
12use api::move_utils::MoveUtils;
13use api::name_service::NameService;
14use api::objects::{Objects, QueryObjects};
15use api::rpc_module::RpcModule;
16use api::transactions::{QueryTransactions, Transactions};
17use api::write::Write;
18use config::RpcConfig;
19use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
20use metrics::RpcMetrics;
21use metrics::middleware::MetricsLayer;
22use prometheus::Registry;
23use serde_json::json;
24use sui_futures::service::Service;
25use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
26use sui_indexer_alt_reader::pg_reader::db::DbArgs;
27use sui_indexer_alt_reader::system_package_task::{SystemPackageTask, SystemPackageTaskArgs};
28use sui_open_rpc::Project;
29use timeout::TimeoutLayer;
30use tower_layer::Identity;
31use tracing::{info, warn};
32use url::Url;
33
34use crate::api::governance::{DelegationGovernance, Governance};
35use crate::context::Context;
36
37pub mod api;
38pub mod args;
39pub mod config;
40mod context;
41pub mod data;
42mod error;
43mod metrics;
44mod paginate;
45mod timeout;
46
47#[derive(clap::Args, Debug, Clone)]
48pub struct RpcArgs {
49    /// Address to listen to for incoming JSON-RPC connections.
50    #[clap(long, default_value_t = Self::default().rpc_listen_address)]
51    pub rpc_listen_address: SocketAddr,
52
53    /// The maximum number of concurrent requests to accept. If the service receives more than this
54    /// many requests, it will start responding with 429.
55    #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
56    pub max_in_flight_requests: u32,
57
58    /// Requests that take longer than this (in milliseconds) to respond to will be terminated, and
59    /// the query itself will be logged as a warning.
60    #[clap(long, default_value_t = Self::default().request_timeout_ms)]
61    pub request_timeout_ms: u64,
62
63    /// Requests that take longer than this (in milliseconds) will be logged even if they succeed.
64    /// This should be shorter than `request_timeout_ms`.
65    #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
66    pub slow_request_threshold_ms: u64,
67}
68
69pub struct RpcService {
70    /// The address that the server will start listening for requests on, when it is run.
71    rpc_listen_address: SocketAddr,
72
73    /// A partially built/configured JSON-RPC server.
74    server: ServerBuilder<Identity, Identity>,
75
76    /// Metrics for the RPC service.
77    metrics: Arc<RpcMetrics>,
78
79    /// Maximum time a request can take to complete.
80    request_timeout: Duration,
81
82    /// Threshold for logging slow requests.
83    slow_request_threshold: Duration,
84
85    /// All the methods added to the server so far.
86    modules: jsonrpsee::RpcModule<()>,
87
88    /// Description of the schema served by this service.
89    schema: Project,
90}
91
92impl RpcArgs {
93    /// Requests that take longer than this are terminated and logged for debugging.
94    fn request_timeout(&self) -> Duration {
95        Duration::from_millis(self.request_timeout_ms)
96    }
97
98    /// Requests that take longer than this are logged for debugging even if they succeed.
99    /// This threshold should be lower than the request timeout threshold.
100    fn slow_request_threshold(&self) -> Duration {
101        Duration::from_millis(self.slow_request_threshold_ms)
102    }
103}
104
105impl RpcService {
106    /// Create a new instance of the JSON-RPC service, configured by `rpc_args`. The service will
107    /// not accept connections until [Self::run] is called.
108    pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
109        let metrics = RpcMetrics::new(registry);
110
111        let server = ServerBuilder::new()
112            .http_only()
113            // `jsonrpsee` calls this a limit on connections, but it is implemented as a limit on
114            // requests.
115            .max_connections(rpc_args.max_in_flight_requests)
116            .max_response_body_size(u32::MAX)
117            .set_batch_request_config(BatchRequestConfig::Disabled);
118
119        let schema = Project::new(
120            env!("CARGO_PKG_VERSION"),
121            "Sui JSON-RPC",
122            "A JSON-RPC API for interacting with the Sui blockchain.",
123            "Mysten Labs",
124            "https://mystenlabs.com",
125            "build@mystenlabs.com",
126            "Apache-2.0",
127            "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
128        );
129
130        Ok(Self {
131            rpc_listen_address: rpc_args.rpc_listen_address,
132            server,
133            metrics,
134            request_timeout: rpc_args.request_timeout(),
135            slow_request_threshold: rpc_args.slow_request_threshold(),
136            modules: jsonrpsee::RpcModule::new(()),
137            schema,
138        })
139    }
140
141    /// Return a copy of the metrics.
142    pub fn metrics(&self) -> Arc<RpcMetrics> {
143        self.metrics.clone()
144    }
145
146    /// Add an `RpcModule` to the service. The module's methods are combined with the existing
147    /// methods registered on the service, and the operation will fail if there is any overlap.
148    pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
149        self.schema.add_module(module.schema());
150        self.modules
151            .merge(module.into_impl().remove_context())
152            .context("Failed to add module because of a name conflict")
153    }
154
155    /// Start the service (it will accept connections) and return a handle that tracks the
156    /// lifecycle of the service.
157    pub async fn run(self) -> anyhow::Result<Service> {
158        let Self {
159            rpc_listen_address,
160            server,
161            metrics,
162            request_timeout,
163            slow_request_threshold,
164            mut modules,
165            schema,
166        } = self;
167
168        info!("Starting JSON-RPC service on {rpc_listen_address}",);
169        info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
170
171        // Add a method to serve the schema to clients.
172        modules
173            .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
174            .context("Failed to add schema discovery method")?;
175
176        let middleware = RpcServiceBuilder::new()
177            .layer(TimeoutLayer::new(request_timeout))
178            .layer(MetricsLayer::new(
179                metrics,
180                modules.method_names().map(|n| n.to_owned()).collect(),
181                slow_request_threshold,
182            ));
183
184        let handle = server
185            .set_rpc_middleware(middleware)
186            .set_http_middleware(
187                tower::builder::ServiceBuilder::new().layer(
188                    tower_http::cors::CorsLayer::new()
189                        .allow_methods([http::Method::GET, http::Method::POST])
190                        .allow_origin(tower_http::cors::Any)
191                        .allow_headers(tower_http::cors::Any),
192                ),
193            )
194            .build(rpc_listen_address)
195            .await
196            .context("Failed to bind JSON-RPC service")?
197            .start(modules);
198
199        let signal = handle.clone();
200        Ok(Service::new()
201            .with_shutdown_signal(async move {
202                let _ = signal.stop();
203            })
204            .spawn(async move {
205                handle.stopped().await;
206                Ok(())
207            }))
208    }
209}
210
211impl Default for RpcArgs {
212    fn default() -> Self {
213        Self {
214            rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
215            max_in_flight_requests: 2000,
216            request_timeout_ms: 60_000,
217            slow_request_threshold_ms: 15_000,
218        }
219    }
220}
221
222#[derive(clap::Args, Debug, Clone, Default)]
223pub struct NodeArgs {
224    /// The URL of the fullnode RPC we connect to for transaction execution,
225    /// dry-running, and delegation coin queries etc.
226    #[arg(long)]
227    pub fullnode_rpc_url: Option<url::Url>,
228}
229
230/// Set-up and run the RPC service, using the provided arguments (expected to be extracted from the
231/// command-line).
232///
233/// Access to most reads is controlled by the `database_url` -- if it is `None`, reads will not work.
234/// The only exceptions are the `DelegationCoins` and `DelegationGovernance` modules, which are controlled
235/// by `node_args.fullnode_rpc_url`, which can be omitted to disable reads from this RPC.
236///
237/// KV queries can optionally be served by a Bigtable instance, if `bigtable_instance` is provided.
238/// Otherwise these requests are served by the database. If a `bigtable_instance` is provided, the
239/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable must point to the credentials JSON file.
240///
241/// Access to writes (executing and dry-running transactions) is controlled by `node_args.fullnode_rpc_url`,
242/// which can be omitted to disable writes from this RPC.
243///
244/// The service may spin up auxiliary services (such as the system package task) to support itself,
245/// and will clean these up on shutdown as well.
246pub async fn start_rpc(
247    database_url: Option<Url>,
248    bigtable_instance: Option<String>,
249    db_args: DbArgs,
250    bigtable_args: BigtableArgs,
251    rpc_args: RpcArgs,
252    node_args: NodeArgs,
253    system_package_task_args: SystemPackageTaskArgs,
254    rpc_config: RpcConfig,
255    registry: &Registry,
256) -> anyhow::Result<Service> {
257    let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
258
259    let context = Context::new(
260        database_url,
261        bigtable_instance,
262        db_args,
263        bigtable_args,
264        rpc_config,
265        rpc.metrics(),
266        registry,
267    )
268    .await?;
269
270    let system_package_task = SystemPackageTask::new(
271        system_package_task_args,
272        context.pg_reader().clone(),
273        context.package_resolver().package_store().clone(),
274    );
275
276    rpc.add_module(Checkpoints(context.clone()))?;
277    rpc.add_module(Coins(context.clone()))?;
278    rpc.add_module(DynamicFields(context.clone()))?;
279    rpc.add_module(Governance(context.clone()))?;
280    rpc.add_module(MoveUtils(context.clone()))?;
281    rpc.add_module(NameService(context.clone()))?;
282    rpc.add_module(Objects(context.clone()))?;
283    rpc.add_module(QueryObjects(context.clone()))?;
284    rpc.add_module(QueryTransactions(context.clone()))?;
285    rpc.add_module(Transactions(context.clone()))?;
286
287    if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
288        rpc.add_module(DelegationCoins::new(
289            fullnode_rpc_url.clone(),
290            context.config().node.clone(),
291        )?)?;
292        rpc.add_module(DelegationGovernance::new(
293            fullnode_rpc_url.clone(),
294            context.config().node.clone(),
295        )?)?;
296        rpc.add_module(Write::new(fullnode_rpc_url, context.config().node.clone())?)?;
297    } else {
298        warn!(
299            "No fullnode rpc url provided, DelegationCoins, DelegationGovernance, and Write modules will not be added."
300        );
301    }
302
303    let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
304    let s_system_package_task = system_package_task.run();
305
306    Ok(s_rpc.attach(s_system_package_task))
307}
308
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time::Duration;

    use jsonrpsee::core::RpcResult;
    use jsonrpsee::proc_macros::rpc;
    use jsonrpsee::types::error::METHOD_NOT_FOUND_CODE;
    use reqwest::Client;
    use serde_json::{Value, json};
    use sui_open_rpc::Module;
    use sui_open_rpc_macros::open_rpc;
    use sui_pg_db::temp::get_available_port;

    use super::*;

    /// A module with a single method is registered under its namespaced name.
    #[tokio::test]
    async fn test_add_module() {
        let mut service = test_service().await;

        service.add_module(Foo).unwrap();

        assert_eq!(registered_methods(&service), BTreeSet::from(["test_bar"]));
    }

    /// A module with several methods registers all of them.
    #[tokio::test]
    async fn test_add_module_multiple_methods() {
        let mut service = test_service().await;

        service.add_module(Bar).unwrap();

        assert_eq!(
            registered_methods(&service),
            BTreeSet::from(["test_bar", "test_baz"]),
        );
    }

    /// Modules with disjoint method names can be registered side by side.
    #[tokio::test]
    async fn test_add_multiple_modules() {
        let mut service = test_service().await;

        service.add_module(Foo).unwrap();
        service.add_module(Baz).unwrap();

        assert_eq!(
            registered_methods(&service),
            BTreeSet::from(["test_bar", "test_baz"]),
        );
    }

    /// Registering a module whose method name is already taken is rejected.
    #[tokio::test]
    async fn test_add_module_conflict() {
        let mut service = test_service().await;

        service.add_module(Foo).unwrap();

        let conflicting = service.add_module(Bar);
        assert!(conflicting.is_err());
    }

    /// A running service shuts down promptly when asked to.
    #[tokio::test]
    async fn test_graceful_shutdown() {
        let service = test_service().await;
        let svc = service.run().await.unwrap();

        expect_prompt_shutdown(svc).await;
    }

    /// `rpc.discover` serves a schema that lists the methods of every registered module.
    #[tokio::test]
    async fn test_rpc_discovery() {
        let rpc_listen_address = test_listen_address();
        let args = RpcArgs {
            rpc_listen_address,
            ..Default::default()
        };

        let mut service = RpcService::new(args, &Registry::new()).unwrap();
        service.add_module(Foo).unwrap();
        service.add_module(Baz).unwrap();

        let svc = service.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        let resp: Value = send_request(&client, &url, "rpc.discover")
            .await
            .json()
            .await
            .expect("Deserialization should succeed");

        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
        assert_eq!(
            resp["result"]["methods"],
            json!([
                expected_u64_method("test_bar"),
                expected_u64_method("test_baz"),
            ])
        );

        expect_prompt_shutdown(svc).await;
    }

    /// Requests are counted per method, and calls to unregistered methods are tracked under
    /// the `<UNKNOWN>` label as failures.
    #[tokio::test]
    async fn test_request_metrics() {
        let rpc_listen_address = test_listen_address();
        let args = RpcArgs {
            rpc_listen_address,
            ..Default::default()
        };

        let mut service = RpcService::new(args, &Registry::new()).unwrap();
        service.add_module(Foo).unwrap();

        let metrics = service.metrics();
        let svc = service.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        // `test_bar` is registered by `Foo`, `test_baz` is not registered at all.
        for method in ["test_bar", "test_baz"] {
            send_request(&client, &url, method).await;
        }

        let received = |label: &str| metrics.requests_received.with_label_values(&[label]).get();
        let succeeded = |label: &str| metrics.requests_succeeded.with_label_values(&[label]).get();

        assert_eq!(received("test_bar"), 1);
        assert_eq!(succeeded("test_bar"), 1);

        assert_eq!(received("<UNKNOWN>"), 1);
        assert_eq!(succeeded("<UNKNOWN>"), 0);

        let not_found_code = METHOD_NOT_FOUND_CODE.to_string();
        assert_eq!(
            metrics
                .requests_failed
                .with_label_values(&["<UNKNOWN>", not_found_code.as_str()])
                .get(),
            1
        );

        expect_prompt_shutdown(svc).await;
    }

    // Test Helpers

    // Exposes a single method, `test_bar`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait FooApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;
    }

    // Exposes `test_bar` and `test_baz`, so it overlaps with both `FooApi` and `BazApi`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BarApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;

        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    // Exposes a single method, `test_baz`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BazApi {
        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    struct Foo;
    struct Bar;
    struct Baz;

    impl FooApiServer for Foo {
        fn bar(&self) -> RpcResult<u64> {
            Ok(42)
        }
    }

    impl BarApiServer for Bar {
        fn bar(&self) -> RpcResult<u64> {
            Ok(43)
        }

        fn baz(&self) -> RpcResult<u64> {
            Ok(44)
        }
    }

    impl BazApiServer for Baz {
        fn baz(&self) -> RpcResult<u64> {
            Ok(45)
        }
    }

    impl RpcModule for Foo {
        fn schema(&self) -> Module {
            FooApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Bar {
        fn schema(&self) -> Module {
            BarApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Baz {
        fn schema(&self) -> Module {
            BazApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    /// A loopback address on a port reported as available.
    fn test_listen_address() -> SocketAddr {
        let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(ip, get_available_port())
    }

    /// A service with default settings, listening on a fresh test address, with no modules
    /// registered.
    async fn test_service() -> RpcService {
        let args = RpcArgs {
            rpc_listen_address: test_listen_address(),
            ..Default::default()
        };

        RpcService::new(args, &Registry::new()).expect("Failed to create test JSON-RPC service")
    }

    /// The names of all the methods currently registered on `service`, as an ordered set.
    fn registered_methods(service: &RpcService) -> BTreeSet<&'static str> {
        service.modules.method_names().collect()
    }

    /// POST a parameterless JSON-RPC 2.0 call to `method` at `url`, panicking if the request
    /// cannot be sent.
    async fn send_request(client: &Client, url: &str, method: &str) -> reqwest::Response {
        let body = json!({
            "jsonrpc": "2.0",
            "method": method,
            "id": 1,
        });

        client
            .post(url)
            .json(&body)
            .send()
            .await
            .expect("Request should succeed")
    }

    /// The schema entry expected for a parameterless test method called `name` that returns
    /// a `u64`.
    fn expected_u64_method(name: &str) -> Value {
        json!({
            "name": name,
            "tags": [{
                "name": "Test API"
            }],
            "params": [],
            "result": {
                "name": "u64",
                "required": true,
                "schema": {
                    "type": "integer",
                    "format": "uint64",
                    "minimum": 0.0
                }
            }
        })
    }

    /// Shut `svc` down, panicking if that fails or takes longer than 500ms.
    async fn expect_prompt_shutdown(svc: Service) {
        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
            .await
            .expect("Shutdown should not timeout")
            .expect("Shutdown should succeed");
    }
}