1use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::Simulacrum;
23use sui_futures::service::Service;
24use sui_indexer_alt::BootstrapGenesis;
25use sui_indexer_alt::config::IndexerConfig;
26use sui_indexer_alt::setup_indexer;
27use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
29use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
30use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
31use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
32use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
33use sui_indexer_alt_framework::IndexerArgs;
34use sui_indexer_alt_framework::ingestion::ClientArgs;
35use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
36use sui_indexer_alt_framework::pipeline::CommitterConfig;
37use sui_indexer_alt_framework::postgres::schema::watermarks;
38use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
39use sui_indexer_alt_graphql::args::SubscriptionArgs;
40use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
41use sui_indexer_alt_graphql::start_rpc as start_graphql;
42use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
43use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
44use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
45use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
46use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
47use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
48use sui_indexer_alt_reader::kv_loader::KvArgs;
49use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
50use sui_kv_rpc::KvRpcServer;
51use sui_kvstore::ALL_PIPELINE_NAMES;
52use sui_kvstore::ALPHA_PIPELINE_NAMES;
53use sui_kvstore::BigTableClient;
54use sui_kvstore::BigTableIndexer;
55use sui_kvstore::BigTableStore;
56use sui_kvstore::IndexerConfig as BtIndexerConfig;
57use sui_kvstore::IngestionConfig as BtIngestionConfig;
58use sui_kvstore::KeyValueStoreReader;
59use sui_kvstore::PipelineLayer;
60use sui_kvstore::testing::BigTableEmulator;
61use sui_kvstore::testing::INSTANCE_ID;
62use sui_kvstore::testing::create_tables;
63use sui_pg_db::Db;
64use sui_pg_db::DbArgs;
65use sui_pg_db::temp::TempDb;
66use sui_pg_db::temp::get_available_port;
67use sui_protocol_config::Chain;
68use sui_rpc::field::FieldMask;
69use sui_rpc::field::FieldMaskUtil;
70use sui_rpc::merge::Merge;
71use sui_rpc::proto::sui::rpc;
72use sui_types::base_types::ObjectRef;
73use sui_types::base_types::SuiAddress;
74use sui_types::crypto::AccountKeyPair;
75use sui_types::effects::TransactionEffects;
76use sui_types::error::ExecutionError;
77use sui_types::full_checkpoint_content::Checkpoint;
78use sui_types::messages_checkpoint::VerifiedCheckpoint;
79use sui_types::transaction::Transaction;
80use tempfile::TempDir;
81use tokio::time::error::Elapsed;
82use tokio::time::interval;
83use tokio::try_join;
84use url::Url;
85
86pub mod coin_registry;
87pub mod find;
88pub mod move_helpers;
89
/// A simulated Sui network (`Simulacrum`) paired with the full off-chain stack
/// ([`OffchainCluster`]): the cluster executes transactions on-chain and the
/// off-chain services ingest the resulting checkpoints, for end-to-end tests.
pub struct FullCluster {
    /// Simulated network that executes transactions and produces checkpoints.
    executor: Simulacrum,

    /// Off-chain services ingesting the checkpoints `executor` writes out.
    offchain: OffchainCluster,

    /// Held only for its `Drop`: keeps the checkpoint ingestion directory
    /// alive for as long as the cluster exists.
    #[allow(unused)]
    temp_dir: TempDir,
}
104
/// The full suite of off-chain services — Postgres indexer, Consistent Store,
/// JSON-RPC, GraphQL, and a BigTable-backed KV RPC — wired together over
/// temporary storage, for use in tests.
pub struct OffchainCluster {
    /// Address the Consistent Store gRPC service is listening on.
    consistent_listen_address: SocketAddr,

    /// Address the JSON-RPC server is listening on.
    jsonrpc_listen_address: SocketAddr,

    /// Address the GraphQL server is listening on.
    graphql_listen_address: SocketAddr,

    /// Address the BigTable-backed KV RPC server is listening on.
    kv_rpc_listen_address: SocketAddr,

    /// Client connected to the BigTable emulator; used to poll watermarks.
    bigtable_client: BigTableClient,

    /// Read connection pool to the indexer's temporary Postgres database.
    db: Db,

    /// Names of the pipelines the Postgres indexer was configured with.
    pipelines: Vec<&'static str>,

    /// Handle to all spawned services, merged into one; held so the services
    /// stay up for the cluster's lifetime.
    #[allow(unused)]
    services: Service,

    /// Keeps the BigTable emulator process alive.
    #[allow(unused)]
    bigtable_emulator: BigTableEmulator,

    /// Keeps the temporary Postgres database alive.
    #[allow(unused)]
    database: TempDb,

    /// Keeps the temporary directory holding the Consistent Store's RocksDB
    /// alive.
    #[allow(unused)]
    dir: TempDir,
}
152
/// Configuration for every service in an [`OffchainCluster`]. Use
/// `OffchainClusterConfig::default()` for a test-friendly setup.
pub struct OffchainClusterConfig {
    /// Arguments for the main (Postgres) indexer.
    pub indexer_args: IndexerArgs,
    /// Arguments for the Consistent Store's own indexer.
    pub consistent_indexer_args: IndexerArgs,
    /// Fullnode connection arguments, passed to the GraphQL server.
    pub fullnode_args: FullnodeArgs,
    /// Pipeline configuration for the Postgres indexer.
    pub indexer_config: IndexerConfig,
    /// Service configuration for the Consistent Store.
    pub consistent_config: ConsistentConfig,
    /// Service configuration for the JSON-RPC server.
    pub jsonrpc_config: JsonRpcConfig,
    /// Node arguments passed to the JSON-RPC server.
    pub jsonrpc_node_args: JsonRpcNodeArgs,
    /// Service configuration for the GraphQL server.
    pub graphql_config: GraphQlConfig,
    /// Optional genesis data used to bootstrap the indexer's database.
    pub bootstrap_genesis: Option<BootstrapGenesis>,
}
164
impl FullCluster {
    /// Creates a cluster with a fresh `Simulacrum` executor, default off-chain
    /// configuration, and a throwaway Prometheus registry.
    pub async fn new() -> anyhow::Result<Self> {
        Self::new_with_configs(
            Simulacrum::new(),
            OffchainClusterConfig::default(),
            &prometheus::Registry::new(),
        )
        .await
    }

    /// Creates a cluster from an existing executor and explicit off-chain
    /// configuration. The executor is redirected to write checkpoints into a
    /// fresh temporary directory, which the off-chain services ingest from.
    pub async fn new_with_configs(
        mut executor: Simulacrum,
        offchain_cluster_config: OffchainClusterConfig,
        registry: &prometheus::Registry,
    ) -> anyhow::Result<Self> {
        let (client_args, temp_dir) = local_ingestion_client_args();
        executor.set_data_ingestion_path(temp_dir.path().to_owned());

        let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
            .await
            .context("Failed to create off-chain cluster")?;

        Ok(Self {
            executor,
            offchain,
            temp_dir,
        })
    }

    /// The executor's current reference gas price.
    pub fn reference_gas_price(&self) -> u64 {
        self.executor.reference_gas_price()
    }

    /// Creates a fresh account funded with `amount` MIST, returning its
    /// address, keypair, and the funding gas object's reference.
    pub fn funded_account(
        &mut self,
        amount: u64,
    ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
        self.executor.funded_account(amount)
    }

    /// Sends `amount` MIST of gas to `address` on the executor.
    pub fn request_gas(
        &mut self,
        address: SuiAddress,
        amount: u64,
    ) -> anyhow::Result<TransactionEffects> {
        self.executor.request_gas(address, amount)
    }

    /// Executes `tx` on the executor, returning its effects and the execution
    /// error, if any. (Note: effects are not visible to the off-chain services
    /// until a checkpoint is created.)
    pub fn execute_transaction(
        &mut self,
        tx: Transaction,
    ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
        self.executor.execute_transaction(tx)
    }

    /// Advances the executor's clock by `duration`.
    pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
        self.executor.advance_clock(duration)
    }

    /// Cuts a checkpoint on the executor and blocks until the indexer,
    /// Consistent Store, GraphQL server, and BigTable indexer have all caught
    /// up to it.
    ///
    /// # Panics
    /// Panics if any service fails to reach the checkpoint within 100 seconds.
    pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
        let checkpoint = self.executor.create_checkpoint();
        let timeout = Duration::from_secs(100);
        // Construct all four wait futures first, then await them together so
        // the services are polled concurrently rather than one after another.
        let indexer = self
            .offchain
            .wait_for_indexer(checkpoint.sequence_number, timeout);
        let consistent_store = self
            .offchain
            .wait_for_consistent_store(checkpoint.sequence_number, timeout);
        let graphql = self
            .offchain
            .wait_for_graphql(checkpoint.sequence_number, timeout);
        let bigtable = self
            .offchain
            .wait_for_bigtable(checkpoint.sequence_number, timeout);

        try_join!(indexer, consistent_store, graphql, bigtable)
            .expect("Timed out waiting for off-chain services");

        checkpoint
    }

    /// URL of the temporary Postgres database backing the indexer.
    pub fn db_url(&self) -> Url {
        self.offchain.db_url()
    }

    /// URL of the Consistent Store's gRPC endpoint.
    pub fn consistent_store_url(&self) -> Url {
        self.offchain.consistent_store_url()
    }

    /// URL of the JSON-RPC endpoint.
    pub fn jsonrpc_url(&self) -> Url {
        self.offchain.jsonrpc_url()
    }

    /// URL of the GraphQL endpoint.
    pub fn graphql_url(&self) -> Url {
        self.offchain.graphql_url()
    }

    /// URL of the BigTable-backed KV RPC endpoint.
    pub fn kv_rpc_url(&self) -> Url {
        self.offchain.kv_rpc_url()
    }

    /// Latest checkpoint that every indexer pipeline has reached, or `None`
    /// if some pipeline has no readable watermark yet.
    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
        self.offchain.latest_checkpoint().await
    }

    /// Waits (polling) until the indexer has committed `checkpoint`, or
    /// `timeout` elapses.
    pub async fn wait_for_indexer(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        self.offchain.wait_for_indexer(checkpoint, timeout).await
    }

    /// Waits (polling) until `pipeline`'s pruner has advanced past
    /// `checkpoint`, or `timeout` elapses.
    pub async fn wait_for_pruner(
        &self,
        pipeline: &str,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        self.offchain
            .wait_for_pruner(pipeline, checkpoint, timeout)
            .await
    }

    /// Waits (polling) until the GraphQL server reports `checkpoint`, or
    /// `timeout` elapses.
    pub async fn wait_for_graphql(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        self.offchain.wait_for_graphql(checkpoint, timeout).await
    }
}
325
impl OffchainCluster {
    /// Starts every off-chain service against temporary storage: a Postgres
    /// indexer, a Consistent Store (RocksDB), a JSON-RPC server, a GraphQL
    /// server, and a BigTable emulator with its indexer and KV RPC server.
    /// All services read checkpoints via `client_args` and register metrics
    /// with `registry`.
    pub async fn new(
        client_args: ClientArgs,
        OffchainClusterConfig {
            indexer_args,
            consistent_indexer_args,
            fullnode_args,
            indexer_config,
            consistent_config,
            jsonrpc_config,
            jsonrpc_node_args,
            graphql_config,
            bootstrap_genesis,
        }: OffchainClusterConfig,
        registry: &prometheus::Registry,
    ) -> anyhow::Result<Self> {
        // Pick a free localhost port for each RPC service up front, so the
        // reader services started later can be pointed at them.
        let consistent_port = get_available_port();
        let consistent_listen_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);

        let jsonrpc_port = get_available_port();
        let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);

        let graphql_port = get_available_port();
        let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);

        let kv_rpc_port = get_available_port();
        let kv_rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), kv_rpc_port);

        let database = TempDb::new().context("Failed to create database")?;
        let database_url = database.database().url();

        // Scratch directory for the Consistent Store's RocksDB instance.
        let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
        let rocksdb_path = dir.path().join("rocksdb");

        let consistent_args = ConsistentArgs {
            rpc_listen_address: consistent_listen_address,
            tls: ConsistentTlsArgs::default(),
        };

        let jsonrpc_args = JsonRpcArgs {
            rpc_listen_address: jsonrpc_listen_address,
            ..Default::default()
        };

        let graphql_args = GraphQlArgs {
            rpc_listen_address: graphql_listen_address,
            no_ide: true,
        };

        // Separate read handle to the database, used by watermark queries.
        let db = Db::for_read(database_url.clone(), DbArgs::default())
            .await
            .context("Failed to connect to database")?;

        let indexer = setup_indexer(
            database_url.clone(),
            DbArgs::default(),
            indexer_args,
            client_args.clone(),
            indexer_config,
            bootstrap_genesis,
            registry,
        )
        .await
        .context("Failed to setup indexer")?;

        // Record pipeline names before `run()` consumes the indexer; they are
        // needed later to decide when every pipeline has caught up.
        let pipelines: Vec<_> = indexer.pipelines().collect();
        let indexer = indexer.run().await.context("Failed to start indexer")?;

        let consistent_store = start_consistent_store(
            rocksdb_path,
            consistent_indexer_args,
            client_args.clone(),
            consistent_args,
            "0.0.0",
            consistent_config,
            registry,
        )
        .await
        .context("Failed to start Consistent Store")?;

        // Point the reader services at the Consistent Store started above.
        let consistent_reader_args = ConsistentReaderArgs {
            consistent_store_url: Some(
                Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
            ),
            ..Default::default()
        };

        let (bigtable_client, bigtable_emulator, archival_service) =
            start_archival(client_args.clone(), kv_rpc_address, registry).await?;

        // Point the KV loader at the KV RPC server started by `start_archival`.
        let kv_args = KvArgs {
            ledger_grpc_url: Some(
                format!("http://{kv_rpc_address}")
                    .parse()
                    .expect("Failed to parse kv-rpc URI"),
            ),
            ..Default::default()
        };

        let jsonrpc = start_jsonrpc(
            Some(database_url.clone()),
            DbArgs::default(),
            kv_args.clone(),
            consistent_reader_args.clone(),
            jsonrpc_args,
            jsonrpc_node_args,
            SystemPackageTaskArgs::default(),
            jsonrpc_config,
            registry,
        )
        .await
        .context("Failed to start JSON-RPC server")?;

        let graphql = start_graphql(
            Some(database_url.clone()),
            fullnode_args,
            DbArgs::default(),
            kv_args,
            consistent_reader_args,
            graphql_args,
            SystemPackageTaskArgs::default(),
            SubscriptionArgs::default(),
            "0.0.0",
            graphql_config,
            pipelines.iter().map(|p| p.to_string()).collect(),
            registry,
        )
        .await
        .context("Failed to start GraphQL server")?;

        // Merge all service handles into one, so they live and die together.
        let services = indexer
            .merge(consistent_store)
            .merge(jsonrpc)
            .merge(graphql)
            .merge(archival_service);

        Ok(Self {
            consistent_listen_address,
            jsonrpc_listen_address,
            graphql_listen_address,
            kv_rpc_listen_address: kv_rpc_address,
            bigtable_client,
            db,
            pipelines,
            services,
            bigtable_emulator,
            database,
            dir,
        })
    }

    /// URL of the temporary Postgres database backing the indexer.
    pub fn db_url(&self) -> Url {
        self.database.database().url().clone()
    }

    /// URL of the Consistent Store's gRPC endpoint.
    pub fn consistent_store_url(&self) -> Url {
        Url::parse(&format!("http://{}/", self.consistent_listen_address))
            .expect("Failed to parse RPC URL")
    }

    /// URL of the JSON-RPC endpoint.
    pub fn jsonrpc_url(&self) -> Url {
        Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
            .expect("Failed to parse RPC URL")
    }

    /// URL of the GraphQL endpoint.
    pub fn graphql_url(&self) -> Url {
        Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
            .expect("Failed to parse RPC URL")
    }

    /// URL of the BigTable-backed KV RPC endpoint.
    pub fn kv_rpc_url(&self) -> Url {
        Url::parse(&format!("http://{}/", self.kv_rpc_listen_address))
            .expect("Failed to parse RPC URL")
    }

    /// Latest checkpoint that *every* configured pipeline has committed: the
    /// minimum `checkpoint_hi_inclusive` across pipelines. Returns `None` if
    /// any pipeline has no readable watermark yet.
    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
        use watermarks::dsl as w;

        let mut conn = self
            .db
            .connect()
            .await
            .context("Failed to connect to database")?;

        let latest: HashMap<String, i64> = w::watermarks
            .select((w::pipeline, w::checkpoint_hi_inclusive))
            .filter(w::pipeline.eq_any(&self.pipelines))
            // Only count watermarks whose committed range is still readable
            // (the reader low watermark has not passed the high watermark).
            .filter(w::reader_lo.le(w::checkpoint_hi_inclusive))
            .load(&mut conn)
            .await?
            .into_iter()
            .collect();

        // If some pipeline is missing a row, the overall progress is unknown.
        if latest.len() != self.pipelines.len() {
            return Ok(None);
        }

        Ok(latest.into_values().min().map(|l| l as u64))
    }

    /// The reader low watermark (`reader_lo`) for `pipeline` — how far its
    /// pruner has advanced — or `None` if the pipeline has no watermark row.
    pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
        use watermarks::dsl as w;

        let mut conn = self
            .db
            .connect()
            .await
            .context("Failed to connect to database")?;

        let latest: Option<i64> = w::watermarks
            .select(w::reader_lo)
            .filter(w::pipeline.eq(pipeline))
            .first(&mut conn)
            .await
            .optional()?;

        Ok(latest.map(|l| l as u64))
    }

    /// Highest checkpoint available from the Consistent Store, queried over
    /// gRPC. Errors if the store is unreachable or has not indexed anything.
    pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
        ConsistentServiceClient::connect(self.consistent_store_url().to_string())
            .await
            .context("Failed to connect to Consistent Store")?
            .available_range(AvailableRangeRequest {})
            .await
            .context("Failed to fetch available range from Consistent Store")?
            .into_inner()
            .max_checkpoint
            .context("Consistent Store has not started yet")
    }

    /// Latest checkpoint reported by the GraphQL server, via a `checkpoint`
    /// query. Errors if the server is unreachable, the response is malformed,
    /// or it reports the `i64::MAX` sentinel (indexer not started).
    pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
        let query = json!({
            "query": "query { checkpoint { sequenceNumber } }"
        });

        let client = Client::new();
        let request = client.post(self.graphql_url()).json(&query);
        let response = request
            .send()
            .await
            .context("Request to GraphQL server failed")?;

        let body: Value = response
            .json()
            .await
            .context("Failed to parse GraphQL response")?;

        let sequence_number = body
            .pointer("/data/checkpoint/sequenceNumber")
            .context("Failed to find checkpoint sequence number in response")?;

        let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
            .context("Failed to parse sequence number as i64")?;

        // `i64::MAX` is the sentinel reported before any checkpoint is indexed.
        ensure!(sequence_number != i64::MAX, "Indexer has not started yet");

        Ok(sequence_number as u64)
    }

    /// Latest epoch reported by the GraphQL server, via an `epoch` query.
    /// Errors under the same conditions as [`Self::latest_graphql_checkpoint`].
    pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
        let query = json!({
            "query": "query { epoch { epochId } }"
        });

        let client = Client::new();
        let request = client.post(self.graphql_url()).json(&query);
        let response = request
            .send()
            .await
            .context("Request to GraphQL server failed")?;

        let body: Value = response
            .json()
            .await
            .context("Failed to parse GraphQL response")?;

        let epoch_id = body
            .pointer("/data/epoch/epochId")
            .context("Failed to find epochId in response")?;

        let epoch_id: i64 =
            serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;

        // `i64::MAX` is the sentinel reported before any epoch is indexed.
        ensure!(epoch_id != i64::MAX, "Indexer has not started yet");

        Ok(epoch_id as u64)
    }

    /// Polls every 200ms until the indexer has committed `checkpoint` in all
    /// pipelines, or `timeout` elapses.
    pub async fn wait_for_indexer(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        tokio::time::timeout(timeout, async move {
            let mut interval = interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
                    break;
                }
            }
        })
        .await
    }

    /// Polls every 200ms until `pipeline`'s pruner has advanced to at least
    /// `checkpoint`, or `timeout` elapses.
    pub async fn wait_for_pruner(
        &self,
        pipeline: &str,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        tokio::time::timeout(timeout, async move {
            let mut interval = interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
                    break;
                }
            }
        }).await
    }

    /// Polls every 200ms until the Consistent Store has indexed `checkpoint`,
    /// or `timeout` elapses.
    pub async fn wait_for_consistent_store(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        tokio::time::timeout(timeout, async move {
            let mut interval = interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
                    break;
                }
            }
        })
        .await
    }

    /// Polls every 200ms until the GraphQL server reports `checkpoint`, or
    /// `timeout` elapses.
    pub async fn wait_for_graphql(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        tokio::time::timeout(timeout, async move {
            let mut interval = interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
                    break;
                }
            }
        })
        .await
    }

    /// Polls every 200ms until the BigTable indexer's combined watermark over
    /// `ALL_PIPELINE_NAMES` reaches `checkpoint`, or `timeout` elapses.
    /// NOTE(review): the BigTable indexer is started with
    /// `ALPHA_PIPELINE_NAMES` (see `start_archival`) but this waits on
    /// `ALL_PIPELINE_NAMES` — confirm the mismatch is intentional.
    pub async fn wait_for_bigtable(
        &self,
        checkpoint: u64,
        timeout: Duration,
    ) -> Result<(), Elapsed> {
        let mut client = self.bigtable_client.clone();
        tokio::time::timeout(timeout, async move {
            let mut interval = interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                if client
                    .get_watermark_for_pipelines(&ALL_PIPELINE_NAMES)
                    .await
                    .is_ok_and(|wm| {
                        wm.is_some_and(|wm| {
                            wm.checkpoint_hi_inclusive
                                .is_some_and(|cp| cp >= checkpoint)
                        })
                    })
                {
                    break;
                }
            }
        })
        .await
    }
}
740
741impl Default for OffchainClusterConfig {
742 fn default() -> Self {
743 Self {
744 indexer_args: Default::default(),
745 consistent_indexer_args: Default::default(),
746 fullnode_args: FullnodeArgs::default(),
747 indexer_config: IndexerConfig::for_test(),
748 consistent_config: ConsistentConfig::for_test(),
749 jsonrpc_config: Default::default(),
750 jsonrpc_node_args: Default::default(),
751 graphql_config: Default::default(),
752 bootstrap_genesis: None,
753 }
754 }
755}
756
757pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
759 let temp_dir = tempfile::tempdir()
760 .context("Failed to create data ingestion path")
761 .unwrap();
762 let client_args = ClientArgs {
763 ingestion: IngestionClientArgs {
764 local_ingestion_path: Some(temp_dir.path().to_owned()),
765 ..Default::default()
766 },
767 ..Default::default()
768 };
769 (client_args, temp_dir)
770}
771
772pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
774 let sequence_number = checkpoint.summary.sequence_number;
775
776 let mask = FieldMask::from_paths([
777 rpc::v2::Checkpoint::path_builder().sequence_number(),
778 rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
779 rpc::v2::Checkpoint::path_builder().signature().finish(),
780 rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
781 rpc::v2::Checkpoint::path_builder()
782 .transactions()
783 .transaction()
784 .bcs()
785 .value(),
786 rpc::v2::Checkpoint::path_builder()
787 .transactions()
788 .effects()
789 .bcs()
790 .value(),
791 rpc::v2::Checkpoint::path_builder()
792 .transactions()
793 .effects()
794 .unchanged_loaded_runtime_objects()
795 .finish(),
796 rpc::v2::Checkpoint::path_builder()
797 .transactions()
798 .events()
799 .bcs()
800 .value(),
801 rpc::v2::Checkpoint::path_builder()
802 .objects()
803 .objects()
804 .bcs()
805 .value(),
806 ]);
807
808 let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
809 let proto_bytes = proto_checkpoint.encode_to_vec();
810 let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
811
812 let file_name = format!("{}.binpb.zst", sequence_number);
813 let file_path = path.join(file_name);
814 fs::write(file_path, compressed)?;
815 Ok(())
816}
817
/// Starts the BigTable side of the cluster: a BigTable emulator, its tables,
/// an indexer ingesting checkpoints into it (via `client_args`), and a KV RPC
/// server on `kv_rpc_address` serving reads from it.
///
/// Returns a client connected to the emulator (for watermark polling), the
/// emulator handle (which must be kept alive), and the merged service handle
/// for the indexer and KV RPC server.
async fn start_archival(
    client_args: ClientArgs,
    kv_rpc_address: SocketAddr,
    registry: &prometheus::Registry,
) -> anyhow::Result<(BigTableClient, BigTableEmulator, Service)> {
    // Emulator startup is blocking, so run it off the async executor.
    let emulator = tokio::task::spawn_blocking(BigTableEmulator::start)
        .await
        .context("spawn_blocking panicked")?
        .context("Failed to start BigTable emulator")?;

    create_tables(emulator.host(), INSTANCE_ID)
        .await
        .context("Failed to create BigTable tables")?;

    // One client is returned to the caller for polling ...
    let bigtable_client =
        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
            .await
            .context("Failed to create BigTable client")?;

    // ... and a second, separate client backs the indexer's store.
    let store = BigTableStore::new(
        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
            .await
            .context("Failed to create BigTable client for indexer")?,
    );
    let bt_indexer = BigTableIndexer::new(
        store,
        IndexerArgs::default(),
        client_args,
        BtIngestionConfig::default(),
        CommitterConfig::default(),
        BtIndexerConfig::default(),
        PipelineLayer::default(),
        Chain::Unknown,
        &ALPHA_PIPELINE_NAMES,
        registry,
    )
    .await
    .context("Failed to create BigTable indexer")?;

    let bt_indexer_service = bt_indexer
        .indexer
        .run()
        .await
        .context("Failed to start BigTable indexer")?;

    let kv_rpc_server = KvRpcServer::new_local(
        emulator.host().to_string(),
        INSTANCE_ID.to_string(),
        None,
        None,
    )
    .await
    .context("Failed to create KvRpcServer")?;
    let kv_rpc_service = kv_rpc_server
        .start_service(kv_rpc_address, sui_kv_rpc::ServerConfig::default())
        .await
        .context("Failed to start kv-rpc server")?;

    // Merge the two service handles so they live and die together.
    let service = bt_indexer_service.merge(kv_rpc_service);
    Ok((bigtable_client, emulator, service))
}