1use super::exchange_rates_task::TriggerExchangeRatesTask;
5use super::system_package_task::SystemPackageTask;
6use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask};
7use crate::config::{
8 ConnectionConfig, ServiceConfig, Version, MAX_CONCURRENT_REQUESTS,
9 RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
10};
11use crate::data::move_registry_data_loader::MoveRegistryDataLoader;
12use crate::data::package_resolver::{DbPackageStore, PackageResolver};
13use crate::data::{DataLoader, Db};
14use crate::extensions::directive_checker::DirectiveChecker;
15use crate::metrics::Metrics;
16use crate::mutation::Mutation;
17use crate::types::datatype::IMoveDatatype;
18use crate::types::move_object::IMoveObject;
19use crate::types::object::IObject;
20use crate::types::owner::IOwner;
21use crate::{
22 config::ServerConfig,
23 context_data::db_data_provider::PgManager,
24 error::Error,
25 extensions::{
26 feature_gate::FeatureGate,
27 logger::Logger,
28 query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
29 timeout::Timeout,
30 },
31 server::version::set_version_middleware,
32 types::query::{Query, SuiGraphQLSchema},
33};
34use async_graphql::extensions::ApolloTracing;
35use async_graphql::extensions::Tracing;
36use async_graphql::EmptySubscription;
37use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder};
38use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
39use axum::body::Body;
40use axum::extract::FromRef;
41use axum::extract::{ConnectInfo, Query as AxumQuery, State};
42use axum::http::{HeaderMap, StatusCode};
43use axum::middleware::{self};
44use axum::response::IntoResponse;
45use axum::routing::{get, post, MethodRouter, Route};
46use axum::Extension;
47use axum::Router;
48use axum_extra::headers::ContentLength;
49use axum_extra::TypedHeader;
50use chrono::Utc;
51use http::{HeaderValue, Method, Request};
52use mysten_metrics::spawn_monitored_task;
53use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
54use std::convert::Infallible;
55use std::net::TcpStream;
56use std::sync::Arc;
57use std::time::Duration;
58use std::{any::Any, net::SocketAddr, time::Instant};
59use sui_graphql_rpc_headers::LIMITS_HEADER;
60use sui_indexer::db::check_db_migration_consistency;
61use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
62use sui_sdk::SuiClientBuilder;
63use tokio::join;
64use tokio::sync::OnceCell;
65use tokio_util::sync::CancellationToken;
66use tower::{Layer, Service};
67use tower_http::cors::{AllowOrigin, CorsLayer};
68use tracing::{info, warn};
69use uuid::Uuid;
70
71const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
73
74pub(crate) struct Server {
75 router: Router,
77 address: String,
78 watermark_task: WatermarkTask,
79 system_package_task: SystemPackageTask,
80 trigger_exchange_rates_task: TriggerExchangeRatesTask,
81 state: AppState,
82}
83
84impl Server {
85 pub async fn run(mut self) -> Result<(), Error> {
88 get_or_init_server_start_time().await;
89
90 let watermark_task = {
93 info!("Starting watermark update task");
94 spawn_monitored_task!(async move {
95 self.watermark_task.run().await;
96 })
97 };
98
99 let system_package_task = {
101 info!("Starting system package task");
102 spawn_monitored_task!(async move {
103 self.system_package_task.run().await;
104 })
105 };
106
107 let trigger_exchange_rates_task = {
108 info!("Starting trigger exchange rates task");
109 spawn_monitored_task!(async move {
110 self.trigger_exchange_rates_task.run().await;
111 })
112 };
113
114 let server_task = {
115 info!("Starting graphql service");
116 let cancellation_token = self.state.cancellation_token.clone();
117 spawn_monitored_task!(async move {
118 let listener = tokio::net::TcpListener::bind(&self.address).await.unwrap();
119 axum::serve(
120 listener,
121 self.router
122 .into_make_service_with_connect_info::<SocketAddr>(),
123 )
124 .with_graceful_shutdown(async move {
125 cancellation_token.cancelled().await;
126 info!("Shutdown signal received, terminating graphql service");
127 })
128 .await
129 .map_err(|e| Error::Internal(format!("Server run failed: {}", e)))
130 })
131 };
132
133 let _ = join!(
136 watermark_task,
137 system_package_task,
138 trigger_exchange_rates_task,
139 server_task
140 );
141
142 Ok(())
143 }
144}
145
146pub(crate) struct ServerBuilder {
147 state: AppState,
148 schema: SchemaBuilder<Query, Mutation, EmptySubscription>,
149 router: Option<Router>,
150 db_reader: Option<Db>,
151 resolver: Option<PackageResolver>,
152}
153
154#[derive(Clone)]
155pub(crate) struct AppState {
156 connection: ConnectionConfig,
157 service: ServiceConfig,
158 metrics: Metrics,
159 cancellation_token: CancellationToken,
160 pub version: Version,
161}
162
163impl AppState {
164 pub(crate) fn new(
165 connection: ConnectionConfig,
166 service: ServiceConfig,
167 metrics: Metrics,
168 cancellation_token: CancellationToken,
169 version: Version,
170 ) -> Self {
171 Self {
172 connection,
173 service,
174 metrics,
175 cancellation_token,
176 version,
177 }
178 }
179}
180
181impl FromRef<AppState> for ConnectionConfig {
182 fn from_ref(app_state: &AppState) -> ConnectionConfig {
183 app_state.connection.clone()
184 }
185}
186
187impl FromRef<AppState> for Metrics {
188 fn from_ref(app_state: &AppState) -> Metrics {
189 app_state.metrics.clone()
190 }
191}
192
193impl ServerBuilder {
194 pub fn new(state: AppState) -> Self {
195 Self {
196 state,
197 schema: schema_builder(),
198 router: None,
199 db_reader: None,
200 resolver: None,
201 }
202 }
203
204 pub fn address(&self) -> String {
205 format!(
206 "{}:{}",
207 self.state.connection.host, self.state.connection.port
208 )
209 }
210
211 pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
212 self.schema = self.schema.data(context_data);
213 self
214 }
215
216 pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
217 self.schema = self.schema.extension(extension);
218 self
219 }
220
221 #[cfg(test)]
222 fn build_schema(self) -> Schema<Query, Mutation, EmptySubscription> {
223 self.schema.finish()
224 }
225
226 fn build_components(
229 self,
230 ) -> (
231 String,
232 Schema<Query, Mutation, EmptySubscription>,
233 Db,
234 PackageResolver,
235 Router,
236 ) {
237 let address = self.address();
238 let ServerBuilder {
239 state: _,
240 schema,
241 db_reader,
242 resolver,
243 router,
244 } = self;
245 (
246 address,
247 schema.finish(),
248 db_reader.expect("DB reader not initialized"),
249 resolver.expect("Package resolver not initialized"),
250 router.expect("Router not initialized"),
251 )
252 }
253
254 fn init_router(&mut self) {
255 if self.router.is_none() {
256 let router: Router = Router::new()
257 .route("/", post(graphql_handler))
258 .route("/graphql", post(graphql_handler))
259 .route("/health", get(health_check))
260 .route("/graphql/health", get(health_check))
261 .with_state(self.state.clone())
262 .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
263 metrics: self.state.metrics.clone(),
264 }));
265 self.router = Some(router);
266 }
267 }
268
269 pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
270 self.init_router();
271 self.router = self.router.map(|router| router.route(path, method_handler));
272 self
273 }
274
275 pub fn layer<L>(mut self, layer: L) -> Self
276 where
277 L: Layer<Route> + Clone + Send + 'static,
278 L::Service: Service<Request<Body>> + Clone + Send + 'static,
279 <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
280 <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
281 <L::Service as Service<Request<Body>>>::Future: Send + 'static,
282 {
283 self.init_router();
284 self.router = self.router.map(|router| router.layer(layer));
285 self
286 }
287
288 fn cors() -> Result<CorsLayer, Error> {
289 let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
290 Ok(value) => {
291 let allow_hosts = value
292 .split(',')
293 .map(HeaderValue::from_str)
294 .collect::<Result<Vec<_>, _>>()
295 .map_err(|_| {
296 Error::Internal(
297 "Cannot resolve access control origin env variable".to_string(),
298 )
299 })?;
300 AllowOrigin::list(allow_hosts)
301 }
302 _ => AllowOrigin::any(),
303 };
304 info!("Access control allow origin set to: {acl:?}");
305
306 let cors = CorsLayer::new()
307 .allow_methods([Method::POST])
309 .allow_origin(acl)
311 .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
312 Ok(cors)
313 }
314
315 pub fn build(self) -> Result<Server, Error> {
317 let state = self.state.clone();
318 let (address, schema, db_reader, resolver, router) = self.build_components();
319
320 let watermark_task = WatermarkTask::new(
322 db_reader.clone(),
323 state.metrics.clone(),
324 std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
325 state.cancellation_token.clone(),
326 );
327
328 let system_package_task = SystemPackageTask::new(
329 resolver,
330 watermark_task.epoch_receiver(),
331 state.cancellation_token.clone(),
332 );
333
334 let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
335 db_reader,
336 watermark_task.epoch_receiver(),
337 state.cancellation_token.clone(),
338 );
339
340 let router = router
341 .route_layer(middleware::from_fn_with_state(
342 state.version,
343 set_version_middleware,
344 ))
345 .layer(axum::extract::Extension(schema))
346 .layer(axum::extract::Extension(watermark_task.lock()))
347 .layer(axum::extract::Extension(watermark_task.chain_id_lock()))
348 .layer(Self::cors()?);
349
350 Ok(Server {
351 router,
352 address,
353 watermark_task,
354 system_package_task,
355 trigger_exchange_rates_task,
356 state,
357 })
358 }
359
360 pub async fn from_config(
363 config: &ServerConfig,
364 version: &Version,
365 cancellation_token: CancellationToken,
366 ) -> Result<Self, Error> {
367 let prom_addr: SocketAddr = format!(
369 "{}:{}",
370 config.connection.prom_host, config.connection.prom_port
371 )
372 .parse()
373 .map_err(|_| {
374 Error::Internal(format!(
375 "Failed to parse url {}, port {} into socket address",
376 config.connection.prom_host, config.connection.prom_port
377 ))
378 })?;
379
380 let registry_service = mysten_metrics::start_prometheus_server(prom_addr);
381 info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
382 let registry = registry_service.default_registry();
383 registry
384 .register(mysten_metrics::uptime_metric(
385 "graphql",
386 version.full,
387 "unknown",
388 ))
389 .unwrap();
390
391 let metrics = Metrics::new(®istry);
393 let state = AppState::new(
394 config.connection.clone(),
395 config.service.clone(),
396 metrics.clone(),
397 cancellation_token,
398 *version,
399 );
400 let mut builder = ServerBuilder::new(state);
401
402 let name_service_config = config.service.name_service.clone();
403 let move_registry_config = config.service.move_registry.clone();
404 let zklogin_config = config.service.zklogin.clone();
405 let reader = PgManager::reader_with_config(
406 config.connection.db_url.clone(),
407 config.connection.db_pool_size,
408 config.service.limits.request_timeout_ms.into(),
412 )
413 .await
414 .map_err(|e| Error::Internal(format!("Failed to create pg connection pool: {}", e)))?;
415
416 if !config.connection.skip_migration_consistency_check {
417 check_db_migration_consistency(
418 &mut reader
419 .pool()
420 .get()
421 .await
422 .map_err(|e| Error::Internal(e.to_string()))?,
423 )
424 .await?;
425 }
426
427 let db = Db::new(
429 reader.clone(),
430 config.service.limits.clone(),
431 metrics.clone(),
432 );
433 let loader = DataLoader::new(db.clone());
434 let pg_conn_pool = PgManager::new(reader.clone());
435 let package_store = DbPackageStore::new(loader.clone());
436 let resolver = Arc::new(Resolver::new_with_limits(
437 PackageStoreWithLruCache::new(package_store),
438 config.service.limits.package_resolver_limits(),
439 ));
440
441 builder.db_reader = Some(db.clone());
442 builder.resolver = Some(resolver.clone());
443
444 let sui_sdk_client = if let Some(url) = &config.tx_exec_full_node.node_rpc_url {
447 Some(
448 SuiClientBuilder::default()
449 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
450 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
451 .build(url)
452 .await
453 .map_err(|e| Error::Internal(format!("Failed to create SuiClient: {}", e)))?,
454 )
455 } else {
456 warn!("No fullnode url found in config. `dryRunTransactionBlock` and `executeTransactionBlock` will not work");
457 None
458 };
459
460 builder = builder
461 .context_data(config.service.clone())
462 .context_data(loader)
463 .context_data(db)
464 .context_data(pg_conn_pool)
465 .context_data(resolver)
466 .context_data(sui_sdk_client)
467 .context_data(name_service_config)
468 .context_data(zklogin_config)
469 .context_data(metrics.clone())
470 .context_data(config.clone())
471 .context_data(move_registry_config.clone())
472 .context_data(MoveRegistryDataLoader::new(
473 move_registry_config,
474 metrics.clone(),
475 ));
476
477 if config.internal_features.feature_gate {
478 builder = builder.extension(FeatureGate);
479 }
480
481 if config.internal_features.logger {
482 builder = builder.extension(Logger::default());
483 }
484
485 if config.internal_features.query_limits_checker {
486 builder = builder.extension(QueryLimitsChecker);
487 }
488
489 if config.internal_features.directive_checker {
490 builder = builder.extension(DirectiveChecker);
491 }
492
493 if config.internal_features.query_timeout {
494 builder = builder.extension(Timeout);
495 }
496
497 if config.internal_features.tracing {
498 builder = builder.extension(Tracing);
499 }
500
501 if config.internal_features.apollo_tracing {
502 builder = builder.extension(ApolloTracing);
503 }
504
505 Ok(builder)
509 }
510}
511
512fn schema_builder() -> SchemaBuilder<Query, Mutation, EmptySubscription> {
513 async_graphql::Schema::build(Query, Mutation, EmptySubscription)
514 .register_output_type::<IMoveObject>()
515 .register_output_type::<IObject>()
516 .register_output_type::<IOwner>()
517 .register_output_type::<IMoveDatatype>()
518}
519
520pub fn export_schema() -> String {
522 schema_builder().finish().sdl()
523}
524
525async fn graphql_handler(
528 ConnectInfo(addr): ConnectInfo<SocketAddr>,
529 TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
530 schema: Extension<SuiGraphQLSchema>,
531 Extension(watermark_lock): Extension<WatermarkLock>,
532 Extension(chain_identifier_lock): Extension<ChainIdentifierLock>,
533 headers: HeaderMap,
534 req: GraphQLRequest,
535) -> (axum::http::Extensions, GraphQLResponse) {
536 let mut req = req.into_inner();
537
538 req.data.insert(PayloadSize(content_length));
539 req.data.insert(Uuid::new_v4());
540 if headers.contains_key(ShowUsage::name()) {
541 req.data.insert(ShowUsage)
542 }
543
544 req.data.insert(addr);
547 req.data.insert(Watermark::new(watermark_lock).await);
548 req.data.insert(chain_identifier_lock.read().await);
549
550 let result = schema.execute(req).await;
551
552 let mut extensions = axum::http::Extensions::new();
555 if result.is_err() {
556 extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
557 };
558 (extensions, result.into())
559}
560
561#[derive(Clone)]
562struct MetricsMakeCallbackHandler {
563 metrics: Metrics,
564}
565
566impl MakeCallbackHandler for MetricsMakeCallbackHandler {
567 type Handler = MetricsCallbackHandler;
568
569 fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
570 let start = Instant::now();
571 let metrics = self.metrics.clone();
572
573 metrics.request_metrics.inflight_requests.inc();
574 metrics.inc_num_queries();
575
576 MetricsCallbackHandler { metrics, start }
577 }
578}
579
580struct MetricsCallbackHandler {
581 metrics: Metrics,
582 start: Instant,
583}
584
585impl ResponseHandler for MetricsCallbackHandler {
586 fn on_response(&mut self, response: &http::response::Parts) {
587 if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
588 self.metrics.inc_errors(&errors.0);
589 }
590 }
591
592 fn on_error<E>(&mut self, _error: &E) {
593 }
598}
599
600impl Drop for MetricsCallbackHandler {
601 fn drop(&mut self) {
602 self.metrics.query_latency(self.start.elapsed());
603 self.metrics.request_metrics.inflight_requests.dec();
604 }
605}
606
607#[derive(Debug, Clone)]
608struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
609
610async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
612 let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
613 return StatusCode::INTERNAL_SERVER_ERROR;
614 };
615
616 let Some(host) = url.host_str() else {
617 return StatusCode::INTERNAL_SERVER_ERROR;
618 };
619
620 let tcp_url = if let Some(port) = url.port() {
621 format!("{host}:{port}")
622 } else {
623 host.to_string()
624 };
625
626 if TcpStream::connect(tcp_url).is_err() {
627 StatusCode::INTERNAL_SERVER_ERROR
628 } else {
629 StatusCode::OK
630 }
631}
632
633#[derive(serde::Deserialize)]
634struct HealthParam {
635 max_checkpoint_lag_ms: Option<u64>,
636}
637
638async fn health_check(
643 State(connection): State<ConnectionConfig>,
644 Extension(watermark_lock): Extension<WatermarkLock>,
645 AxumQuery(query_params): AxumQuery<HealthParam>,
646) -> StatusCode {
647 let db_health_check = db_health_check(axum::extract::State(connection)).await;
648 if db_health_check != StatusCode::OK {
649 return db_health_check;
650 }
651
652 let max_checkpoint_lag_ms = query_params
653 .max_checkpoint_lag_ms
654 .map(Duration::from_millis)
655 .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
656
657 let checkpoint_timestamp =
658 Duration::from_millis(watermark_lock.read().await.hi_cp_timestamp_ms);
659
660 let now_millis = Utc::now().timestamp_millis();
661
662 let now: Duration = match u64::try_from(now_millis) {
664 Ok(val) => Duration::from_millis(val),
665 Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
666 };
667
668 if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
669 return StatusCode::GATEWAY_TIMEOUT;
670 }
671
672 db_health_check
673}
674
675async fn get_or_init_server_start_time() -> &'static Instant {
677 static ONCE: OnceCell<Instant> = OnceCell::const_new();
678 ONCE.get_or_init(|| async move { Instant::now() }).await
679}
680
681#[cfg(test)]
682pub mod tests {
683 use super::*;
684 use crate::test_infra::cluster::{prep_executor_cluster, start_cluster};
685 use crate::types::chain_identifier::ChainIdentifier;
686 use crate::{
687 config::{ConnectionConfig, Limits, ServiceConfig, Version},
688 context_data::db_data_provider::PgManager,
689 extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
690 };
691 use async_graphql::{
692 extensions::{Extension, ExtensionContext, NextExecute},
693 Request, Response, Variables,
694 };
695 use serde_json::json;
696 use std::sync::Arc;
697 use std::time::Duration;
698 use sui_pg_db::temp::get_available_port;
699 use sui_sdk::SuiClient;
700 use sui_types::digests::get_mainnet_chain_identifier;
701 use sui_types::transaction::TransactionData;
702 use uuid::Uuid;
703
704 async fn prep_schema(db_url: String, service_config: Option<ServiceConfig>) -> ServerBuilder {
707 let connection_config = ConnectionConfig {
708 port: get_available_port(),
709 host: "127.0.0.1".to_owned(),
710 db_url,
711 db_pool_size: 5,
712 prom_host: "127.0.0.1".to_owned(),
713 prom_port: get_available_port(),
714 skip_migration_consistency_check: false,
715 };
716 let service_config = service_config.unwrap_or_default();
717
718 let reader = PgManager::reader_with_config(
719 connection_config.db_url.clone(),
720 connection_config.db_pool_size,
721 service_config.limits.request_timeout_ms.into(),
722 )
723 .await
724 .expect("Failed to create pg connection pool");
725
726 let version = Version::for_testing();
727 let metrics = metrics();
728 let db = Db::new(
729 reader.clone(),
730 service_config.limits.clone(),
731 metrics.clone(),
732 );
733 let loader = DataLoader::new(db.clone());
734 let pg_conn_pool = PgManager::new(reader);
735 let cancellation_token = CancellationToken::new();
736 let watermark = Watermark {
737 hi_cp: 1,
738 hi_cp_timestamp_ms: 1,
739 hi_tx: 1,
740 epoch: 0,
741 lo_cp: 0,
742 lo_tx: 0,
743 };
744 let state = AppState::new(
745 connection_config.clone(),
746 service_config.clone(),
747 metrics.clone(),
748 cancellation_token.clone(),
749 version,
750 );
751 ServerBuilder::new(state)
752 .context_data(db)
753 .context_data(loader)
754 .context_data(pg_conn_pool)
755 .context_data(service_config)
756 .context_data(query_id())
757 .context_data(ip_address())
758 .context_data(watermark)
759 .context_data(ChainIdentifier::from(get_mainnet_chain_identifier()))
760 .context_data(metrics)
761 }
762
763 fn metrics() -> Metrics {
764 let binding_address: SocketAddr = format!("127.0.0.1:{}", get_available_port())
765 .parse()
766 .unwrap();
767 let registry = mysten_metrics::start_prometheus_server(binding_address).default_registry();
768 Metrics::new(®istry)
769 }
770
771 fn ip_address() -> SocketAddr {
772 let binding_address: SocketAddr = "127.0.0.1:51515".parse().unwrap();
773 binding_address
774 }
775
776 fn query_id() -> Uuid {
777 Uuid::new_v4()
778 }
779
780 #[tokio::test]
781 async fn test_timeout() {
782 telemetry_subscribers::init_for_testing();
783 let cluster = start_cluster(ServiceConfig::test_defaults()).await;
784 cluster
785 .wait_for_checkpoint_catchup(1, Duration::from_secs(30))
786 .await;
787 let wallet = &cluster.network.validator_fullnode_handle.wallet;
790 let db_url = cluster.network.graphql_connection_config.db_url.clone();
791
792 struct TimedExecuteExt {
793 pub min_req_delay: Duration,
794 }
795
796 impl ExtensionFactory for TimedExecuteExt {
797 fn create(&self) -> Arc<dyn Extension> {
798 Arc::new(TimedExecuteExt {
799 min_req_delay: self.min_req_delay,
800 })
801 }
802 }
803
804 #[async_trait::async_trait]
805 impl Extension for TimedExecuteExt {
806 async fn execute(
807 &self,
808 ctx: &ExtensionContext<'_>,
809 operation_name: Option<&str>,
810 next: NextExecute<'_>,
811 ) -> Response {
812 tokio::time::sleep(self.min_req_delay).await;
813 next.run(ctx, operation_name).await
814 }
815 }
816
817 async fn test_timeout(
818 delay: Duration,
819 timeout: Duration,
820 query: &str,
821 sui_client: &SuiClient,
822 db_url: String,
823 ) -> Response {
824 let mut cfg = ServiceConfig::default();
825 cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
826 cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
827
828 let schema = prep_schema(db_url, Some(cfg))
829 .await
830 .context_data(Some(sui_client.clone()))
831 .extension(Timeout)
832 .extension(TimedExecuteExt {
833 min_req_delay: delay,
834 })
835 .build_schema();
836
837 schema.execute(query).await
838 }
839
840 let query = r#"{ checkpoint(id: {sequenceNumber: 0 }) { digest }}"#;
841 let timeout = Duration::from_millis(1000);
842 let delay = Duration::from_millis(100);
843 let sui_client = wallet.get_client().await.unwrap();
844
845 test_timeout(delay, timeout, query, &sui_client, db_url.clone())
846 .await
847 .into_result()
848 .expect("Should complete successfully");
849
850 let errs: Vec<_> = test_timeout(delay, delay, query, &sui_client, db_url.clone())
852 .await
853 .into_result()
854 .unwrap_err()
855 .into_iter()
856 .map(|e| e.message)
857 .collect();
858 let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
859 assert_eq!(errs, vec![exp]);
860
861 let addresses = wallet.get_addresses();
865 let gas = wallet
866 .get_one_gas_object_owned_by_address(addresses[0])
867 .await
868 .unwrap();
869 let tx_data = TransactionData::new_transfer_sui(
870 addresses[1],
871 addresses[0],
872 Some(1000),
873 gas.unwrap(),
874 1_000_000,
875 wallet.get_reference_gas_price().await.unwrap(),
876 );
877
878 let tx = wallet.sign_transaction(&tx_data).await;
879 let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
880
881 let signature_base64 = &signatures[0];
882 let query = format!(
883 r#"
884 mutation {{
885 executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
886 effects {{
887 status
888 }}
889 }}
890 }}"#,
891 tx_bytes.encoded(),
892 signature_base64.encoded()
893 );
894 let errs: Vec<_> = test_timeout(delay, delay, &query, &sui_client, db_url.clone())
895 .await
896 .into_result()
897 .unwrap_err()
898 .into_iter()
899 .map(|e| e.message)
900 .collect();
901 let exp = format!(
902 "Mutation request timed out. Limit: {}s",
903 delay.as_secs_f32()
904 );
905 assert_eq!(errs, vec![exp]);
906 }
907
908 #[tokio::test]
909 async fn test_query_depth_limit() {
910 let cluster = prep_executor_cluster().await;
911 let db_url = cluster.graphql_connection_config.db_url.clone();
912
913 async fn exec_query_depth_limit(db_url: String, depth: u32, query: &str) -> Response {
914 let service_config = ServiceConfig {
915 limits: Limits {
916 max_query_depth: depth,
917 ..Default::default()
918 },
919 ..Default::default()
920 };
921
922 let schema = prep_schema(db_url, Some(service_config))
923 .await
924 .context_data(PayloadSize(100))
925 .extension(QueryLimitsChecker)
926 .build_schema();
927 schema.execute(query).await
928 }
929
930 exec_query_depth_limit(db_url.clone(), 1, "{ chainIdentifier }")
931 .await
932 .into_result()
933 .expect("Should complete successfully");
934
935 exec_query_depth_limit(
936 db_url.clone(),
937 5,
938 "{ chainIdentifier protocolConfig { configs { value key }} }",
939 )
940 .await
941 .into_result()
942 .expect("Should complete successfully");
943
944 let errs: Vec<_> = exec_query_depth_limit(db_url.clone(), 0, "{ chainIdentifier }")
946 .await
947 .into_result()
948 .unwrap_err()
949 .into_iter()
950 .map(|e| e.message)
951 .collect();
952
953 assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
954 let errs: Vec<_> = exec_query_depth_limit(
955 db_url.clone(),
956 2,
957 "{ chainIdentifier protocolConfig { configs { value key }} }",
958 )
959 .await
960 .into_result()
961 .unwrap_err()
962 .into_iter()
963 .map(|e| e.message)
964 .collect();
965 assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
966 }
967
968 #[tokio::test]
969 async fn test_query_node_limit() {
970 let cluster = prep_executor_cluster().await;
971 let db_url = cluster.graphql_connection_config.db_url.clone();
972 async fn exec_query_node_limit(db_url: String, nodes: u32, query: &str) -> Response {
973 let service_config = ServiceConfig {
974 limits: Limits {
975 max_query_nodes: nodes,
976 ..Default::default()
977 },
978 ..Default::default()
979 };
980
981 let schema = prep_schema(db_url, Some(service_config))
982 .await
983 .context_data(PayloadSize(100))
984 .extension(QueryLimitsChecker)
985 .build_schema();
986 schema.execute(query).await
987 }
988
989 exec_query_node_limit(db_url.clone(), 1, "{ chainIdentifier }")
990 .await
991 .into_result()
992 .expect("Should complete successfully");
993
994 exec_query_node_limit(
995 db_url.clone(),
996 5,
997 "{ chainIdentifier protocolConfig { configs { value key }} }",
998 )
999 .await
1000 .into_result()
1001 .expect("Should complete successfully");
1002
1003 let err: Vec<_> = exec_query_node_limit(db_url.clone(), 0, "{ chainIdentifier }")
1005 .await
1006 .into_result()
1007 .unwrap_err()
1008 .into_iter()
1009 .map(|e| e.message)
1010 .collect();
1011 assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1012
1013 let err: Vec<_> = exec_query_node_limit(
1014 db_url.clone(),
1015 4,
1016 "{ chainIdentifier protocolConfig { configs { value key }} }",
1017 )
1018 .await
1019 .into_result()
1020 .unwrap_err()
1021 .into_iter()
1022 .map(|e| e.message)
1023 .collect();
1024 assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1025 }
1026
1027 #[tokio::test]
1028 async fn test_query_default_page_limit() {
1029 let cluster = prep_executor_cluster().await;
1030 let db_url = cluster.graphql_connection_config.db_url.clone();
1031
1032 let service_config = ServiceConfig {
1033 limits: Limits {
1034 default_page_size: 1,
1035 ..Default::default()
1036 },
1037 ..Default::default()
1038 };
1039 let schema = prep_schema(db_url, Some(service_config))
1040 .await
1041 .build_schema();
1042
1043 let resp = schema
1044 .execute("{ checkpoints { nodes { sequenceNumber } } }")
1045 .await;
1046 let data = resp.data.clone().into_json().unwrap();
1047 let checkpoints = data
1048 .get("checkpoints")
1049 .unwrap()
1050 .get("nodes")
1051 .unwrap()
1052 .as_array()
1053 .unwrap();
1054 assert_eq!(
1055 checkpoints.len(),
1056 1,
1057 "Checkpoints should have exactly one element"
1058 );
1059
1060 let resp = schema
1061 .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1062 .await;
1063 let data = resp.data.clone().into_json().unwrap();
1064 let checkpoints = data
1065 .get("checkpoints")
1066 .unwrap()
1067 .get("nodes")
1068 .unwrap()
1069 .as_array()
1070 .unwrap();
1071 assert_eq!(
1072 checkpoints.len(),
1073 2,
1074 "Checkpoints should return two elements"
1075 );
1076 }
1077
1078 #[tokio::test]
1079 async fn test_query_max_page_limit() {
1080 telemetry_subscribers::init_for_testing();
1081 let cluster = prep_executor_cluster().await;
1082 let db_url = cluster.graphql_connection_config.db_url.clone();
1083 let schema = prep_schema(db_url, None).await.build_schema();
1084
1085 schema
1086 .execute("{ objects(first: 1) { nodes { version } } }")
1087 .await
1088 .into_result()
1089 .expect("Should complete successfully");
1090
1091 let err: Vec<_> = schema
1093 .execute("{ objects(first: 51) { nodes { version } } }")
1094 .await
1095 .into_result()
1096 .unwrap_err()
1097 .into_iter()
1098 .map(|e| e.message)
1099 .collect();
1100 assert_eq!(
1101 err,
1102 vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1103 );
1104 }
1105
1106 #[tokio::test]
1107 async fn test_query_complexity_metrics() {
1108 telemetry_subscribers::init_for_testing();
1109 let cluster = prep_executor_cluster().await;
1110 let db_url = cluster.graphql_connection_config.db_url.clone();
1111 let server_builder = prep_schema(db_url, None)
1112 .await
1113 .context_data(PayloadSize(100));
1114 let metrics = server_builder.state.metrics.clone();
1115 let schema = server_builder
1116 .extension(QueryLimitsChecker) .build_schema();
1118
1119 schema
1120 .execute("{ chainIdentifier }")
1121 .await
1122 .into_result()
1123 .expect("Should complete successfully");
1124
1125 let req_metrics = metrics.request_metrics;
1126 assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1127 assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1128 assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1129 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1130 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1131 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1132
1133 schema
1134 .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1135 .await
1136 .into_result()
1137 .expect("Should complete successfully");
1138
1139 assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1140 assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1141 assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1142 assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1143 assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1144 assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1145 }
1146
1147 #[tokio::test]
1148 pub async fn test_health_check() {
1149 let cluster = prep_executor_cluster().await;
1150
1151 cluster
1152 .wait_for_checkpoint_catchup(6, Duration::from_secs(60))
1153 .await;
1154
1155 let url = format!(
1156 "http://{}:{}/health",
1157 cluster.graphql_connection_config.host, cluster.graphql_connection_config.port
1158 );
1159
1160 let resp = reqwest::get(&url).await.unwrap();
1161 assert_eq!(resp.status(), reqwest::StatusCode::OK);
1162
1163 let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url);
1164 let resp = reqwest::get(&url_with_param).await.unwrap();
1165 assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1166 }
1167
1168 async fn execute_for_error(db_url: &str, limits: Limits, request: Request) -> String {
1171 let service_config = ServiceConfig {
1172 limits,
1173 ..Default::default()
1174 };
1175
1176 let schema = prep_schema(db_url.to_owned(), Some(service_config))
1177 .await
1178 .context_data(PayloadSize(
1179 serde_json::to_string(&request).unwrap().len() as u64,
1184 ))
1185 .extension(QueryLimitsChecker)
1186 .build_schema();
1187
1188 let errs: Vec<_> = schema
1189 .execute(request)
1190 .await
1191 .into_result()
1192 .unwrap_err()
1193 .into_iter()
1194 .map(|e| e.message)
1195 .collect();
1196
1197 errs.join("\n")
1198 }
1199
1200 #[tokio::test]
1201 async fn test_payload_read_exceeded() {
1202 let cluster = prep_executor_cluster().await;
1203 let db_url = cluster.graphql_connection_config.db_url.clone();
1204 assert_eq!(
1205 execute_for_error(
1206 &db_url,
1207 Limits {
1208 max_tx_payload_size: 400,
1209 max_query_payload_size: 10,
1210 ..Default::default()
1211 },
1212 r#"
1213 mutation {
1214 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1215 effects {
1216 status
1217 }
1218 }
1219 }
1220 "#
1221 .into()
1222 )
1223 .await,
1224 "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1225 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1226 or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1227 bytes or fewer."
1228 );
1229 }
1230
1231 #[tokio::test]
1232 async fn test_payload_mutation_exceeded() {
1233 let cluster = prep_executor_cluster().await;
1234 let db_url = cluster.graphql_connection_config.db_url.clone();
1235 assert_eq!(
1236 execute_for_error(
1237 &db_url,
1238 Limits {
1239 max_tx_payload_size: 10,
1240 max_query_payload_size: 400,
1241 ..Default::default()
1242 },
1243 r#"
1244 mutation {
1245 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1246 effects {
1247 status
1248 }
1249 }
1250 }
1251 "#
1252 .into()
1253 )
1254 .await,
1255 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1256 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1257 or verifyZkloginSignature) and the rest of the request (the query part) must be 400 \
1258 bytes or fewer."
1259 );
1260 }
1261
1262 #[tokio::test]
1263 async fn test_payload_dry_run_exceeded() {
1264 let cluster = prep_executor_cluster().await;
1265 let db_url = cluster.graphql_connection_config.db_url.clone();
1266 assert_eq!(
1267 execute_for_error(
1268 &db_url,
1269 Limits {
1270 max_tx_payload_size: 10,
1271 max_query_payload_size: 400,
1272 ..Default::default()
1273 },
1274 r#"
1275 query {
1276 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1277 error
1278 transaction {
1279 digest
1280 }
1281 }
1282 }
1283 "#
1284 .into(),
1285 )
1286 .await,
1287 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1288 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1289 or verifyZkloginSignature) and the rest of the request (the query part) must be 400 bytes \
1290 or fewer."
1291 );
1292 }
1293
1294 #[tokio::test]
1295 async fn test_payload_zklogin_exceeded() {
1296 let cluster = prep_executor_cluster().await;
1297 let db_url = cluster.graphql_connection_config.db_url.clone();
1298 assert_eq!(
1299 execute_for_error(
1300 &db_url,
1301 Limits {
1302 max_tx_payload_size: 10,
1303 max_query_payload_size: 600,
1304 ..Default::default()
1305 },
1306 r#"
1307 query {
1308 verifyZkloginSignature(
1309 bytes: "AAABBBCCC",
1310 signature: "DDD",
1311 intentScope: TRANSACTION_DATA,
1312 author: "0xeee",
1313 ) {
1314 success
1315 errors
1316 }
1317 }
1318 "#
1319 .into(),
1320 )
1321 .await,
1322 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1323 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1324 or verifyZkloginSignature) and the rest of the request (the query part) must be 600 \
1325 bytes or fewer."
1326 );
1327 }
1328
1329 #[tokio::test]
1330 async fn test_payload_total_exceeded_impl() {
1331 let cluster = prep_executor_cluster().await;
1332 let db_url = cluster.graphql_connection_config.db_url.clone();
1333 assert_eq!(
1334 execute_for_error(
1335 &db_url,
1336 Limits {
1337 max_tx_payload_size: 10,
1338 max_query_payload_size: 10,
1339 ..Default::default()
1340 },
1341 r#"
1342 query {
1343 dryRunTransactionBlock(txByte: "AAABBB") {
1344 error
1345 transaction {
1346 digest
1347 }
1348 }
1349 }
1350 "#
1351 .into(),
1352 )
1353 .await,
1354 "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1355 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1356 or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1357 bytes or fewer."
1358 );
1359 }
1360
1361 #[tokio::test]
1362 async fn test_payload_using_vars_mutation_exceeded() {
1363 let cluster = prep_executor_cluster().await;
1364 let db_url = cluster.graphql_connection_config.db_url.clone();
1365 assert_eq!(
1366 execute_for_error(
1367 &db_url,
1368 Limits {
1369 max_tx_payload_size: 10,
1370 max_query_payload_size: 500,
1371 ..Default::default()
1372 },
1373 Request::new(
1374 r#"
1375 mutation ($tx: String!, $sigs: [String!]!) {
1376 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1377 effects {
1378 status
1379 }
1380 }
1381 }
1382 "#
1383 )
1384 .variables(Variables::from_json(json!({
1385 "tx": "AAABBBCCC",
1386 "sigs": ["BBB"]
1387 })))
1388 )
1389 .await,
1390 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1391 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1392 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1393 bytes or fewer."
1394 );
1395 }
1396
1397 #[tokio::test]
1398 async fn test_payload_using_vars_read_exceeded() {
1399 let cluster = prep_executor_cluster().await;
1400 let db_url = cluster.graphql_connection_config.db_url.clone();
1401 assert_eq!(
1402 execute_for_error(
1403 &db_url,
1404 Limits {
1405 max_tx_payload_size: 500,
1406 max_query_payload_size: 10,
1407 ..Default::default()
1408 },
1409 Request::new(
1410 r#"
1411 mutation ($tx: String!, $sigs: [String!]!) {
1412 executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1413 effects {
1414 status
1415 }
1416 }
1417 }
1418 "#
1419 )
1420 .variables(Variables::from_json(json!({
1421 "tx": "AAA",
1422 "sigs": ["BBB"]
1423 })))
1424 )
1425 .await,
1426 "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1427 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1428 or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1429 bytes or fewer."
1430 );
1431 }
1432
1433 #[tokio::test]
1434 async fn test_payload_using_vars_dry_run_exceeded() {
1435 let cluster = prep_executor_cluster().await;
1436 let db_url = cluster.graphql_connection_config.db_url.clone();
1437 assert_eq!(
1438 execute_for_error(
1439 &db_url,
1440 Limits {
1441 max_tx_payload_size: 10,
1442 max_query_payload_size: 400,
1443 ..Default::default()
1444 },
1445 Request::new(
1446 r#"
1447 query ($tx: String!) {
1448 dryRunTransactionBlock(txBytes: $tx) {
1449 error
1450 transaction {
1451 digest
1452 }
1453 }
1454 }
1455 "#
1456 )
1457 .variables(Variables::from_json(json!({
1458 "tx": "AAABBBCCC"
1459 }))),
1460 )
1461 .await,
1462 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1463 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1464 or verifyZkloginSignature) and the rest of the request (the query part) must be 400 \
1465 bytes or fewer."
1466 );
1467 }
1468
1469 #[tokio::test]
1470 async fn test_payload_using_vars_dry_run_read_exceeded() {
1471 let cluster = prep_executor_cluster().await;
1472 let db_url = cluster.graphql_connection_config.db_url.clone();
1473 assert_eq!(
1474 execute_for_error(
1475 &db_url,
1476 Limits {
1477 max_tx_payload_size: 400,
1478 max_query_payload_size: 10,
1479 ..Default::default()
1480 },
1481 Request::new(
1482 r#"
1483 query ($tx: String!) {
1484 dryRunTransactionBlock(txBytes: $tx) {
1485 error
1486 transaction {
1487 digest
1488 }
1489 }
1490 }
1491 "#
1492 )
1493 .variables(Variables::from_json(json!({
1494 "tx": "AAABBBCCC"
1495 }))),
1496 )
1497 .await,
1498 "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1499 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1500 or verifyZkloginSignature) and the rest of the request (the query part) must be 10 \
1501 bytes or fewer."
1502 );
1503 }
1504
1505 #[tokio::test]
1506 async fn test_payload_multiple_execution_exceeded() {
1507 let cluster = prep_executor_cluster().await;
1508 let db_url = cluster.graphql_connection_config.db_url.clone();
1509 let err = execute_for_error(
1512 &db_url,
1513 Limits {
1514 max_tx_payload_size: 30,
1515 max_query_payload_size: 320,
1516 ..Default::default()
1517 },
1518 r#"
1519 mutation {
1520 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1521 effects {
1522 status
1523 }
1524 }
1525 }
1526 "#
1527 .into(),
1528 )
1529 .await;
1530 assert!(err.starts_with("Query part too large"), "{err}");
1531
1532 assert_eq!(
1533 execute_for_error(
1534 &db_url,
1535 Limits {
1536 max_tx_payload_size: 30,
1537 max_query_payload_size: 800,
1538 ..Default::default()
1539 },
1540 r#"
1541 mutation {
1542 e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1543 effects {
1544 status
1545 }
1546 }
1547 e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1548 effects {
1549 status
1550 }
1551 }
1552 }
1553 "#
1554 .into()
1555 )
1556 .await,
1557 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1558 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1559 or verifyZkloginSignature) and the rest of the request (the query part) must be 800 \
1560 bytes or fewer."
1561 );
1562 }
1563
1564 #[tokio::test]
1565 async fn test_payload_multiple_dry_run_exceeded() {
1566 let cluster = prep_executor_cluster().await;
1567 let db_url = cluster.graphql_connection_config.db_url.clone();
1568 let err = execute_for_error(
1571 &db_url,
1572 Limits {
1573 max_tx_payload_size: 20,
1574 max_query_payload_size: 330,
1575 ..Default::default()
1576 },
1577 r#"
1578 query {
1579 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1580 error
1581 transaction {
1582 digest
1583 }
1584 }
1585 }
1586 "#
1587 .into(),
1588 )
1589 .await;
1590 assert!(err.starts_with("Query part too large"), "{err}");
1591
1592 assert_eq!(
1593 execute_for_error(
1594 &db_url,
1595 Limits {
1596 max_tx_payload_size: 20,
1597 max_query_payload_size: 800,
1598 ..Default::default()
1599 },
1600 r#"
1601 query {
1602 d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1603 error
1604 transaction {
1605 digest
1606 }
1607 }
1608 d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1609 error
1610 transaction {
1611 digest
1612 }
1613 }
1614 }
1615 "#
1616 .into()
1617 )
1618 .await,
1619 "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1620 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1621 or verifyZkloginSignature) and the rest of the request (the query part) must be 800 \
1622 bytes or fewer."
1623 );
1624 }
1625
1626 #[tokio::test]
1627 async fn test_payload_execution_multiple_sigs_exceeded() {
1628 let cluster = prep_executor_cluster().await;
1629 let db_url = cluster.graphql_connection_config.db_url.clone();
1630 let err = execute_for_error(
1633 &db_url,
1634 Limits {
1635 max_tx_payload_size: 30,
1636 max_query_payload_size: 320,
1637 ..Default::default()
1638 },
1639 r#"
1640 mutation {
1641 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1642 effects {
1643 status
1644 }
1645 }
1646 }
1647 "#
1648 .into(),
1649 )
1650 .await;
1651
1652 assert!(err.starts_with("Query part too large"), "{err}");
1653
1654 assert_eq!(
1655 execute_for_error(
1656 &db_url,
1657 Limits {
1658 max_tx_payload_size: 30,
1659 max_query_payload_size: 500,
1660 ..Default::default()
1661 },
1662 r#"
1663 mutation {
1664 executeTransactionBlock(
1665 txBytes: "AAA",
1666 signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1667 ) {
1668 effects {
1669 status
1670 }
1671 }
1672 }
1673 "#
1674 .into(),
1675 )
1676 .await,
1677 "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1678 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1679 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1680 bytes or fewer.",
1681 )
1682 }
1683
1684 #[tokio::test]
1685 async fn test_payload_sig_var_execution_exceeded() {
1686 let cluster = prep_executor_cluster().await;
1687 let db_url = cluster.graphql_connection_config.db_url.clone();
1688 assert_eq!(
1691 execute_for_error(
1692 &db_url,
1693 Limits {
1694 max_tx_payload_size: 10,
1695 max_query_payload_size: 500,
1696 ..Default::default()
1697 },
1698 Request::new(
1699 r#"
1700 mutation ($tx: String!, $sig: String!) {
1701 executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1702 effects {
1703 status
1704 }
1705 }
1706 }
1707 "#
1708 )
1709 .variables(Variables::from_json(json!({
1710 "tx": "AAA",
1711 "sig": "BBB"
1712 })))
1713 )
1714 .await,
1715 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1716 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1717 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1718 bytes or fewer."
1719 );
1720 }
1721
1722 fn passed_tx_checks(err: &str) -> bool {
1725 !err.starts_with("Overall request too large")
1726 && !err.starts_with("Transaction payload too large")
1727 }
1728
1729 #[tokio::test]
1730 async fn test_payload_reusing_vars_execution() {
1731 let cluster = prep_executor_cluster().await;
1732 let db_url = cluster.graphql_connection_config.db_url.clone();
1733 assert!(!passed_tx_checks(
1739 &execute_for_error(
1740 &db_url,
1741 Limits {
1742 max_tx_payload_size: 1,
1743 max_query_payload_size: 1,
1744 ..Default::default()
1745 },
1746 r#"
1747 mutation {
1748 executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1749 effects {
1750 status
1751 }
1752 }
1753 }
1754 "#
1755 .into()
1756 )
1757 .await
1758 ));
1759
1760 let limits = Limits {
1761 max_tx_payload_size: 20,
1762 max_query_payload_size: 1000,
1763 ..Default::default()
1764 };
1765
1766 assert!(passed_tx_checks(
1769 &execute_for_error(
1770 &db_url,
1771 limits.clone(),
1772 Request::new(
1773 r#"
1774 mutation ($sig: String!) {
1775 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1776 effects {
1777 status
1778 }
1779 }
1780 }
1781 "#,
1782 )
1783 .variables(Variables::from_json(json!({
1784 "sig": "BBB"
1785 })))
1786 )
1787 .await
1788 ));
1789
1790 assert!(!passed_tx_checks(
1793 &execute_for_error(
1794 &db_url,
1795 limits.clone(),
1796 Request::new(
1797 r#"
1798 mutation ($sig: String!) {
1799 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1800 effects {
1801 status
1802 }
1803 }
1804 }
1805 "#,
1806 )
1807 .variables(Variables::from_json(json!({
1808 "sig": "BBB"
1809 })))
1810 )
1811 .await
1812 ));
1813
1814 assert!(passed_tx_checks(
1817 &execute_for_error(
1818 &db_url,
1819 limits,
1820 Request::new(
1821 r#"
1822 mutation ($sig: String!) {
1823 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1824 effects {
1825 status
1826 }
1827 }
1828 }
1829 "#,
1830 )
1831 .variables(Variables::from_json(json!({
1832 "sig": "BBB"
1833 })))
1834 )
1835 .await
1836 ));
1837 }
1838
1839 #[tokio::test]
1840 async fn test_payload_reusing_vars_dry_run() {
1841 let cluster = prep_executor_cluster().await;
1842 let db_url = cluster.graphql_connection_config.db_url.clone();
1843 let limits = Limits {
1846 max_tx_payload_size: 20,
1847 max_query_payload_size: 1000,
1848 ..Default::default()
1849 };
1850
1851 assert!(passed_tx_checks(
1853 &execute_for_error(
1854 &db_url,
1855 limits.clone(),
1856 Request::new(
1857 r#"
1858 query ($tx: String!) {
1859 dryRunTransactionBlock(txBytes: $tx) {
1860 error
1861 transaction {
1862 digest
1863 }
1864 }
1865 }
1866 "#,
1867 )
1868 .variables(Variables::from_json(json!({
1869 "tx": "AAABBBCCC"
1870 })))
1871 )
1872 .await
1873 ));
1874
1875 assert!(!passed_tx_checks(
1877 &execute_for_error(
1878 &db_url,
1879 limits.clone(),
1880 Request::new(
1881 r#"
1882 query ($tx: String!) {
1883 d0: dryRunTransactionBlock(txBytes: $tx) {
1884 error
1885 transaction {
1886 digest
1887 }
1888 }
1889
1890 d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1891 error
1892 transaction {
1893 digest
1894 }
1895 }
1896 }
1897 "#,
1898 )
1899 .variables(Variables::from_json(json!({
1900 "tx": "AAABBBCCC"
1901 })))
1902 )
1903 .await
1904 ));
1905
1906 assert!(passed_tx_checks(
1908 &execute_for_error(
1909 &db_url,
1910 limits,
1911 Request::new(
1912 r#"
1913 query ($tx: String!) {
1914 d0: dryRunTransactionBlock(txBytes: $tx) {
1915 error
1916 transaction {
1917 digest
1918 }
1919 }
1920
1921 d1: dryRunTransactionBlock(txBytes: $tx) {
1922 error
1923 transaction {
1924 digest
1925 }
1926 }
1927 }
1928 "#,
1929 )
1930 .variables(Variables::from_json(json!({
1931 "tx": "AAABBBCCC"
1932 })))
1933 )
1934 .await
1935 ));
1936 }
1937
1938 #[tokio::test]
1939 async fn test_payload_named_fragment_execution_exceeded() {
1940 let cluster = prep_executor_cluster().await;
1941 let db_url = cluster.graphql_connection_config.db_url.clone();
1942 assert_eq!(
1943 execute_for_error(
1944 &db_url,
1945 Limits {
1946 max_tx_payload_size: 10,
1947 max_query_payload_size: 500,
1948 ..Default::default()
1949 },
1950 r#"
1951 mutation {
1952 ...Tx
1953 }
1954
1955 fragment Tx on Mutation {
1956 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1957 effects {
1958 status
1959 }
1960 }
1961 }
1962 "#
1963 .into()
1964 )
1965 .await,
1966 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1967 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
1968 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
1969 bytes or fewer."
1970 );
1971 }
1972
1973 #[tokio::test]
1974 async fn test_payload_inline_fragment_execution_exceeded() {
1975 let cluster = prep_executor_cluster().await;
1976 let db_url = cluster.graphql_connection_config.db_url.clone();
1977 assert_eq!(
1978 execute_for_error(
1979 &db_url,
1980 Limits {
1981 max_tx_payload_size: 10,
1982 max_query_payload_size: 500,
1983 ..Default::default()
1984 },
1985 r#"
1986 mutation {
1987 ... on Mutation {
1988 executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1989 effects {
1990 status
1991 }
1992 }
1993 }
1994 }
1995 "#
1996 .into()
1997 )
1998 .await,
1999 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2000 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2001 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2002 bytes or fewer."
2003 );
2004 }
2005
2006 #[tokio::test]
2007 async fn test_payload_named_fragment_dry_run_exceeded() {
2008 let cluster = prep_executor_cluster().await;
2009 let db_url = cluster.graphql_connection_config.db_url.clone();
2010 assert_eq!(
2011 execute_for_error(
2012 &db_url,
2013 Limits {
2014 max_tx_payload_size: 10,
2015 max_query_payload_size: 500,
2016 ..Default::default()
2017 },
2018 r#"
2019 query {
2020 ...DryRun
2021 }
2022
2023 fragment DryRun on Query {
2024 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2025 error
2026 transaction {
2027 digest
2028 }
2029 }
2030 }
2031 "#
2032 .into(),
2033 )
2034 .await,
2035 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2036 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2037 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2038 bytes or fewer."
2039 );
2040 }
2041
2042 #[tokio::test]
2043 async fn test_payload_inline_fragment_dry_run_exceeded() {
2044 let cluster = prep_executor_cluster().await;
2045 let db_url = cluster.graphql_connection_config.db_url.clone();
2046 assert_eq!(
2047 execute_for_error(
2048 &db_url,
2049 Limits {
2050 max_tx_payload_size: 10,
2051 max_query_payload_size: 500,
2052 ..Default::default()
2053 },
2054 r#"
2055 query {
2056 ... on Query {
2057 dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2058 error
2059 transaction {
2060 digest
2061 }
2062 }
2063 }
2064 }
2065 "#
2066 .into(),
2067 )
2068 .await,
2069 "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2070 transaction payloads (all inputs to executeTransactionBlock, dryRunTransactionBlock, \
2071 or verifyZkloginSignature) and the rest of the request (the query part) must be 500 \
2072 bytes or fewer."
2073 );
2074 }
2075
2076 #[tokio::test]
2077 async fn test_multi_get_objects_query_limits_exceeded() {
2078 let cluster = prep_executor_cluster().await;
2079 let db_url = cluster.graphql_connection_config.db_url.clone();
2080 assert_eq!(
2081 execute_for_error(
2082 &db_url,
2083 Limits {
2084 max_output_nodes: 5,
2085 ..Default::default()
2086 }, r#"
2089 query {
2090 multiGetObjects(
2091 keys: [
2092 {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2093 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2094 ]
2095 ) {
2096 address
2097 status
2098 version
2099 }
2100 }
2101 "#
2102 .into(),
2103 )
2104 .await,
2105 "Estimated output nodes exceeds 5"
2106 );
2107 assert_eq!(
2108 execute_for_error(
2109 &db_url,
2110 Limits {
2111 max_output_nodes: 4,
2112 ..Default::default()
2113 }, r#"
2115 query {
2116 multiGetObjects(
2117 keys: [
2118 {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2119 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2120 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2121 {objectId: "0x33032e0706337632361f2607b79df8c9d1079e8069259b27b1fa5c0394e79893", version: 2},
2122 {objectId: "0x388295e3ecad53986ebf9a7a1e5854b7df94c3f1f0bba934c5396a2a9eb4550b", version: 2},
2123 ]
2124 ) {
2125 address
2126 }
2127 }
2128 "#
2129 .into(),
2130 )
2131 .await,
2132 "Estimated output nodes exceeds 4"
2133 );
2134 }
2135
2136 #[tokio::test]
2137 async fn test_multi_get_objects_query_limits_pass() {
2138 let cluster = prep_executor_cluster().await;
2139 let db_url = cluster.graphql_connection_config.db_url.clone();
2140 let service_config = ServiceConfig {
2141 limits: Limits {
2142 max_output_nodes: 5,
2143 ..Default::default()
2144 },
2145 ..Default::default()
2146 };
2147
2148 let schema = prep_schema(db_url, Some(service_config))
2149 .await
2150 .build_schema();
2151
2152 let resp = schema
2153 .execute(
2154 r#"
2156 query {
2157 multiGetObjects(
2158 keys: [
2159 {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2160 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2161 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2162 {objectId: "0x33032e0706337632361f2607b79df8c9d1079e8069259b27b1fa5c0394e79893", version: 2},
2163 {objectId: "0x388295e3ecad53986ebf9a7a1e5854b7df94c3f1f0bba934c5396a2a9eb4550b", version: 2},
2164 ]
2165 ) {
2166 address
2167 }
2168 }
2169 "#)
2170 .await;
2171 assert!(resp.is_ok());
2172 assert!(resp.errors.is_empty());
2173
2174 let resp = schema
2175 .execute(
2176 r#"
2178 query {
2179 multiGetObjects(
2180 keys: [
2181 {objectId: "0x01dcb4674affb04e68d8088895e951f4ea335ef1695e9e50c166618f6789d808", version: 2},
2182 {objectId: "0x23e340e97fb41249278c85b1f067dc88576f750670c6dc56572e90971f857c8c", version: 2},
2183 ]
2184 ) {
2185 address
2186 status
2187 }
2188 }
2189 "#)
2190 .await;
2191 assert!(resp.is_ok());
2192 assert!(resp.errors.is_empty());
2193 }
2194}