1use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::AdvanceEpochConfig;
23use simulacrum::Simulacrum;
24use sui_futures::service::Service;
25use sui_indexer_alt::BootstrapGenesis;
26use sui_indexer_alt::config::IndexerConfig;
27use sui_indexer_alt::setup_indexer;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
29use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
30use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
31use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
32use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
33use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
34use sui_indexer_alt_framework::IndexerArgs;
35use sui_indexer_alt_framework::ingestion::ClientArgs;
36use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
37use sui_indexer_alt_framework::pipeline::CommitterConfig;
38use sui_indexer_alt_framework::postgres::schema::watermarks;
39use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
40use sui_indexer_alt_graphql::args::SubscriptionArgs;
41use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
42use sui_indexer_alt_graphql::start_rpc as start_graphql;
43use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
44use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
45use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
46use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
47use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
48use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
49use sui_indexer_alt_reader::kv_loader::KvArgs;
50use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
51use sui_kv_rpc::KvRpcConfig;
52use sui_kv_rpc::KvRpcServer;
53use sui_kvstore::ALL_PIPELINE_NAMES;
54use sui_kvstore::ALPHA_PIPELINE_NAMES;
55use sui_kvstore::BigTableClient;
56use sui_kvstore::BigTableIndexer;
57use sui_kvstore::BigTableStore;
58use sui_kvstore::IndexerConfig as BtIndexerConfig;
59use sui_kvstore::IngestionConfig as BtIngestionConfig;
60use sui_kvstore::KeyValueStoreReader;
61use sui_kvstore::PipelineLayer;
62use sui_kvstore::testing::BigTableEmulator;
63use sui_kvstore::testing::INSTANCE_ID;
64use sui_kvstore::testing::create_tables;
65use sui_pg_db::Db;
66use sui_pg_db::DbArgs;
67use sui_pg_db::temp::TempDb;
68use sui_pg_db::temp::get_available_port;
69use sui_protocol_config::Chain;
70use sui_rpc::field::FieldMask;
71use sui_rpc::field::FieldMaskUtil;
72use sui_rpc::merge::Merge;
73use sui_rpc::proto::sui::rpc;
74use sui_types::base_types::ObjectRef;
75use sui_types::base_types::SuiAddress;
76use sui_types::crypto::AccountKeyPair;
77use sui_types::effects::TransactionEffects;
78use sui_types::error::ExecutionError;
79use sui_types::full_checkpoint_content::Checkpoint;
80use sui_types::messages_checkpoint::VerifiedCheckpoint;
81use sui_types::transaction::Transaction;
82use tempfile::TempDir;
83use tokio::time::error::Elapsed;
84use tokio::time::interval;
85use tokio::try_join;
86use url::Url;
87
88pub mod coin_registry;
89pub mod find;
90pub mod graphql;
91pub mod move_helpers;
92pub mod transaction;
93
94pub struct FullCluster {
97 executor: Simulacrum,
99
100 offchain: OffchainCluster,
103
104 #[allow(unused)]
106 temp_dir: TempDir,
107}
108
109pub struct OffchainCluster {
117 consistent_listen_address: SocketAddr,
119
120 jsonrpc_listen_address: SocketAddr,
122
123 graphql_listen_address: SocketAddr,
125
126 kv_rpc_listen_address: SocketAddr,
128
129 bigtable_client: BigTableClient,
131
132 db: Db,
134
135 pipelines: Vec<&'static str>,
137
138 #[allow(unused)]
141 services: Service,
142
143 #[allow(unused)]
145 bigtable_emulator: BigTableEmulator,
146
147 #[allow(unused)]
149 database: TempDb,
150
151 #[allow(unused)]
154 dir: TempDir,
155}
156
157pub struct OffchainClusterConfig {
158 pub indexer_args: IndexerArgs,
159 pub consistent_indexer_args: IndexerArgs,
160 pub fullnode_args: FullnodeArgs,
161 pub indexer_config: IndexerConfig,
162 pub consistent_config: ConsistentConfig,
163 pub jsonrpc_config: JsonRpcConfig,
164 pub jsonrpc_node_args: JsonRpcNodeArgs,
165 pub graphql_config: GraphQlConfig,
166 pub bootstrap_genesis: Option<BootstrapGenesis>,
167 pub kv_rpc_config: KvRpcConfig,
168}
169
170impl FullCluster {
171 pub async fn new() -> anyhow::Result<Self> {
174 Self::new_with_configs(
175 Simulacrum::new(),
176 OffchainClusterConfig::default(),
177 &prometheus::Registry::new(),
178 )
179 .await
180 }
181
182 pub async fn new_with_configs(
186 mut executor: Simulacrum,
187 offchain_cluster_config: OffchainClusterConfig,
188 registry: &prometheus::Registry,
189 ) -> anyhow::Result<Self> {
190 let (client_args, temp_dir) = local_ingestion_client_args();
191 executor.set_data_ingestion_path(temp_dir.path().to_owned());
192
193 let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
194 .await
195 .context("Failed to create off-chain cluster")?;
196
197 Ok(Self {
198 executor,
199 offchain,
200 temp_dir,
201 })
202 }
203
204 pub fn reference_gas_price(&self) -> u64 {
206 self.executor.reference_gas_price()
207 }
208
209 pub fn funded_account(
212 &mut self,
213 amount: u64,
214 ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
215 self.executor.funded_account(amount)
216 }
217
218 pub fn request_gas(
221 &mut self,
222 address: SuiAddress,
223 amount: u64,
224 ) -> anyhow::Result<TransactionEffects> {
225 self.executor.request_gas(address, amount)
226 }
227
228 pub fn execute_transaction(
230 &mut self,
231 tx: Transaction,
232 ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
233 self.executor.execute_transaction(tx)
234 }
235
236 pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
238 self.executor.advance_clock(duration)
239 }
240
241 pub fn advance_epoch(&mut self) {
245 self.executor.advance_epoch(AdvanceEpochConfig::default());
246 }
247
248 pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
252 let checkpoint = self.executor.create_checkpoint();
253 let timeout = Duration::from_secs(100);
254 let indexer = self
255 .offchain
256 .wait_for_indexer(checkpoint.sequence_number, timeout);
257 let consistent_store = self
258 .offchain
259 .wait_for_consistent_store(checkpoint.sequence_number, timeout);
260 let graphql = self
261 .offchain
262 .wait_for_graphql(checkpoint.sequence_number, timeout);
263 let bigtable = self
264 .offchain
265 .wait_for_bigtable(checkpoint.sequence_number, timeout);
266
267 try_join!(indexer, consistent_store, graphql, bigtable)
268 .expect("Timed out waiting for off-chain services");
269
270 checkpoint
271 }
272
273 pub fn db_url(&self) -> Url {
275 self.offchain.db_url()
276 }
277
278 pub fn consistent_store_url(&self) -> Url {
280 self.offchain.consistent_store_url()
281 }
282
283 pub fn jsonrpc_url(&self) -> Url {
285 self.offchain.jsonrpc_url()
286 }
287
288 pub fn graphql_url(&self) -> Url {
290 self.offchain.graphql_url()
291 }
292
293 pub fn kv_rpc_url(&self) -> Url {
295 self.offchain.kv_rpc_url()
296 }
297
298 pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
301 self.offchain.latest_checkpoint().await
302 }
303
304 pub async fn wait_for_indexer(
307 &self,
308 checkpoint: u64,
309 timeout: Duration,
310 ) -> Result<(), Elapsed> {
311 self.offchain.wait_for_indexer(checkpoint, timeout).await
312 }
313
314 pub async fn wait_for_pruner(
317 &self,
318 pipeline: &str,
319 checkpoint: u64,
320 timeout: Duration,
321 ) -> Result<(), Elapsed> {
322 self.offchain
323 .wait_for_pruner(pipeline, checkpoint, timeout)
324 .await
325 }
326
327 pub async fn wait_for_graphql(
330 &self,
331 checkpoint: u64,
332 timeout: Duration,
333 ) -> Result<(), Elapsed> {
334 self.offchain.wait_for_graphql(checkpoint, timeout).await
335 }
336}
337
338impl OffchainCluster {
339 pub async fn new(
347 client_args: ClientArgs,
348 OffchainClusterConfig {
349 indexer_args,
350 consistent_indexer_args,
351 fullnode_args,
352 indexer_config,
353 consistent_config,
354 jsonrpc_config,
355 jsonrpc_node_args,
356 graphql_config,
357 bootstrap_genesis,
358 kv_rpc_config,
359 }: OffchainClusterConfig,
360 registry: &prometheus::Registry,
361 ) -> anyhow::Result<Self> {
362 let consistent_port = get_available_port();
363 let consistent_listen_address =
364 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);
365
366 let jsonrpc_port = get_available_port();
367 let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);
368
369 let graphql_port = get_available_port();
370 let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);
371
372 let kv_rpc_port = get_available_port();
373 let kv_rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), kv_rpc_port);
374
375 let database = TempDb::new().context("Failed to create database")?;
376 let database_url = database.database().url();
377
378 let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
379 let rocksdb_path = dir.path().join("rocksdb");
380
381 let consistent_args = ConsistentArgs {
382 rpc_listen_address: consistent_listen_address,
383 tls: ConsistentTlsArgs::default(),
384 };
385
386 let jsonrpc_args = JsonRpcArgs {
387 rpc_listen_address: jsonrpc_listen_address,
388 ..Default::default()
389 };
390
391 let graphql_args = GraphQlArgs {
392 rpc_listen_address: graphql_listen_address,
393 no_ide: true,
394 };
395
396 let db = Db::for_read(database_url.clone(), DbArgs::default())
397 .await
398 .context("Failed to connect to database")?;
399
400 let indexer = setup_indexer(
401 database_url.clone(),
402 DbArgs::default(),
403 indexer_args,
404 client_args.clone(),
405 indexer_config,
406 bootstrap_genesis,
407 registry,
408 )
409 .await
410 .context("Failed to setup indexer")?;
411
412 let pipelines: Vec<_> = indexer.pipelines().collect();
413 let indexer = indexer.run().await.context("Failed to start indexer")?;
414
415 let consistent_store = start_consistent_store(
416 rocksdb_path,
417 consistent_indexer_args,
418 client_args.clone(),
419 consistent_args,
420 "0.0.0",
421 consistent_config,
422 registry,
423 )
424 .await
425 .context("Failed to start Consistent Store")?;
426
427 let consistent_reader_args = ConsistentReaderArgs {
428 consistent_store_url: Some(
429 Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
430 ),
431 ..Default::default()
432 };
433
434 let (bigtable_client, bigtable_emulator, archival_service) =
435 start_archival(client_args.clone(), kv_rpc_address, kv_rpc_config, registry).await?;
436
437 let kv_args = KvArgs {
438 ledger_grpc_url: Some(
439 format!("http://{kv_rpc_address}")
440 .parse()
441 .expect("Failed to parse kv-rpc URI"),
442 ),
443 ..Default::default()
444 };
445
446 let jsonrpc = start_jsonrpc(
447 Some(database_url.clone()),
448 DbArgs::default(),
449 kv_args.clone(),
450 consistent_reader_args.clone(),
451 jsonrpc_args,
452 jsonrpc_node_args,
453 SystemPackageTaskArgs::default(),
454 jsonrpc_config,
455 registry,
456 )
457 .await
458 .context("Failed to start JSON-RPC server")?;
459
460 let graphql = start_graphql(
461 Some(database_url.clone()),
462 fullnode_args,
463 DbArgs::default(),
464 kv_args,
465 consistent_reader_args,
466 graphql_args,
467 SystemPackageTaskArgs::default(),
468 SubscriptionArgs::default(),
469 "0.0.0",
470 graphql_config,
471 pipelines.iter().map(|p| p.to_string()).collect(),
472 registry,
473 )
474 .await
475 .context("Failed to start GraphQL server")?;
476
477 let services = indexer
478 .merge(consistent_store)
479 .merge(jsonrpc)
480 .merge(graphql)
481 .merge(archival_service);
482
483 Ok(Self {
484 consistent_listen_address,
485 jsonrpc_listen_address,
486 graphql_listen_address,
487 kv_rpc_listen_address: kv_rpc_address,
488 bigtable_client,
489 db,
490 pipelines,
491 services,
492 bigtable_emulator,
493 database,
494 dir,
495 })
496 }
497
498 pub fn db_url(&self) -> Url {
500 self.database.database().url().clone()
501 }
502
503 pub fn consistent_store_url(&self) -> Url {
505 Url::parse(&format!("http://{}/", self.consistent_listen_address))
506 .expect("Failed to parse RPC URL")
507 }
508
509 pub fn jsonrpc_url(&self) -> Url {
511 Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
512 .expect("Failed to parse RPC URL")
513 }
514
515 pub fn graphql_url(&self) -> Url {
517 Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
518 .expect("Failed to parse RPC URL")
519 }
520
521 pub fn kv_rpc_url(&self) -> Url {
523 Url::parse(&format!("http://{}/", self.kv_rpc_listen_address))
524 .expect("Failed to parse RPC URL")
525 }
526
527 pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
530 use watermarks::dsl as w;
531
532 let mut conn = self
533 .db
534 .connect()
535 .await
536 .context("Failed to connect to database")?;
537
538 let latest: HashMap<String, i64> = w::watermarks
539 .select((w::pipeline, w::checkpoint_hi_inclusive))
540 .filter(w::pipeline.eq_any(&self.pipelines))
541 .filter(w::reader_lo.le(w::checkpoint_hi_inclusive))
542 .load(&mut conn)
543 .await?
544 .into_iter()
545 .collect();
546
547 if latest.len() != self.pipelines.len() {
548 return Ok(None);
549 }
550
551 Ok(latest.into_values().min().map(|l| l as u64))
552 }
553
554 pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
557 use watermarks::dsl as w;
558
559 let mut conn = self
560 .db
561 .connect()
562 .await
563 .context("Failed to connect to database")?;
564
565 let latest: Option<i64> = w::watermarks
566 .select(w::reader_lo)
567 .filter(w::pipeline.eq(pipeline))
568 .first(&mut conn)
569 .await
570 .optional()?;
571
572 Ok(latest.map(|l| l as u64))
573 }
574
575 pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
577 ConsistentServiceClient::connect(self.consistent_store_url().to_string())
578 .await
579 .context("Failed to connect to Consistent Store")?
580 .available_range(AvailableRangeRequest {})
581 .await
582 .context("Failed to fetch available range from Consistent Store")?
583 .into_inner()
584 .max_checkpoint
585 .context("Consistent Store has not started yet")
586 }
587
588 pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
590 let query = json!({
591 "query": "query { checkpoint { sequenceNumber } }"
592 });
593
594 let client = Client::new();
595 let request = client.post(self.graphql_url()).json(&query);
596 let response = request
597 .send()
598 .await
599 .context("Request to GraphQL server failed")?;
600
601 let body: Value = response
602 .json()
603 .await
604 .context("Failed to parse GraphQL response")?;
605
606 let sequence_number = body
607 .pointer("/data/checkpoint/sequenceNumber")
608 .context("Failed to find checkpoint sequence number in response")?;
609
610 let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
611 .context("Failed to parse sequence number as i64")?;
612
613 ensure!(sequence_number != i64::MAX, "Indexer has not started yet");
614
615 Ok(sequence_number as u64)
616 }
617
618 pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
620 let query = json!({
621 "query": "query { epoch { epochId } }"
622 });
623
624 let client = Client::new();
625 let request = client.post(self.graphql_url()).json(&query);
626 let response = request
627 .send()
628 .await
629 .context("Request to GraphQL server failed")?;
630
631 let body: Value = response
632 .json()
633 .await
634 .context("Failed to parse GraphQL response")?;
635
636 let epoch_id = body
637 .pointer("/data/epoch/epochId")
638 .context("Failed to find epochId in response")?;
639
640 let epoch_id: i64 =
641 serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;
642
643 ensure!(epoch_id != i64::MAX, "Indexer has not started yet");
644
645 Ok(epoch_id as u64)
646 }
647
648 pub async fn wait_for_indexer(
651 &self,
652 checkpoint: u64,
653 timeout: Duration,
654 ) -> Result<(), Elapsed> {
655 tokio::time::timeout(timeout, async move {
656 let mut interval = interval(Duration::from_millis(200));
657 loop {
658 interval.tick().await;
659 if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
660 break;
661 }
662 }
663 })
664 .await
665 }
666
667 pub async fn wait_for_pruner(
670 &self,
671 pipeline: &str,
672 checkpoint: u64,
673 timeout: Duration,
674 ) -> Result<(), Elapsed> {
675 tokio::time::timeout(timeout, async move {
676 let mut interval = interval(Duration::from_millis(200));
677 loop {
678 interval.tick().await;
679 if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
680 break;
681 }
682 }
683 }).await
684 }
685
686 pub async fn wait_for_consistent_store(
689 &self,
690 checkpoint: u64,
691 timeout: Duration,
692 ) -> Result<(), Elapsed> {
693 tokio::time::timeout(timeout, async move {
694 let mut interval = interval(Duration::from_millis(200));
695 loop {
696 interval.tick().await;
697 if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
698 break;
699 }
700 }
701 })
702 .await
703 }
704
705 pub async fn wait_for_graphql(
708 &self,
709 checkpoint: u64,
710 timeout: Duration,
711 ) -> Result<(), Elapsed> {
712 tokio::time::timeout(timeout, async move {
713 let mut interval = interval(Duration::from_millis(200));
714 loop {
715 interval.tick().await;
716 if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
717 break;
718 }
719 }
720 })
721 .await
722 }
723
724 pub async fn wait_for_bigtable(
727 &self,
728 checkpoint: u64,
729 timeout: Duration,
730 ) -> Result<(), Elapsed> {
731 let mut client = self.bigtable_client.clone();
732 tokio::time::timeout(timeout, async move {
733 let mut interval = interval(Duration::from_millis(200));
734 loop {
735 interval.tick().await;
736 if client
737 .get_watermark_for_pipelines(&ALL_PIPELINE_NAMES)
738 .await
739 .is_ok_and(|wm| {
740 wm.is_some_and(|wm| {
741 wm.checkpoint_hi_inclusive
742 .is_some_and(|cp| cp >= checkpoint)
743 })
744 })
745 {
746 break;
747 }
748 }
749 })
750 .await
751 }
752}
753
754impl Default for OffchainClusterConfig {
755 fn default() -> Self {
756 Self {
757 indexer_args: Default::default(),
758 consistent_indexer_args: Default::default(),
759 fullnode_args: FullnodeArgs::default(),
760 indexer_config: IndexerConfig::for_test(),
761 consistent_config: ConsistentConfig::for_test(),
762 jsonrpc_config: Default::default(),
763 jsonrpc_node_args: Default::default(),
764 graphql_config: Default::default(),
765 bootstrap_genesis: None,
766 kv_rpc_config: KvRpcConfig::default(),
767 }
768 }
769}
770
771pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
773 let temp_dir = tempfile::tempdir()
774 .context("Failed to create data ingestion path")
775 .unwrap();
776 let client_args = ClientArgs {
777 ingestion: IngestionClientArgs {
778 local_ingestion_path: Some(temp_dir.path().to_owned()),
779 ..Default::default()
780 },
781 ..Default::default()
782 };
783 (client_args, temp_dir)
784}
785
786pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
788 let sequence_number = checkpoint.summary.sequence_number;
789
790 let mask = FieldMask::from_paths([
791 rpc::v2::Checkpoint::path_builder().sequence_number(),
792 rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
793 rpc::v2::Checkpoint::path_builder().signature().finish(),
794 rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
795 rpc::v2::Checkpoint::path_builder()
796 .transactions()
797 .transaction()
798 .bcs()
799 .value(),
800 rpc::v2::Checkpoint::path_builder()
801 .transactions()
802 .effects()
803 .bcs()
804 .value(),
805 rpc::v2::Checkpoint::path_builder()
806 .transactions()
807 .effects()
808 .unchanged_loaded_runtime_objects()
809 .finish(),
810 rpc::v2::Checkpoint::path_builder()
811 .transactions()
812 .events()
813 .bcs()
814 .value(),
815 rpc::v2::Checkpoint::path_builder()
816 .objects()
817 .objects()
818 .bcs()
819 .value(),
820 ]);
821
822 let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
823 let proto_bytes = proto_checkpoint.encode_to_vec();
824 let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
825
826 let file_name = format!("{}.binpb.zst", sequence_number);
827 let file_path = path.join(file_name);
828 fs::write(file_path, compressed)?;
829 Ok(())
830}
831
832async fn start_archival(
834 client_args: ClientArgs,
835 kv_rpc_address: SocketAddr,
836 kv_rpc_config: KvRpcConfig,
837 registry: &prometheus::Registry,
838) -> anyhow::Result<(BigTableClient, BigTableEmulator, Service)> {
839 let emulator = tokio::task::spawn_blocking(BigTableEmulator::start)
840 .await
841 .context("spawn_blocking panicked")?
842 .context("Failed to start BigTable emulator")?;
843
844 create_tables(emulator.host(), INSTANCE_ID)
845 .await
846 .context("Failed to create BigTable tables")?;
847
848 let bigtable_client =
849 BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
850 .await
851 .context("Failed to create BigTable client")?;
852
853 let store = BigTableStore::new(
854 BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
855 .await
856 .context("Failed to create BigTable client for indexer")?,
857 );
858 let bt_indexer = BigTableIndexer::new(
859 store,
860 IndexerArgs::default(),
861 client_args,
862 BtIngestionConfig::default(),
863 CommitterConfig::default(),
864 BtIndexerConfig::default(),
865 PipelineLayer::default(),
866 Chain::Unknown,
867 &ALPHA_PIPELINE_NAMES,
868 registry,
869 )
870 .await
871 .context("Failed to create BigTable indexer")?;
872
873 let bt_indexer_service = bt_indexer
876 .run()
877 .await
878 .context("Failed to start BigTable indexer")?;
879
880 let kv_rpc_server = KvRpcServer::new_local_with_config(
881 emulator.host().to_string(),
882 INSTANCE_ID.to_string(),
883 None,
884 kv_rpc_config.ledger_history(),
885 kv_rpc_config.request_bigtable_concurrency(),
886 kv_rpc_config.stages(),
887 )
888 .await
889 .context("Failed to create KvRpcServer")?;
890 let kv_rpc_service = kv_rpc_server
891 .start_service(
892 kv_rpc_address,
893 sui_kv_rpc::ServerConfig {
894 enable_experimental_query_apis: true,
895 ..Default::default()
896 },
897 )
898 .await
899 .context("Failed to start kv-rpc server")?;
900
901 let service = bt_indexer_service.merge(kv_rpc_service);
902 Ok((bigtable_client, emulator, service))
903}