sui_graphql_rpc/server/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::exchange_rates_task::TriggerExchangeRatesTask;
5use super::system_package_task::SystemPackageTask;
6use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask};
7use crate::config::{
8    ConnectionConfig, ServiceConfig, Version, MAX_CONCURRENT_REQUESTS,
9    RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
10};
11use crate::data::move_registry_data_loader::MoveRegistryDataLoader;
12use crate::data::package_resolver::{DbPackageStore, PackageResolver};
13use crate::data::{DataLoader, Db};
14use crate::extensions::directive_checker::DirectiveChecker;
15use crate::metrics::Metrics;
16use crate::mutation::Mutation;
17use crate::types::datatype::IMoveDatatype;
18use crate::types::move_object::IMoveObject;
19use crate::types::object::IObject;
20use crate::types::owner::IOwner;
21use crate::{
22    config::ServerConfig,
23    context_data::db_data_provider::PgManager,
24    error::Error,
25    extensions::{
26        feature_gate::FeatureGate,
27        logger::Logger,
28        query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
29        timeout::Timeout,
30    },
31    server::version::set_version_middleware,
32    types::query::{Query, SuiGraphQLSchema},
33};
34use async_graphql::extensions::ApolloTracing;
35use async_graphql::extensions::Tracing;
36use async_graphql::EmptySubscription;
37use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder};
38use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
39use axum::body::Body;
40use axum::extract::FromRef;
41use axum::extract::{ConnectInfo, Query as AxumQuery, State};
42use axum::http::{HeaderMap, StatusCode};
43use axum::middleware::{self};
44use axum::response::IntoResponse;
45use axum::routing::{get, post, MethodRouter, Route};
46use axum::Extension;
47use axum::Router;
48use axum_extra::headers::ContentLength;
49use axum_extra::TypedHeader;
50use chrono::Utc;
51use http::{HeaderValue, Method, Request};
52use mysten_metrics::spawn_monitored_task;
53use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
54use std::convert::Infallible;
55use std::net::TcpStream;
56use std::sync::Arc;
57use std::time::Duration;
58use std::{any::Any, net::SocketAddr, time::Instant};
59use sui_graphql_rpc_headers::LIMITS_HEADER;
60use sui_indexer::db::check_db_migration_consistency;
61use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
62use sui_sdk::SuiClientBuilder;
63use tokio::join;
64use tokio::sync::OnceCell;
65use tokio_util::sync::CancellationToken;
66use tower::{Layer, Service};
67use tower_http::cors::{AllowOrigin, CorsLayer};
68use tracing::{info, warn};
69use uuid::Uuid;
70
71/// The default allowed maximum lag between the current timestamp and the checkpoint timestamp.
72const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
73
74pub(crate) struct Server {
75    // pub server: HyperServer<HyperAddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
76    router: Router,
77    address: String,
78    watermark_task: WatermarkTask,
79    system_package_task: SystemPackageTask,
80    trigger_exchange_rates_task: TriggerExchangeRatesTask,
81    state: AppState,
82}
83
84impl Server {
85    /// Start the GraphQL service and any background tasks it is dependent on. When a cancellation
86    /// signal is received, the method waits for all tasks to complete before returning.
87    pub async fn run(mut self) -> Result<(), Error> {
88        get_or_init_server_start_time().await;
89
90        // A handle that spawns a background task to periodically update the `Watermark`, which
91        // consists of the checkpoint upper bound and current epoch.
92        let watermark_task = {
93            info!("Starting watermark update task");
94            spawn_monitored_task!(async move {
95                self.watermark_task.run().await;
96            })
97        };
98
99        // A handle that spawns a background task to evict system packages on epoch changes.
100        let system_package_task = {
101            info!("Starting system package task");
102            spawn_monitored_task!(async move {
103                self.system_package_task.run().await;
104            })
105        };
106
107        let trigger_exchange_rates_task = {
108            info!("Starting trigger exchange rates task");
109            spawn_monitored_task!(async move {
110                self.trigger_exchange_rates_task.run().await;
111            })
112        };
113
114        let server_task = {
115            info!("Starting graphql service");
116            let cancellation_token = self.state.cancellation_token.clone();
117            spawn_monitored_task!(async move {
118                let listener = tokio::net::TcpListener::bind(&self.address).await.unwrap();
119                axum::serve(
120                    listener,
121                    self.router
122                        .into_make_service_with_connect_info::<SocketAddr>(),
123                )
124                .with_graceful_shutdown(async move {
125                    cancellation_token.cancelled().await;
126                    info!("Shutdown signal received, terminating graphql service");
127                })
128                .await
129                .map_err(|e| Error::Internal(format!("Server run failed: {}", e)))
130            })
131        };
132
133        // Wait for all tasks to complete. This ensures that the service doesn't fully shut down
134        // until all tasks and the server have completed their shutdown processes.
135        let _ = join!(
136            watermark_task,
137            system_package_task,
138            trigger_exchange_rates_task,
139            server_task
140        );
141
142        Ok(())
143    }
144}
145
146pub(crate) struct ServerBuilder {
147    state: AppState,
148    schema: SchemaBuilder<Query, Mutation, EmptySubscription>,
149    router: Option<Router>,
150    db_reader: Option<Db>,
151    resolver: Option<PackageResolver>,
152}
153
154#[derive(Clone)]
155pub(crate) struct AppState {
156    connection: ConnectionConfig,
157    service: ServiceConfig,
158    metrics: Metrics,
159    cancellation_token: CancellationToken,
160    pub version: Version,
161}
162
163impl AppState {
164    pub(crate) fn new(
165        connection: ConnectionConfig,
166        service: ServiceConfig,
167        metrics: Metrics,
168        cancellation_token: CancellationToken,
169        version: Version,
170    ) -> Self {
171        Self {
172            connection,
173            service,
174            metrics,
175            cancellation_token,
176            version,
177        }
178    }
179}
180
181impl FromRef<AppState> for ConnectionConfig {
182    fn from_ref(app_state: &AppState) -> ConnectionConfig {
183        app_state.connection.clone()
184    }
185}
186
187impl FromRef<AppState> for Metrics {
188    fn from_ref(app_state: &AppState) -> Metrics {
189        app_state.metrics.clone()
190    }
191}
192
193impl ServerBuilder {
194    pub fn new(state: AppState) -> Self {
195        Self {
196            state,
197            schema: schema_builder(),
198            router: None,
199            db_reader: None,
200            resolver: None,
201        }
202    }
203
204    pub fn address(&self) -> String {
205        format!(
206            "{}:{}",
207            self.state.connection.host, self.state.connection.port
208        )
209    }
210
211    pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
212        self.schema = self.schema.data(context_data);
213        self
214    }
215
216    pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
217        self.schema = self.schema.extension(extension);
218        self
219    }
220
221    #[cfg(test)]
222    fn build_schema(self) -> Schema<Query, Mutation, EmptySubscription> {
223        self.schema.finish()
224    }
225
226    /// Prepares the components of the server to be run. Finalizes the graphql schema, and expects
227    /// the `Db` and `Router` to have been initialized.
228    fn build_components(
229        self,
230    ) -> (
231        String,
232        Schema<Query, Mutation, EmptySubscription>,
233        Db,
234        PackageResolver,
235        Router,
236    ) {
237        let address = self.address();
238        let ServerBuilder {
239            state: _,
240            schema,
241            db_reader,
242            resolver,
243            router,
244        } = self;
245        (
246            address,
247            schema.finish(),
248            db_reader.expect("DB reader not initialized"),
249            resolver.expect("Package resolver not initialized"),
250            router.expect("Router not initialized"),
251        )
252    }
253
254    fn init_router(&mut self) {
255        if self.router.is_none() {
256            let router: Router = Router::new()
257                .route("/", post(graphql_handler))
258                .route("/graphql", post(graphql_handler))
259                .route("/health", get(health_check))
260                .route("/graphql/health", get(health_check))
261                .with_state(self.state.clone())
262                .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
263                    metrics: self.state.metrics.clone(),
264                }));
265            self.router = Some(router);
266        }
267    }
268
269    pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
270        self.init_router();
271        self.router = self.router.map(|router| router.route(path, method_handler));
272        self
273    }
274
275    pub fn layer<L>(mut self, layer: L) -> Self
276    where
277        L: Layer<Route> + Clone + Send + 'static,
278        L::Service: Service<Request<Body>> + Clone + Send + 'static,
279        <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
280        <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
281        <L::Service as Service<Request<Body>>>::Future: Send + 'static,
282    {
283        self.init_router();
284        self.router = self.router.map(|router| router.layer(layer));
285        self
286    }
287
288    fn cors() -> Result<CorsLayer, Error> {
289        let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
290            Ok(value) => {
291                let allow_hosts = value
292                    .split(',')
293                    .map(HeaderValue::from_str)
294                    .collect::<Result<Vec<_>, _>>()
295                    .map_err(|_| {
296                        Error::Internal(
297                            "Cannot resolve access control origin env variable".to_string(),
298                        )
299                    })?;
300                AllowOrigin::list(allow_hosts)
301            }
302            _ => AllowOrigin::any(),
303        };
304        info!("Access control allow origin set to: {acl:?}");
305
306        let cors = CorsLayer::new()
307            // Allow `POST` when accessing the resource
308            .allow_methods([Method::POST])
309            // Allow requests from any origin
310            .allow_origin(acl)
311            .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
312        Ok(cors)
313    }
314
315    /// Consumes the `ServerBuilder` to create a `Server` that can be run.
316    pub fn build(self) -> Result<Server, Error> {
317        let state = self.state.clone();
318        let (address, schema, db_reader, resolver, router) = self.build_components();
319
320        // Initialize the watermark background task struct.
321        let watermark_task = WatermarkTask::new(
322            db_reader.clone(),
323            state.metrics.clone(),
324            std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
325            state.cancellation_token.clone(),
326        );
327
328        let system_package_task = SystemPackageTask::new(
329            resolver,
330            watermark_task.epoch_receiver(),
331            state.cancellation_token.clone(),
332        );
333
334        let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
335            db_reader,
336            watermark_task.epoch_receiver(),
337            state.cancellation_token.clone(),
338        );
339
340        let router = router
341            .route_layer(middleware::from_fn_with_state(
342                state.version,
343                set_version_middleware,
344            ))
345            .layer(axum::extract::Extension(schema))
346            .layer(axum::extract::Extension(watermark_task.lock()))
347            .layer(axum::extract::Extension(watermark_task.chain_id_lock()))
348            .layer(Self::cors()?);
349
350        Ok(Server {
351            router,
352            address,
353            watermark_task,
354            system_package_task,
355            trigger_exchange_rates_task,
356            state,
357        })
358    }
359
360    /// Instantiate a `ServerBuilder` from a `ServerConfig`, typically called when building the
361    /// graphql service for production usage.
362    pub async fn from_config(
363        config: &ServerConfig,
364        version: &Version,
365        cancellation_token: CancellationToken,
366    ) -> Result<Self, Error> {
367        // PROMETHEUS
368        let prom_addr: SocketAddr = format!(
369            "{}:{}",
370            config.connection.prom_host, config.connection.prom_port
371        )
372        .parse()
373        .map_err(|_| {
374            Error::Internal(format!(
375                "Failed to parse url {}, port {} into socket address",
376                config.connection.prom_host, config.connection.prom_port
377            ))
378        })?;
379
380        let registry_service = mysten_metrics::start_prometheus_server(prom_addr);
381        info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
382        let registry = registry_service.default_registry();
383        registry
384            .register(mysten_metrics::uptime_metric(
385                "graphql",
386                version.full,
387                "unknown",
388            ))
389            .unwrap();
390
391        // METRICS
392        let metrics = Metrics::new(&registry);
393        let state = AppState::new(
394            config.connection.clone(),
395            config.service.clone(),
396            metrics.clone(),
397            cancellation_token,
398            *version,
399        );
400        let mut builder = ServerBuilder::new(state);
401
402        let name_service_config = config.service.name_service.clone();
403        let move_registry_config = config.service.move_registry.clone();
404        let zklogin_config = config.service.zklogin.clone();
405        let reader = PgManager::reader_with_config(
406            config.connection.db_url.clone(),
407            config.connection.db_pool_size,
408            // Bound each statement in a request with the overall request timeout, to bound DB
409            // utilisation (in the worst case we will use 2x the request timeout time in DB wall
410            // time).
411            config.service.limits.request_timeout_ms.into(),
412        )
413        .await
414        .map_err(|e| Error::Internal(format!("Failed to create pg connection pool: {}", e)))?;
415
416        if !config.connection.skip_migration_consistency_check {
417            check_db_migration_consistency(
418                &mut reader
419                    .pool()
420                    .get()
421                    .await
422                    .map_err(|e| Error::Internal(e.to_string()))?,
423            )
424            .await?;
425        }
426
427        // DB
428        let db = Db::new(
429            reader.clone(),
430            config.service.limits.clone(),
431            metrics.clone(),
432        );
433        let loader = DataLoader::new(db.clone());
434        let pg_conn_pool = PgManager::new(reader.clone());
435        let package_store = DbPackageStore::new(loader.clone());
436        let resolver = Arc::new(Resolver::new_with_limits(
437            PackageStoreWithLruCache::new(package_store),
438            config.service.limits.package_resolver_limits(),
439        ));
440
441        builder.db_reader = Some(db.clone());
442        builder.resolver = Some(resolver.clone());
443
444        // SDK for talking to fullnode. Used for executing transactions only
445        // TODO: fail fast if no url, once we enable mutations fully
446        let sui_sdk_client = if let Some(url) = &config.tx_exec_full_node.node_rpc_url {
447            Some(
448                SuiClientBuilder::default()
449                    .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
450                    .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
451                    .build(url)
452                    .await
453                    .map_err(|e| Error::Internal(format!("Failed to create SuiClient: {}", e)))?,
454            )
455        } else {
456            warn!("No fullnode url found in config. `dryRunTransactionBlock` and `executeTransactionBlock` will not work");
457            None
458        };
459
460        builder = builder
461            .context_data(config.service.clone())
462            .context_data(loader)
463            .context_data(db)
464            .context_data(pg_conn_pool)
465            .context_data(resolver)
466            .context_data(sui_sdk_client)
467            .context_data(name_service_config)
468            .context_data(zklogin_config)
469            .context_data(metrics.clone())
470            .context_data(config.clone())
471            .context_data(move_registry_config.clone())
472            .context_data(MoveRegistryDataLoader::new(
473                move_registry_config,
474                metrics.clone(),
475            ));
476
477        if config.internal_features.feature_gate {
478            builder = builder.extension(FeatureGate);
479        }
480
481        if config.internal_features.logger {
482            builder = builder.extension(Logger::default());
483        }
484
485        if config.internal_features.query_limits_checker {
486            builder = builder.extension(QueryLimitsChecker);
487        }
488
489        if config.internal_features.directive_checker {
490            builder = builder.extension(DirectiveChecker);
491        }
492
493        if config.internal_features.query_timeout {
494            builder = builder.extension(Timeout);
495        }
496
497        if config.internal_features.tracing {
498            builder = builder.extension(Tracing);
499        }
500
501        if config.internal_features.apollo_tracing {
502            builder = builder.extension(ApolloTracing);
503        }
504
505        // TODO: uncomment once impl
506        // if config.internal_features.open_telemetry { }
507
508        Ok(builder)
509    }
510}
511
512fn schema_builder() -> SchemaBuilder<Query, Mutation, EmptySubscription> {
513    async_graphql::Schema::build(Query, Mutation, EmptySubscription)
514        .register_output_type::<IMoveObject>()
515        .register_output_type::<IObject>()
516        .register_output_type::<IOwner>()
517        .register_output_type::<IMoveDatatype>()
518}
519
520/// Return the string representation of the schema used by this server.
521pub fn export_schema() -> String {
522    schema_builder().finish().sdl()
523}
524
525/// Entry point for graphql requests. Each request is stamped with a unique ID, a `ShowUsage` flag
526/// if set in the request headers, and the watermark as set by the background task.
527async fn graphql_handler(
528    ConnectInfo(addr): ConnectInfo<SocketAddr>,
529    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
530    schema: Extension<SuiGraphQLSchema>,
531    Extension(watermark_lock): Extension<WatermarkLock>,
532    Extension(chain_identifier_lock): Extension<ChainIdentifierLock>,
533    headers: HeaderMap,
534    req: GraphQLRequest,
535) -> (axum::http::Extensions, GraphQLResponse) {
536    let mut req = req.into_inner();
537
538    req.data.insert(PayloadSize(content_length));
539    req.data.insert(Uuid::new_v4());
540    if headers.contains_key(ShowUsage::name()) {
541        req.data.insert(ShowUsage)
542    }
543
544    // Capture the IP address of the client
545    // Note: if a load balancer is used it must be configured to forward the client IP address
546    req.data.insert(addr);
547    req.data.insert(Watermark::new(watermark_lock).await);
548    req.data.insert(chain_identifier_lock.read().await);
549
550    let result = schema.execute(req).await;
551
552    // If there are errors, insert them as an extension so that the Metrics callback handler can
553    // pull it out later.
554    let mut extensions = axum::http::Extensions::new();
555    if result.is_err() {
556        extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
557    };
558    (extensions, result.into())
559}
560
561#[derive(Clone)]
562struct MetricsMakeCallbackHandler {
563    metrics: Metrics,
564}
565
566impl MakeCallbackHandler for MetricsMakeCallbackHandler {
567    type Handler = MetricsCallbackHandler;
568
569    fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
570        let start = Instant::now();
571        let metrics = self.metrics.clone();
572
573        metrics.request_metrics.inflight_requests.inc();
574        metrics.inc_num_queries();
575
576        MetricsCallbackHandler { metrics, start }
577    }
578}
579
580struct MetricsCallbackHandler {
581    metrics: Metrics,
582    start: Instant,
583}
584
585impl ResponseHandler for MetricsCallbackHandler {
586    fn on_response(&mut self, response: &http::response::Parts) {
587        if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
588            self.metrics.inc_errors(&errors.0);
589        }
590    }
591
592    fn on_error<E>(&mut self, _error: &E) {
593        // Do nothing if the whole service errored
594        //
595        // in Axum this isn't possible since all services are required to have an error type of
596        // Infallible
597    }
598}
599
600impl Drop for MetricsCallbackHandler {
601    fn drop(&mut self) {
602        self.metrics.query_latency(self.start.elapsed());
603        self.metrics.request_metrics.inflight_requests.dec();
604    }
605}
606
607#[derive(Debug, Clone)]
608struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
609
610/// Connect via a TCPStream to the DB to check if it is alive
611async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
612    let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
613        return StatusCode::INTERNAL_SERVER_ERROR;
614    };
615
616    let Some(host) = url.host_str() else {
617        return StatusCode::INTERNAL_SERVER_ERROR;
618    };
619
620    let tcp_url = if let Some(port) = url.port() {
621        format!("{host}:{port}")
622    } else {
623        host.to_string()
624    };
625
626    if TcpStream::connect(tcp_url).is_err() {
627        StatusCode::INTERNAL_SERVER_ERROR
628    } else {
629        StatusCode::OK
630    }
631}
632
633#[derive(serde::Deserialize)]
634struct HealthParam {
635    max_checkpoint_lag_ms: Option<u64>,
636}
637
638/// Endpoint for querying the health of the service.
639/// It returns 500 for any internal error, including not connecting to the DB,
640/// and 504 if the checkpoint timestamp is too far behind the current timestamp as per the
641/// max checkpoint timestamp lag query parameter, or the default value if not provided.
642async fn health_check(
643    State(connection): State<ConnectionConfig>,
644    Extension(watermark_lock): Extension<WatermarkLock>,
645    AxumQuery(query_params): AxumQuery<HealthParam>,
646) -> StatusCode {
647    let db_health_check = db_health_check(axum::extract::State(connection)).await;
648    if db_health_check != StatusCode::OK {
649        return db_health_check;
650    }
651
652    let max_checkpoint_lag_ms = query_params
653        .max_checkpoint_lag_ms
654        .map(Duration::from_millis)
655        .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
656
657    let checkpoint_timestamp =
658        Duration::from_millis(watermark_lock.read().await.hi_cp_timestamp_ms);
659
660    let now_millis = Utc::now().timestamp_millis();
661
662    // Check for negative timestamp or conversion failure
663    let now: Duration = match u64::try_from(now_millis) {
664        Ok(val) => Duration::from_millis(val),
665        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
666    };
667
668    if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
669        return StatusCode::GATEWAY_TIMEOUT;
670    }
671
672    db_health_check
673}
674
675// One server per proc, so this is okay
676async fn get_or_init_server_start_time() -> &'static Instant {
677    static ONCE: OnceCell<Instant> = OnceCell::const_new();
678    ONCE.get_or_init(|| async move { Instant::now() }).await
679}
680
681#[cfg(test)]
682pub mod tests {
683    use super::*;
684    use crate::test_infra::cluster::{prep_executor_cluster, start_cluster};
685    use crate::types::chain_identifier::ChainIdentifier;
686    use crate::{
687        config::{ConnectionConfig, Limits, ServiceConfig, Version},
688        context_data::db_data_provider::PgManager,
689        extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
690    };
691    use async_graphql::{
692        extensions::{Extension, ExtensionContext, NextExecute},
693        Request, Response, Variables,
694    };
695    use serde_json::json;
696    use std::sync::Arc;
697    use std::time::Duration;
698    use sui_pg_db::temp::get_available_port;
699    use sui_sdk::SuiClient;
700    use sui_types::digests::get_mainnet_chain_identifier;
701    use sui_types::transaction::TransactionData;
702    use uuid::Uuid;
703
704    /// Prepares a schema for tests dealing with extensions. Returns a `ServerBuilder` that can be
705    /// further extended with `context_data` and `extension` for testing.
706    async fn prep_schema(db_url: String, service_config: Option<ServiceConfig>) -> ServerBuilder {
707        let connection_config = ConnectionConfig {
708            port: get_available_port(),
709            host: "127.0.0.1".to_owned(),
710            db_url,
711            db_pool_size: 5,
712            prom_host: "127.0.0.1".to_owned(),
713            prom_port: get_available_port(),
714            skip_migration_consistency_check: false,
715        };
716        let service_config = service_config.unwrap_or_default();
717
718        let reader = PgManager::reader_with_config(
719            connection_config.db_url.clone(),
720            connection_config.db_pool_size,
721            service_config.limits.request_timeout_ms.into(),
722        )
723        .await
724        .expect("Failed to create pg connection pool");
725
726        let version = Version::for_testing();
727        let metrics = metrics();
728        let db = Db::new(
729            reader.clone(),
730            service_config.limits.clone(),
731            metrics.clone(),
732        );
733        let loader = DataLoader::new(db.clone());
734        let pg_conn_pool = PgManager::new(reader);
735        let cancellation_token = CancellationToken::new();
736        let watermark = Watermark {
737            hi_cp: 1,
738            hi_cp_timestamp_ms: 1,
739            hi_tx: 1,
740            epoch: 0,
741            lo_cp: 0,
742            lo_tx: 0,
743        };
744        let state = AppState::new(
745            connection_config.clone(),
746            service_config.clone(),
747            metrics.clone(),
748            cancellation_token.clone(),
749            version,
750        );
751        ServerBuilder::new(state)
752            .context_data(db)
753            .context_data(loader)
754            .context_data(pg_conn_pool)
755            .context_data(service_config)
756            .context_data(query_id())
757            .context_data(ip_address())
758            .context_data(watermark)
759            .context_data(ChainIdentifier::from(get_mainnet_chain_identifier()))
760            .context_data(metrics)
761    }
762
763    fn metrics() -> Metrics {
764        let binding_address: SocketAddr = format!("127.0.0.1:{}", get_available_port())
765            .parse()
766            .unwrap();
767        let registry = mysten_metrics::start_prometheus_server(binding_address).default_registry();
768        Metrics::new(&registry)
769    }
770
771    fn ip_address() -> SocketAddr {
772        let binding_address: SocketAddr = "127.0.0.1:51515".parse().unwrap();
773        binding_address
774    }
775
776    fn query_id() -> Uuid {
777        Uuid::new_v4()
778    }
779
780    #[tokio::test]
781    async fn test_timeout() {
782        telemetry_subscribers::init_for_testing();
783        let cluster = start_cluster(ServiceConfig::test_defaults()).await;
784        cluster
785            .wait_for_checkpoint_catchup(1, Duration::from_secs(30))
786            .await;
787        // timeout test includes mutation timeout, which requires a [SuiClient] to be able to run
788        // the test, and a transaction. [WalletContext] gives access to everything that's needed.
789        let wallet = &cluster.network.validator_fullnode_handle.wallet;
790        let db_url = cluster.network.graphql_connection_config.db_url.clone();
791
792        struct TimedExecuteExt {
793            pub min_req_delay: Duration,
794        }
795
796        impl ExtensionFactory for TimedExecuteExt {
797            fn create(&self) -> Arc<dyn Extension> {
798                Arc::new(TimedExecuteExt {
799                    min_req_delay: self.min_req_delay,
800                })
801            }
802        }
803
804        #[async_trait::async_trait]
805        impl Extension for TimedExecuteExt {
806            async fn execute(
807                &self,
808                ctx: &ExtensionContext<'_>,
809                operation_name: Option<&str>,
810                next: NextExecute<'_>,
811            ) -> Response {
812                tokio::time::sleep(self.min_req_delay).await;
813                next.run(ctx, operation_name).await
814            }
815        }
816
817        async fn test_timeout(
818            delay: Duration,
819            timeout: Duration,
820            query: &str,
821            sui_client: &SuiClient,
822            db_url: String,
823        ) -> Response {
824            let mut cfg = ServiceConfig::default();
825            cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
826            cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
827
828            let schema = prep_schema(db_url, Some(cfg))
829                .await
830                .context_data(Some(sui_client.clone()))
831                .extension(Timeout)
832                .extension(TimedExecuteExt {
833                    min_req_delay: delay,
834                })
835                .build_schema();
836
837            schema.execute(query).await
838        }
839
840        let query = r#"{ checkpoint(id: {sequenceNumber: 0 }) { digest }}"#;
841        let timeout = Duration::from_millis(1000);
842        let delay = Duration::from_millis(100);
843        let sui_client = wallet.get_client().await.unwrap();
844
845        test_timeout(delay, timeout, query, &sui_client, db_url.clone())
846            .await
847            .into_result()
848            .expect("Should complete successfully");
849
850        // Should timeout
851        let errs: Vec<_> = test_timeout(delay, delay, query, &sui_client, db_url.clone())
852            .await
853            .into_result()
854            .unwrap_err()
855            .into_iter()
856            .map(|e| e.message)
857            .collect();
858        let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
859        assert_eq!(errs, vec![exp]);
860
861        // Should timeout for mutation
862        // Create a transaction and sign it, and use the tx_bytes + signatures for the GraphQL
863        // executeTransactionBlock mutation call.
864        let addresses = wallet.get_addresses();
865        let gas = wallet
866            .get_one_gas_object_owned_by_address(addresses[0])
867            .await
868            .unwrap();
869        let tx_data = TransactionData::new_transfer_sui(
870            addresses[1],
871            addresses[0],
872            Some(1000),
873            gas.unwrap(),
874            1_000_000,
875            wallet.get_reference_gas_price().await.unwrap(),
876        );
877
878        let tx = wallet.sign_transaction(&tx_data).await;
879        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
880
881        let signature_base64 = &signatures[0];
882        let query = format!(
883            r#"
884            mutation {{
885              executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
886                effects {{
887                  status
888                }}
889              }}
890            }}"#,
891            tx_bytes.encoded(),
892            signature_base64.encoded()
893        );
894        let errs: Vec<_> = test_timeout(delay, delay, &query, &sui_client, db_url.clone())
895            .await
896            .into_result()
897            .unwrap_err()
898            .into_iter()
899            .map(|e| e.message)
900            .collect();
901        let exp = format!(
902            "Mutation request timed out. Limit: {}s",
903            delay.as_secs_f32()
904        );
905        assert_eq!(errs, vec![exp]);
906    }
907
908    #[tokio::test]
909    async fn test_query_depth_limit() {
910        let cluster = prep_executor_cluster().await;
911        let db_url = cluster.graphql_connection_config.db_url.clone();
912
913        async fn exec_query_depth_limit(db_url: String, depth: u32, query: &str) -> Response {
914            let service_config = ServiceConfig {
915                limits: Limits {
916                    max_query_depth: depth,
917                    ..Default::default()
918                },
919                ..Default::default()
920            };
921
922            let schema = prep_schema(db_url, Some(service_config))
923                .await
924                .context_data(PayloadSize(100))
925                .extension(QueryLimitsChecker)
926                .build_schema();
927            schema.execute(query).await
928        }
929
930        exec_query_depth_limit(db_url.clone(), 1, "{ chainIdentifier }")
931            .await
932            .into_result()
933            .expect("Should complete successfully");
934
935        exec_query_depth_limit(
936            db_url.clone(),
937            5,
938            "{ chainIdentifier protocolConfig { configs { value key }} }",
939        )
940        .await
941        .into_result()
942        .expect("Should complete successfully");
943
944        // Should fail
945        let errs: Vec<_> = exec_query_depth_limit(db_url.clone(), 0, "{ chainIdentifier }")
946            .await
947            .into_result()
948            .unwrap_err()
949            .into_iter()
950            .map(|e| e.message)
951            .collect();
952
953        assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
954        let errs: Vec<_> = exec_query_depth_limit(
955            db_url.clone(),
956            2,
957            "{ chainIdentifier protocolConfig { configs { value key }} }",
958        )
959        .await
960        .into_result()
961        .unwrap_err()
962        .into_iter()
963        .map(|e| e.message)
964        .collect();
965        assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
966    }
967
968    #[tokio::test]
969    async fn test_query_node_limit() {
970        let cluster = prep_executor_cluster().await;
971        let db_url = cluster.graphql_connection_config.db_url.clone();
972        async fn exec_query_node_limit(db_url: String, nodes: u32, query: &str) -> Response {
973            let service_config = ServiceConfig {
974                limits: Limits {
975                    max_query_nodes: nodes,
976                    ..Default::default()
977                },
978                ..Default::default()
979            };
980
981            let schema = prep_schema(db_url, Some(service_config))
982                .await
983                .context_data(PayloadSize(100))
984                .extension(QueryLimitsChecker)
985                .build_schema();
986            schema.execute(query).await
987        }
988
989        exec_query_node_limit(db_url.clone(), 1, "{ chainIdentifier }")
990            .await
991            .into_result()
992            .expect("Should complete successfully");
993
994        exec_query_node_limit(
995            db_url.clone(),
996            5,
997            "{ chainIdentifier protocolConfig { configs { value key }} }",
998        )
999        .await
1000        .into_result()
1001        .expect("Should complete successfully");
1002
1003        // Should fail
1004        let err: Vec<_> = exec_query_node_limit(db_url.clone(), 0, "{ chainIdentifier }")
1005            .await
1006            .into_result()
1007            .unwrap_err()
1008            .into_iter()
1009            .map(|e| e.message)
1010            .collect();
1011        assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1012
1013        let err: Vec<_> = exec_query_node_limit(
1014            db_url.clone(),
1015            4,
1016            "{ chainIdentifier protocolConfig { configs { value key }} }",
1017        )
1018        .await
1019        .into_result()
1020        .unwrap_err()
1021        .into_iter()
1022        .map(|e| e.message)
1023        .collect();
1024        assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1025    }
1026
1027    #[tokio::test]
1028    async fn test_query_default_page_limit() {
1029        let cluster = prep_executor_cluster().await;
1030        let db_url = cluster.graphql_connection_config.db_url.clone();
1031
1032        let service_config = ServiceConfig {
1033            limits: Limits {
1034                default_page_size: 1,
1035                ..Default::default()
1036            },
1037            ..Default::default()
1038        };
1039        let schema = prep_schema(db_url, Some(service_config))
1040            .await
1041            .build_schema();
1042
1043        let resp = schema
1044            .execute("{ checkpoints { nodes { sequenceNumber } } }")
1045            .await;
1046        let data = resp.data.clone().into_json().unwrap();
1047        let checkpoints = data
1048            .get("checkpoints")
1049            .unwrap()
1050            .get("nodes")
1051            .unwrap()
1052            .as_array()
1053            .unwrap();
1054        assert_eq!(
1055            checkpoints.len(),
1056            1,
1057            "Checkpoints should have exactly one element"
1058        );
1059
1060        let resp = schema
1061            .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1062            .await;
1063        let data = resp.data.clone().into_json().unwrap();
1064        let checkpoints = data
1065            .get("checkpoints")
1066            .unwrap()
1067            .get("nodes")
1068            .unwrap()
1069            .as_array()
1070            .unwrap();
1071        assert_eq!(
1072            checkpoints.len(),
1073            2,
1074            "Checkpoints should return two elements"
1075        );
1076    }
1077
1078    #[tokio::test]
1079    async fn test_query_max_page_limit() {
1080        telemetry_subscribers::init_for_testing();
1081        let cluster = prep_executor_cluster().await;
1082        let db_url = cluster.graphql_connection_config.db_url.clone();
1083        let schema = prep_schema(db_url, None).await.build_schema();
1084
1085        schema
1086            .execute("{ objects(first: 1) { nodes { version } } }")
1087            .await
1088            .into_result()
1089            .expect("Should complete successfully");
1090
1091        // Should fail
1092        let err: Vec<_> = schema
1093            .execute("{ objects(first: 51) { nodes { version } } }")
1094            .await
1095            .into_result()
1096            .unwrap_err()
1097            .into_iter()
1098            .map(|e| e.message)
1099            .collect();
1100        assert_eq!(
1101            err,
1102            vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1103        );
1104    }
1105
1106    #[tokio::test]
1107    async fn test_query_complexity_metrics() {
1108        telemetry_subscribers::init_for_testing();
1109        let cluster = prep_executor_cluster().await;
1110        let db_url = cluster.graphql_connection_config.db_url.clone();
1111        let server_builder = prep_schema(db_url, None)
1112            .await
1113            .context_data(PayloadSize(100));
1114        let metrics = server_builder.state.metrics.clone();
1115        let schema = server_builder
1116            .extension(QueryLimitsChecker) // QueryLimitsChecker is where we actually set the metrics
1117            .build_schema();
1118
1119        schema
1120            .execute("{ chainIdentifier }")
1121            .await
1122            .into_result()
1123            .expect("Should complete successfully");
1124
1125        let req_metrics = metrics.request_metrics;
1126        assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1127        assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1128        assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1129        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1130        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1131        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1132
1133        schema
1134            .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1135            .await
1136            .into_result()
1137            .expect("Should complete successfully");
1138
1139        assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1140        assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1141        assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1142        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1143        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1144        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1145    }
1146
1147    #[tokio::test]
1148    pub async fn test_health_check() {
1149        let cluster = prep_executor_cluster().await;
1150
1151        cluster
1152            .wait_for_checkpoint_catchup(6, Duration::from_secs(60))
1153            .await;
1154
1155        let url = format!(
1156            "http://{}:{}/health",
1157            cluster.graphql_connection_config.host, cluster.graphql_connection_config.port
1158        );
1159
1160        let resp = reqwest::get(&url).await.unwrap();
1161        assert_eq!(resp.status(), reqwest::StatusCode::OK);
1162
1163        let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url);
1164        let resp = reqwest::get(&url_with_param).await.unwrap();
1165        assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1166    }
1167
1168    /// Execute a GraphQL request with `limits` in place, expecting an error to be returned.
1169    /// Returns the list of errors returned.
1170    async fn execute_for_error(db_url: &str, limits: Limits, request: Request) -> String {
1171        let service_config = ServiceConfig {
1172            limits,
1173            ..Default::default()
1174        };
1175
1176        let schema = prep_schema(db_url.to_owned(), Some(service_config))
1177            .await
1178            .context_data(PayloadSize(
1179                // Payload size is usually set per request, and it is the size of the raw HTTP
1180                // request, which includes the query, variables, and surrounding JSON. Simulate for
1181                // testing purposes by serializing the request back into JSON and baking its length
1182                // as context data into the schema.
1183                serde_json::to_string(&request).unwrap().len() as u64,
1184            ))
1185            .extension(QueryLimitsChecker)
1186            .build_schema();
1187
1188        let errs: Vec<_> = schema
1189            .execute(request)
1190            .await
1191            .into_result()
1192            .unwrap_err()
1193            .into_iter()
1194            .map(|e| e.message)
1195            .collect();
1196
1197        errs.join("\n")
1198    }
1199
1200    #[tokio::test]
1201    async fn test_payload_read_exceeded() {
1202        let cluster = prep_executor_cluster().await;
1203        let db_url = cluster.graphql_connection_config.db_url.clone();
1204        assert_eq!(
1205            execute_for_error(
1206                &db_url,
1207                Limits {
1208                    max_tx_payload_size: 400,
1209                    max_query_payload_size: 10,
1210                    ..Default::default()
1211                },
1212                r#"
1213                    mutation {
1214                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1215                            effects {
1216                                status
1217                            }
1218                        }
1219                    }
1220                "#
1221                .into()
1222            )
1223            .await,
1224            "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1225             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1226             or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1227             bytes or fewer."
1228        );
1229    }
1230
1231    #[tokio::test]
1232    async fn test_payload_mutation_exceeded() {
1233        let cluster = prep_executor_cluster().await;
1234        let db_url = cluster.graphql_connection_config.db_url.clone();
1235        assert_eq!(
1236            execute_for_error(
1237                &db_url,
1238                Limits {
1239                    max_tx_payload_size: 10,
1240                    max_query_payload_size: 400,
1241                    ..Default::default()
1242                },
1243                r#"
1244                    mutation {
1245                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1246                            effects {
1247                                status
1248                            }
1249                        }
1250                    }
1251                "#
1252                .into()
1253            )
1254            .await,
1255            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1256             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1257             or verifyZkloginSignature) and the rest of the request (the query part) must be 400 \
1258             bytes or fewer."
1259        );
1260    }
1261
1262    #[tokio::test]
1263    async fn test_payload_dry_run_exceeded() {
1264        let cluster = prep_executor_cluster().await;
1265        let db_url = cluster.graphql_connection_config.db_url.clone();
1266        assert_eq!(
1267            execute_for_error(
1268                &db_url,
1269                Limits {
1270                    max_tx_payload_size: 10,
1271                    max_query_payload_size: 400,
1272                    ..Default::default()
1273                },
1274                r#"
1275                    query {
1276                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1277                            error
1278                            transaction {
1279                                digest
1280                            }
1281                        }
1282                    }
1283                "#
1284                .into(),
1285            )
1286            .await,
1287            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1288             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1289             or verifyZkloginSignature) and the rest of the request (the query part) must be 400 bytes \
1290             or fewer."
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn test_payload_zklogin_exceeded() {
1296        let cluster = prep_executor_cluster().await;
1297        let db_url = cluster.graphql_connection_config.db_url.clone();
1298        assert_eq!(
1299            execute_for_error(
1300                &db_url,
1301                Limits {
1302                    max_tx_payload_size: 10,
1303                    max_query_payload_size: 600,
1304                    ..Default::default()
1305                },
1306                r#"
1307                    query {
1308                        verifyZkloginSignature(
1309                            bytes: "AAABBBCCC",
1310                            signature: "DDD",
1311                            intentScope: TRANSACTION_DATA,
1312                            author: "0xeee",
1313                        ) {
1314                            success
1315                            errors
1316                        }
1317                    }
1318                "#
1319                .into(),
1320            )
1321            .await,
1322            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1323             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1324             or verifyZkloginSignature) and the rest of the request (the query part) must be 600 \
1325             bytes or fewer."
1326        );
1327    }
1328
1329    #[tokio::test]
1330    async fn test_payload_total_exceeded_impl() {
1331        let cluster = prep_executor_cluster().await;
1332        let db_url = cluster.graphql_connection_config.db_url.clone();
1333        assert_eq!(
1334            execute_for_error(
1335                &db_url,
1336                Limits {
1337                    max_tx_payload_size: 10,
1338                    max_query_payload_size: 10,
1339                    ..Default::default()
1340                },
1341                r#"
1342                    query {
1343                        dryRunTransactionBlock(txByte: "AAABBB") {
1344                            error
1345                            transaction {
1346                                digest
1347                            }
1348                        }
1349                    }
1350                "#
1351                .into(),
1352            )
1353            .await,
1354            "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1355             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1356             or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1357             bytes or fewer."
1358        );
1359    }
1360
1361    #[tokio::test]
1362    async fn test_payload_using_vars_mutation_exceeded() {
1363        let cluster = prep_executor_cluster().await;
1364        let db_url = cluster.graphql_connection_config.db_url.clone();
1365        assert_eq!(
1366            execute_for_error(
1367                &db_url,
1368                Limits {
1369                    max_tx_payload_size: 10,
1370                    max_query_payload_size: 500,
1371                    ..Default::default()
1372                },
1373                Request::new(
1374                    r#"
1375                    mutation ($tx: String!, $sigs: [String!]!) {
1376                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1377                            effects {
1378                                status
1379                            }
1380                        }
1381                    }
1382                    "#
1383                )
1384                .variables(Variables::from_json(json!({
1385                    "tx": "AAABBBCCC",
1386                    "sigs": ["BBB"]
1387                })))
1388            )
1389            .await,
1390            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1391             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1392             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1393             bytes or fewer."
1394        );
1395    }
1396
1397    #[tokio::test]
1398    async fn test_payload_using_vars_read_exceeded() {
1399        let cluster = prep_executor_cluster().await;
1400        let db_url = cluster.graphql_connection_config.db_url.clone();
1401        assert_eq!(
1402            execute_for_error(
1403                &db_url,
1404                Limits {
1405                    max_tx_payload_size: 500,
1406                    max_query_payload_size: 10,
1407                    ..Default::default()
1408                },
1409                Request::new(
1410                    r#"
1411                    mutation ($tx: String!, $sigs: [String!]!) {
1412                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1413                            effects {
1414                                status
1415                            }
1416                        }
1417                    }
1418                    "#
1419                )
1420                .variables(Variables::from_json(json!({
1421                    "tx": "AAA",
1422                    "sigs": ["BBB"]
1423                })))
1424            )
1425            .await,
1426            "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1427             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1428             or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1429             bytes or fewer."
1430        );
1431    }
1432
1433    #[tokio::test]
1434    async fn test_payload_using_vars_dry_run_exceeded() {
1435        let cluster = prep_executor_cluster().await;
1436        let db_url = cluster.graphql_connection_config.db_url.clone();
1437        assert_eq!(
1438            execute_for_error(
1439                &db_url,
1440                Limits {
1441                    max_tx_payload_size: 10,
1442                    max_query_payload_size: 400,
1443                    ..Default::default()
1444                },
1445                Request::new(
1446                    r#"
1447                    query ($tx: String!) {
1448                        dryRunTransactionBlock(txBytes: $tx) {
1449                            error
1450                            transaction {
1451                                digest
1452                            }
1453                        }
1454                    }
1455                    "#
1456                )
1457                .variables(Variables::from_json(json!({
1458                    "tx": "AAABBBCCC"
1459                }))),
1460            )
1461            .await,
1462            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1463             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1464             or verifyZkloginSignature) and the rest of the request (the query part) must be 400 \
1465             bytes or fewer."
1466        );
1467    }
1468
1469    #[tokio::test]
1470    async fn test_payload_using_vars_dry_run_read_exceeded() {
1471        let cluster = prep_executor_cluster().await;
1472        let db_url = cluster.graphql_connection_config.db_url.clone();
1473        assert_eq!(
1474            execute_for_error(
1475                &db_url,
1476                Limits {
1477                    max_tx_payload_size: 400,
1478                    max_query_payload_size: 10,
1479                    ..Default::default()
1480                },
1481                Request::new(
1482                    r#"
1483                    query ($tx: String!) {
1484                        dryRunTransactionBlock(txBytes: $tx) {
1485                            error
1486                            transaction {
1487                                digest
1488                            }
1489                        }
1490                    }
1491                    "#
1492                )
1493                .variables(Variables::from_json(json!({
1494                    "tx": "AAABBBCCC"
1495                }))),
1496            )
1497            .await,
1498            "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1499             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1500             or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1501             bytes or fewer."
1502        );
1503    }
1504
1505    #[tokio::test]
1506    async fn test_payload_multiple_execution_exceeded() {
1507        let cluster = prep_executor_cluster().await;
1508        let db_url = cluster.graphql_connection_config.db_url.clone();
1509        // First check that the limit is large enough to hold one transaction's parameters (by
1510        // checking that we hit the read limit).
1511        let err = execute_for_error(
1512            &db_url,
1513            Limits {
1514                max_tx_payload_size: 30,
1515                max_query_payload_size: 320,
1516                ..Default::default()
1517            },
1518            r#"
1519                mutation {
1520                    executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1521                        effects {
1522                            status
1523                        }
1524                    }
1525                }
1526            "#
1527            .into(),
1528        )
1529        .await;
1530        assert!(err.starts_with("Query part too large"), "{err}");
1531
1532        assert_eq!(
1533            execute_for_error(
1534                &db_url,
1535                Limits {
1536                    max_tx_payload_size: 30,
1537                    max_query_payload_size: 800,
1538                    ..Default::default()
1539                },
1540                r#"
1541                    mutation {
1542                        e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1543                            effects {
1544                                status
1545                            }
1546                        }
1547                        e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1548                            effects {
1549                                status
1550                            }
1551                        }
1552                    }
1553                "#
1554                .into()
1555            )
1556            .await,
1557            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1558             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1559             or verifyZkloginSignature) and the rest of the request (the query part) must be 800 \
1560             bytes or fewer."
1561        );
1562    }
1563
1564    #[tokio::test]
1565    async fn test_payload_multiple_dry_run_exceeded() {
1566        let cluster = prep_executor_cluster().await;
1567        let db_url = cluster.graphql_connection_config.db_url.clone();
1568        // First check that tx limit is large enough to hold one transaction's parameters (by
1569        // checking that we hit the read limit).
1570        let err = execute_for_error(
1571            &db_url,
1572            Limits {
1573                max_tx_payload_size: 20,
1574                max_query_payload_size: 330,
1575                ..Default::default()
1576            },
1577            r#"
1578                query {
1579                    dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1580                       error
1581                       transaction {
1582                           digest
1583                       }
1584                    }
1585                }
1586            "#
1587            .into(),
1588        )
1589        .await;
1590        assert!(err.starts_with("Query part too large"), "{err}");
1591
1592        assert_eq!(
1593            execute_for_error(
1594                &db_url,
1595                Limits {
1596                    max_tx_payload_size: 20,
1597                    max_query_payload_size: 800,
1598                    ..Default::default()
1599                },
1600                r#"
1601                    query {
1602                        d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1603                           error
1604                           transaction {
1605                               digest
1606                           }
1607                        }
1608                        d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1609                           error
1610                           transaction {
1611                               digest
1612                           }
1613                        }
1614                    }
1615                "#
1616                .into()
1617            )
1618            .await,
1619            "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1620             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1621             or verifyZkloginSignature) and the rest of the request (the query part) must be 800 \
1622             bytes or fewer."
1623        );
1624    }
1625
1626    #[tokio::test]
1627    async fn test_payload_execution_multiple_sigs_exceeded() {
1628        let cluster = prep_executor_cluster().await;
1629        let db_url = cluster.graphql_connection_config.db_url.clone();
1630        // First check that the limit is large enough to hold a transaction with a single signature
1631        // (by checking that we hite the read limit).
1632        let err = execute_for_error(
1633            &db_url,
1634            Limits {
1635                max_tx_payload_size: 30,
1636                max_query_payload_size: 320,
1637                ..Default::default()
1638            },
1639            r#"
1640                mutation {
1641                    executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1642                        effects {
1643                            status
1644                        }
1645                    }
1646                }
1647            "#
1648            .into(),
1649        )
1650        .await;
1651
1652        assert!(err.starts_with("Query part too large"), "{err}");
1653
1654        assert_eq!(
1655            execute_for_error(
1656                &db_url,
1657                Limits {
1658                    max_tx_payload_size: 30,
1659                    max_query_payload_size: 500,
1660                    ..Default::default()
1661                },
1662                r#"
1663                    mutation {
1664                        executeTransactionBlock(
1665                            txBytes: "AAA",
1666                            signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1667                        ) {
1668                            effects {
1669                                status
1670                            }
1671                        }
1672                    }
1673                "#
1674                .into(),
1675            )
1676            .await,
1677            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1678             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1679             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1680             bytes or fewer.",
1681        )
1682    }
1683
1684    #[tokio::test]
1685    async fn test_payload_sig_var_execution_exceeded() {
1686        let cluster = prep_executor_cluster().await;
1687        let db_url = cluster.graphql_connection_config.db_url.clone();
1688        // Variables can show up in the sub-structure of a GraphQL value as well, and we need to
1689        // count those as well.
1690        assert_eq!(
1691            execute_for_error(
1692                &db_url,
1693                Limits {
1694                    max_tx_payload_size: 10,
1695                    max_query_payload_size: 500,
1696                    ..Default::default()
1697                },
1698                Request::new(
1699                    r#"
1700                    mutation ($tx: String!, $sig: String!) {
1701                        executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1702                            effects {
1703                                status
1704                            }
1705                        }
1706                    }
1707                    "#
1708                )
1709                .variables(Variables::from_json(json!({
1710                    "tx": "AAA",
1711                    "sig": "BBB"
1712                })))
1713            )
1714            .await,
1715            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1716             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1717             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1718             bytes or fewer."
1719        );
1720    }
1721
1722    /// Check if the error indicates that the request passed the overall size check and the
1723    /// transaction payload check.
1724    fn passed_tx_checks(err: &str) -> bool {
1725        !err.starts_with("Overall request too large")
1726            && !err.starts_with("Transaction payload too large")
1727    }
1728
1729    #[tokio::test]
1730    async fn test_payload_reusing_vars_execution() {
1731        let cluster = prep_executor_cluster().await;
1732        let db_url = cluster.graphql_connection_config.db_url.clone();
1733        // Test that when variables are re-used as execution params, the size of the variable is
1734        // only counted once.
1735
1736        // First, check that `error_passed_tx_checks` is working, by submitting a request that will
1737        // fail the initial payload check.
1738        assert!(!passed_tx_checks(
1739            &execute_for_error(
1740                &db_url,
1741                Limits {
1742                    max_tx_payload_size: 1,
1743                    max_query_payload_size: 1,
1744                    ..Default::default()
1745                },
1746                r#"
1747                    mutation {
1748                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1749                            effects {
1750                                status
1751                            }
1752                        }
1753                    }
1754                "#
1755                .into()
1756            )
1757            .await
1758        ));
1759
1760        let limits = Limits {
1761            max_tx_payload_size: 20,
1762            max_query_payload_size: 1000,
1763            ..Default::default()
1764        };
1765
1766        // Then check that a request that uses the variable once passes the transaction limit
1767        // check.
1768        assert!(passed_tx_checks(
1769            &execute_for_error(
1770                &db_url,
1771                limits.clone(),
1772                Request::new(
1773                    r#"
1774                    mutation ($sig: String!) {
1775                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1776                            effects {
1777                                status
1778                            }
1779                        }
1780                    }
1781                    "#,
1782                )
1783                .variables(Variables::from_json(json!({
1784                    "sig": "BBB"
1785                })))
1786            )
1787            .await
1788        ));
1789
1790        // Then check that a request that introduces an extra signature, but without re-using the
1791        // variable fails the transaction limit.
1792        assert!(!passed_tx_checks(
1793            &execute_for_error(
1794                &db_url,
1795                limits.clone(),
1796                Request::new(
1797                    r#"
1798                    mutation ($sig: String!) {
1799                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1800                            effects {
1801                                status
1802                            }
1803                        }
1804                    }
1805                    "#,
1806                )
1807                .variables(Variables::from_json(json!({
1808                    "sig": "BBB"
1809                })))
1810            )
1811            .await
1812        ));
1813
1814        // And then when that use is replaced by re-using the variable, we should be under the
1815        // transaction payload limit again.
1816        assert!(passed_tx_checks(
1817            &execute_for_error(
1818                &db_url,
1819                limits,
1820                Request::new(
1821                    r#"
1822                    mutation ($sig: String!) {
1823                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1824                            effects {
1825                                status
1826                            }
1827                        }
1828                    }
1829                    "#,
1830                )
1831                .variables(Variables::from_json(json!({
1832                    "sig": "BBB"
1833                })))
1834            )
1835            .await
1836        ));
1837    }
1838
1839    #[tokio::test]
1840    async fn test_payload_reusing_vars_dry_run() {
1841        let cluster = prep_executor_cluster().await;
1842        let db_url = cluster.graphql_connection_config.db_url.clone();
1843        // Like `test_payload_reusing_vars_execution` but the variable is used in a dry-run.
1844
1845        let limits = Limits {
1846            max_tx_payload_size: 20,
1847            max_query_payload_size: 1000,
1848            ..Default::default()
1849        };
1850
1851        // A single dry-run is under the limit.
1852        assert!(passed_tx_checks(
1853            &execute_for_error(
1854                &db_url,
1855                limits.clone(),
1856                Request::new(
1857                    r#"
1858                    query ($tx: String!) {
1859                        dryRunTransactionBlock(txBytes: $tx) {
1860                            error
1861                            transaction {
1862                                digest
1863                            }
1864                        }
1865                    }
1866                    "#,
1867                )
1868                .variables(Variables::from_json(json!({
1869                    "tx": "AAABBBCCC"
1870                })))
1871            )
1872            .await
1873        ));
1874
1875        // Duplicating the dry-run causes us to hit the limit.
1876        assert!(!passed_tx_checks(
1877            &execute_for_error(
1878                &db_url,
1879                limits.clone(),
1880                Request::new(
1881                    r#"
1882                    query ($tx: String!) {
1883                        d0: dryRunTransactionBlock(txBytes: $tx) {
1884                            error
1885                            transaction {
1886                                digest
1887                            }
1888                        }
1889
1890                        d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1891                            error
1892                            transaction {
1893                                digest
1894                            }
1895                        }
1896                    }
1897                    "#,
1898                )
1899                .variables(Variables::from_json(json!({
1900                    "tx": "AAABBBCCC"
1901                })))
1902            )
1903            .await
1904        ));
1905
1906        // And by re-using the variable, we are under the transaction limit again.
1907        assert!(passed_tx_checks(
1908            &execute_for_error(
1909                &db_url,
1910                limits,
1911                Request::new(
1912                    r#"
1913                    query ($tx: String!) {
1914                        d0: dryRunTransactionBlock(txBytes: $tx) {
1915                            error
1916                            transaction {
1917                                digest
1918                            }
1919                        }
1920
1921                        d1: dryRunTransactionBlock(txBytes: $tx) {
1922                            error
1923                            transaction {
1924                                digest
1925                            }
1926                        }
1927                    }
1928                    "#,
1929                )
1930                .variables(Variables::from_json(json!({
1931                    "tx": "AAABBBCCC"
1932                })))
1933            )
1934            .await
1935        ));
1936    }
1937
1938    #[tokio::test]
1939    async fn test_payload_named_fragment_execution_exceeded() {
1940        let cluster = prep_executor_cluster().await;
1941        let db_url = cluster.graphql_connection_config.db_url.clone();
1942        assert_eq!(
1943            execute_for_error(
1944                &db_url,
1945                Limits {
1946                    max_tx_payload_size: 10,
1947                    max_query_payload_size: 500,
1948                    ..Default::default()
1949                },
1950                r#"
1951                    mutation {
1952                        ...Tx
1953                    }
1954
1955                    fragment Tx on Mutation {
1956                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1957                            effects {
1958                                status
1959                            }
1960                        }
1961                    }
1962                "#
1963                .into()
1964            )
1965            .await,
1966            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1967             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1968             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1969             bytes or fewer."
1970        );
1971    }
1972
1973    #[tokio::test]
1974    async fn test_payload_inline_fragment_execution_exceeded() {
1975        let cluster = prep_executor_cluster().await;
1976        let db_url = cluster.graphql_connection_config.db_url.clone();
1977        assert_eq!(
1978            execute_for_error(
1979                &db_url,
1980                Limits {
1981                    max_tx_payload_size: 10,
1982                    max_query_payload_size: 500,
1983                    ..Default::default()
1984                },
1985                r#"
1986                    mutation {
1987                        ... on Mutation {
1988                            executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1989                                effects {
1990                                    status
1991                                }
1992                            }
1993                        }
1994                    }
1995                "#
1996                .into()
1997            )
1998            .await,
1999            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2000             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2001             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2002             bytes or fewer."
2003        );
2004    }
2005
2006    #[tokio::test]
2007    async fn test_payload_named_fragment_dry_run_exceeded() {
2008        let cluster = prep_executor_cluster().await;
2009        let db_url = cluster.graphql_connection_config.db_url.clone();
2010        assert_eq!(
2011            execute_for_error(
2012                &db_url,
2013                Limits {
2014                    max_tx_payload_size: 10,
2015                    max_query_payload_size: 500,
2016                    ..Default::default()
2017                },
2018                r#"
2019                    query {
2020                        ...DryRun
2021                    }
2022
2023                    fragment DryRun on Query {
2024                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2025                            error
2026                            transaction {
2027                                digest
2028                            }
2029                        }
2030                    }
2031                "#
2032                .into(),
2033            )
2034            .await,
2035            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2036             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2037             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2038             bytes or fewer."
2039        );
2040    }
2041
2042    #[tokio::test]
2043    async fn test_payload_inline_fragment_dry_run_exceeded() {
2044        let cluster = prep_executor_cluster().await;
2045        let db_url = cluster.graphql_connection_config.db_url.clone();
2046        assert_eq!(
2047            execute_for_error(
2048                &db_url,
2049                Limits {
2050                    max_tx_payload_size: 10,
2051                    max_query_payload_size: 500,
2052                    ..Default::default()
2053                },
2054                r#"
2055                    query {
2056                        ... on Query {
2057                            dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2058                                error
2059                                transaction {
2060                                    digest
2061                                }
2062                            }
2063                        }
2064                    }
2065                "#
2066                .into(),
2067            )
2068            .await,
2069            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2070             transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2071             or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2072             bytes or fewer."
2073        );
2074    }
2075
2076    #[tokio::test]
2077    async fn test_multi_get_objects_query_limits_exceeded() {
2078        let cluster = prep_executor_cluster().await;
2079        let db_url = cluster.graphql_connection_config.db_url.clone();
2080        assert_eq!(
2081            execute_for_error(
2082                &db_url,
2083                Limits {
2084                    max_output_nodes: 5,
2085                    ..Default::default()
2086                }, // the query will have 6 output nodes: 2 keys * 3 fields, thus exceeding the
2087                   // limit
2088                r#"
2089                    query {
2090                          multiGetObjects(
2091                            keys: [
2092                              {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2093                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2094                            ]
2095                          ) {
2096                                address
2097                                status
2098                                version
2099                            }
2100                    }
2101                "#
2102                .into(),
2103            )
2104            .await,
2105            "Estimated output nodes exceeds 5"
2106        );
2107        assert_eq!(
2108            execute_for_error(
2109                &db_url,
2110                Limits {
2111                    max_output_nodes: 4,
2112                    ..Default::default()
2113                }, // the query will have 5 output nodes: 5keys * 1 field, thus exceeding the limit
2114                r#"
2115                    query {
2116                          multiGetObjects(
2117                            keys: [
2118                              {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2119                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2120                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2121                              {objectId: "0x33032e0706337632361f2607b79df8c9d1079e8069259b27b1fa5c0394e79893", version: 2},
2122                              {objectId: "0x388295e3ecad53986ebf9a7a1e5854b7df94c3f1f0bba934c5396a2a9eb4550b", version: 2},
2123                            ]
2124                          ) {
2125                                address
2126                            }
2127                    }
2128                "#
2129                .into(),
2130            )
2131            .await,
2132            "Estimated output nodes exceeds 4"
2133        );
2134    }
2135
2136    #[tokio::test]
2137    async fn test_multi_get_objects_query_limits_pass() {
2138        let cluster = prep_executor_cluster().await;
2139        let db_url = cluster.graphql_connection_config.db_url.clone();
2140        let service_config = ServiceConfig {
2141            limits: Limits {
2142                max_output_nodes: 5,
2143                ..Default::default()
2144            },
2145            ..Default::default()
2146        };
2147
2148        let schema = prep_schema(db_url, Some(service_config))
2149            .await
2150            .build_schema();
2151
2152        let resp = schema
2153            .execute(
2154                // query will have 5 output nodes: 5 keys * 1 field, thus not exceeding the limit
2155                r#"
2156                    query {
2157                          multiGetObjects(
2158                            keys: [
2159                              {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2160                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2161                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2162                              {objectId: "0x33032e0706337632361f2607b79df8c9d1079e8069259b27b1fa5c0394e79893", version: 2},
2163                              {objectId: "0x388295e3ecad53986ebf9a7a1e5854b7df94c3f1f0bba934c5396a2a9eb4550b", version: 2},
2164                            ]
2165                          ) {
2166                                address
2167                            }
2168                    }
2169                "#)
2170            .await;
2171        assert!(resp.is_ok());
2172        assert!(resp.errors.is_empty());
2173
2174        let resp = schema
2175            .execute(
2176                // query will have 4 output nodes: 2 keys * 2 fields, thus not exceeding the limit
2177                r#"
2178                    query {
2179                          multiGetObjects(
2180                            keys: [
2181                              {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2182                              {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2183                            ]
2184                          ) {
2185                                address
2186                                status
2187                            }
2188                    }
2189                "#)
2190            .await;
2191        assert!(resp.is_ok());
2192        assert!(resp.errors.is_empty());
2193    }
2194}