1use std::{collections::BTreeMap, sync::Arc};
5
6use anyhow::{Result, anyhow, bail, ensure};
7use diesel::prelude::QueryableByName;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10 FieldCount,
11 pipeline::Processor,
12 postgres::{Connection, handler::Handler},
13 types::{
14 TypeTag,
15 base_types::{ObjectID, SuiAddress},
16 full_checkpoint_content::Checkpoint,
17 object::{Object, Owner},
18 },
19};
20use sui_indexer_alt_schema::{
21 objects::{
22 StoredCoinBalanceBucket, StoredCoinBalanceBucketDeletionReference, StoredCoinOwnerKind,
23 },
24 schema::{coin_balance_buckets, coin_balance_buckets_deletion_reference},
25};
26
27use super::checkpoint_input_objects;
28use async_trait::async_trait;
29
30pub(crate) struct CoinBalanceBuckets;
36
37#[derive(Debug, PartialEq, Eq)]
38pub(crate) struct ProcessedCoinBalanceBucket {
39 pub object_id: ObjectID,
40 pub cp_sequence_number: u64,
41 pub change: CoinBalanceBucketChangeKind,
42}
43
44#[derive(Debug, PartialEq, Eq)]
45pub(crate) enum CoinBalanceBucketChangeKind {
46 Upsert {
47 owner_kind: StoredCoinOwnerKind,
48 owner_id: SuiAddress,
49 coin_type: TypeTag,
50 balance_bucket: i16,
51 created: bool,
53 },
54 Delete,
55}
56
57#[async_trait]
58impl Processor for CoinBalanceBuckets {
59 const NAME: &'static str = "coin_balance_buckets";
60 type Value = ProcessedCoinBalanceBucket;
61
62 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
63 let cp_sequence_number = checkpoint.summary.sequence_number;
64 let checkpoint_input_objects = checkpoint_input_objects(checkpoint)?;
65 let latest_live_output_objects = checkpoint.latest_live_output_objects();
66
67 let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
68 for (object_id, input_object) in checkpoint_input_objects.iter() {
69 if !input_object.is_coin() {
72 continue;
73 }
74 if get_coin_owner(input_object).is_none() {
75 continue;
76 }
77 if latest_live_output_objects.contains_key(object_id) {
78 continue;
79 }
80 values.insert(
81 *object_id,
82 ProcessedCoinBalanceBucket {
83 object_id: *object_id,
84 cp_sequence_number,
85 change: CoinBalanceBucketChangeKind::Delete,
86 },
87 );
88 }
89 for (object_id, output_object) in latest_live_output_objects.iter() {
90 let Some(coin_type) = output_object.coin_type_maybe() else {
91 continue;
92 };
93
94 let (input_bucket, input_owner) = match checkpoint_input_objects.get(object_id) {
95 Some(input_object) => {
96 let bucket = get_coin_balance_bucket(input_object)?;
97 let owner = get_coin_owner(input_object);
98 (Some(bucket), owner)
99 }
100 None => (None, None),
101 };
102
103 let output_balance_bucket = get_coin_balance_bucket(output_object)?;
104 let output_owner = get_coin_owner(output_object);
105
106 match (input_owner, output_owner) {
107 (Some(_), None) => {
108 values.insert(
112 *object_id,
113 ProcessedCoinBalanceBucket {
114 object_id: *object_id,
115 cp_sequence_number,
116 change: CoinBalanceBucketChangeKind::Delete,
117 },
118 );
119 }
120 (_, Some(new_owner))
121 if input_owner != output_owner
122 || input_bucket != Some(output_balance_bucket) =>
123 {
124 values.insert(
128 *object_id,
129 ProcessedCoinBalanceBucket {
130 object_id: *object_id,
131 cp_sequence_number,
132 change: CoinBalanceBucketChangeKind::Upsert {
133 owner_kind: new_owner.0,
134 owner_id: new_owner.1,
135 coin_type,
136 balance_bucket: output_balance_bucket,
137 created: input_owner.is_none(),
138 },
139 },
140 );
141 }
142 _ => {}
143 }
144 }
145
146 Ok(values.into_values().collect())
147 }
148}
149
150#[async_trait]
151impl Handler for CoinBalanceBuckets {
152 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
153 let stored = values
154 .iter()
155 .map(|v| v.try_into())
156 .collect::<Result<Vec<StoredCoinBalanceBucket>>>()?;
157
158 let mut references = Vec::new();
159 for value in values {
160 match &value.change {
161 CoinBalanceBucketChangeKind::Upsert { created, .. } => {
162 if !created {
163 references.push(StoredCoinBalanceBucketDeletionReference {
164 object_id: value.object_id.to_vec(),
165 cp_sequence_number: value.cp_sequence_number as i64,
166 });
167 }
168 }
169 CoinBalanceBucketChangeKind::Delete => {
173 references.push(StoredCoinBalanceBucketDeletionReference {
174 object_id: value.object_id.to_vec(),
175 cp_sequence_number: value.cp_sequence_number as i64,
176 });
177 references.push(StoredCoinBalanceBucketDeletionReference {
178 object_id: value.object_id.to_vec(),
179 cp_sequence_number: value.cp_sequence_number as i64 + 1,
180 });
181 }
182 }
183 }
184
185 let count = diesel::insert_into(coin_balance_buckets::table)
186 .values(&stored)
187 .on_conflict_do_nothing()
188 .execute(conn)
189 .await?;
190 let deleted_refs = if !references.is_empty() {
191 diesel::insert_into(coin_balance_buckets_deletion_reference::table)
192 .values(&references)
193 .on_conflict_do_nothing()
194 .execute(conn)
195 .await?
196 } else {
197 0
198 };
199
200 Ok(count + deleted_refs)
201 }
202
203 async fn prune<'a>(
205 &self,
206 from: u64,
207 to_exclusive: u64,
208 conn: &mut Connection<'a>,
209 ) -> anyhow::Result<usize> {
210 let query = format!(
225 "
226 -- Delete reference records and return immediate predecessor refs to the main table.
227 WITH deletion_refs AS (
228 DELETE FROM
229 coin_balance_buckets_deletion_reference dr
230 WHERE
231 {} <= cp_sequence_number AND cp_sequence_number < {}
232 RETURNING
233 object_id, (
234 SELECT
235 cb.cp_sequence_number
236 FROM
237 coin_balance_buckets cb
238 WHERE
239 dr.object_id = cb.object_id
240 AND cb.cp_sequence_number < dr.cp_sequence_number
241 ORDER BY
242 cb.cp_sequence_number DESC
243 LIMIT
244 1
245 ) AS cp_sequence_number
246 ),
247 deleted_coins AS (
248 DELETE FROM
249 coin_balance_buckets cb
250 USING
251 deletion_refs dr
252 WHERE
253 cb.object_id = dr.object_id
254 AND cb.cp_sequence_number = dr.cp_sequence_number
255 RETURNING
256 cb.object_id
257 )
258 SELECT
259 (SELECT COUNT(*) FROM deleted_coins) AS deleted_coins,
260 (SELECT COUNT(*) FROM deletion_refs) AS deleted_refs
261 ",
262 from, to_exclusive
263 );
264
265 #[derive(QueryableByName)]
266 struct CountResult {
267 #[diesel(sql_type = diesel::sql_types::BigInt)]
268 deleted_coins: i64,
269 #[diesel(sql_type = diesel::sql_types::BigInt)]
270 deleted_refs: i64,
271 }
272
273 let CountResult {
274 deleted_coins,
275 deleted_refs,
276 } = diesel::sql_query(query)
277 .get_result::<CountResult>(conn)
278 .await?;
279
280 ensure!(
281 deleted_coins == deleted_refs,
282 "Deleted coins count ({deleted_coins}) does not match deleted refs count ({deleted_refs})",
283 );
284
285 Ok((deleted_coins + deleted_refs) as usize)
286 }
287}
288
289impl FieldCount for ProcessedCoinBalanceBucket {
290 const FIELD_COUNT: usize = StoredCoinBalanceBucket::FIELD_COUNT;
291}
292
293impl TryInto<StoredCoinBalanceBucket> for &ProcessedCoinBalanceBucket {
294 type Error = anyhow::Error;
295
296 fn try_into(self) -> Result<StoredCoinBalanceBucket> {
297 match &self.change {
298 CoinBalanceBucketChangeKind::Upsert {
299 owner_kind,
300 owner_id,
301 coin_type,
302 balance_bucket,
303 created: _,
304 } => {
305 let serialized_coin_type = bcs::to_bytes(&coin_type)
306 .map_err(|_| anyhow!("Failed to serialize type for {}", self.object_id))?;
307 Ok(StoredCoinBalanceBucket {
308 object_id: self.object_id.to_vec(),
309 cp_sequence_number: self.cp_sequence_number as i64,
310 owner_kind: Some(*owner_kind),
311 owner_id: Some(owner_id.to_vec()),
312 coin_type: Some(serialized_coin_type),
313 coin_balance_bucket: Some(*balance_bucket),
314 })
315 }
316 CoinBalanceBucketChangeKind::Delete => Ok(StoredCoinBalanceBucket {
317 object_id: self.object_id.to_vec(),
318 cp_sequence_number: self.cp_sequence_number as i64,
319 owner_kind: None,
320 owner_id: None,
321 coin_type: None,
322 coin_balance_bucket: None,
323 }),
324 }
325 }
326}
327
328pub(crate) fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> {
331 match object.owner() {
332 Owner::AddressOwner(owner_id) => Some((StoredCoinOwnerKind::Fastpath, *owner_id)),
333 Owner::ConsensusAddressOwner { owner, .. } => {
334 Some((StoredCoinOwnerKind::Consensus, *owner))
335 }
336 Owner::Immutable | Owner::ObjectOwner(_) | Owner::Shared { .. } => None,
337 }
338}
339
340pub(crate) fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result<i16> {
341 let Some(coin) = coin.as_coin_maybe() else {
342 bail!("Failed to deserialize Coin for {}", coin.id());
344 };
345 let balance = coin.balance.value();
346 if balance == 0 {
347 return Ok(0);
348 }
349 let bucket = balance.ilog10() as i16;
350 Ok(bucket)
351}
352
353#[cfg(test)]
354mod tests {
355 use std::str::FromStr;
356
357 use super::*;
358 use diesel::QueryDsl;
359 use sui_indexer_alt_framework::{
360 Indexer,
361 types::{
362 base_types::{MoveObjectType, ObjectID, SequenceNumber, SuiAddress, dbg_addr},
363 digests::TransactionDigest,
364 gas_coin::GAS,
365 object::{MoveObject, Object},
366 test_checkpoint_data_builder::TestCheckpointBuilder,
367 },
368 };
369 use sui_indexer_alt_schema::MIGRATIONS;
370 use sui_protocol_config::ProtocolConfig;
371
372 async fn get_all_balance_buckets(conn: &mut Connection<'_>) -> Vec<StoredCoinBalanceBucket> {
374 coin_balance_buckets::table
375 .order_by((
376 coin_balance_buckets::object_id,
377 coin_balance_buckets::cp_sequence_number,
378 ))
379 .load(conn)
380 .await
381 .unwrap()
382 }
383
384 #[test]
385 fn test_get_coin_balance_bucket() {
386 let id = ObjectID::random();
387
388 let zero_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 0);
390 assert_eq!(get_coin_balance_bucket(&zero_coin).unwrap(), 0);
391
392 let one_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 1);
394 assert_eq!(get_coin_balance_bucket(&one_coin).unwrap(), 0);
395
396 let hundred_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 100);
398 assert_eq!(get_coin_balance_bucket(&hundred_coin).unwrap(), 2);
399
400 let million_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 1000000);
402 assert_eq!(get_coin_balance_bucket(&million_coin).unwrap(), 6);
403
404 let invalid_coin = unsafe {
406 Object::new_move(
407 MoveObject::new_from_execution(
408 MoveObjectType::staked_sui(),
409 false,
410 SequenceNumber::new(),
411 bcs::to_bytes(&Object::new_gas_for_testing()).unwrap(),
412 &ProtocolConfig::get_for_max_version_UNSAFE(),
413 false,
414 )
415 .unwrap(),
416 Owner::AddressOwner(SuiAddress::ZERO),
417 TransactionDigest::ZERO,
418 )
419 };
420 assert!(get_coin_balance_bucket(&invalid_coin).is_err());
421 }
422
423 #[test]
424 fn test_get_coin_owner() {
425 let id = ObjectID::random();
426 let addr1 = SuiAddress::random_for_testing_only();
427 let addr_owned = Object::with_id_owner_for_testing(id, addr1);
428 assert_eq!(
429 get_coin_owner(&addr_owned),
430 Some((StoredCoinOwnerKind::Fastpath, addr1))
431 );
432
433 let obj_owned = Object::with_object_owner_for_testing(id, addr1.into());
435 assert_eq!(get_coin_owner(&obj_owned), None);
436
437 let shared = Object::shared_for_testing();
439 assert_eq!(get_coin_owner(&shared), None);
440
441 let immutable = Object::immutable_with_id_for_testing(id);
443 assert_eq!(get_coin_owner(&immutable), None);
444
445 let consensus_v2 = Object::with_id_owner_version_for_testing(
446 id,
447 SequenceNumber::new(),
448 Owner::ConsensusAddressOwner {
449 start_version: SequenceNumber::new(),
450 owner: addr1,
451 },
452 );
453 assert_eq!(
454 get_coin_owner(&consensus_v2),
455 Some((StoredCoinOwnerKind::Consensus, addr1))
456 );
457 }
458
459 #[tokio::test]
460 async fn test_process_coin_balance_buckets_new_sui_coin() {
461 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
462 let mut conn = indexer.store().connect().await.unwrap();
463 let mut builder = TestCheckpointBuilder::new(0);
464 builder = builder
465 .start_transaction(0)
466 .create_sui_object(0, 0)
467 .create_sui_object(1, 100)
468 .finish_transaction();
469 let checkpoint = builder.build_checkpoint();
470 let values = CoinBalanceBuckets
471 .process(&Arc::new(checkpoint))
472 .await
473 .unwrap();
474 assert_eq!(values.len(), 2);
475 assert!(values.iter().any(|v| matches!(
476 v.change,
477 CoinBalanceBucketChangeKind::Upsert {
478 owner_kind: StoredCoinOwnerKind::Fastpath,
479 balance_bucket: 0,
480 created: true,
481 ..
482 }
483 )));
484 assert!(values.iter().any(|v| matches!(
485 v.change,
486 CoinBalanceBucketChangeKind::Upsert {
487 owner_kind: StoredCoinOwnerKind::Fastpath,
488 balance_bucket: 2,
489 created: true,
490 ..
491 }
492 )));
493 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
494 .await
495 .unwrap();
496 assert_eq!(rows_inserted, 2);
497 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
498 assert_eq!(all_balance_buckets.len(), 2);
499 let rows_pruned = CoinBalanceBuckets.prune(0, 1, &mut conn).await.unwrap();
500 assert_eq!(rows_pruned, 0);
501 }
502
503 #[tokio::test]
504 async fn test_process_coin_balance_buckets_new_other_coin() {
505 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
506 let mut conn = indexer.store().connect().await.unwrap();
507 let mut builder = TestCheckpointBuilder::new(0);
508 let coin_type = TypeTag::from_str("0x0::a::b").unwrap();
509 builder = builder
510 .start_transaction(0)
511 .create_coin_object(0, 0, 10, coin_type.clone())
512 .finish_transaction();
513 let checkpoint = builder.build_checkpoint();
514 let values = CoinBalanceBuckets
515 .process(&Arc::new(checkpoint))
516 .await
517 .unwrap();
518 assert_eq!(values.len(), 1);
519 assert_eq!(
520 &values[0].change,
521 &CoinBalanceBucketChangeKind::Upsert {
522 owner_kind: StoredCoinOwnerKind::Fastpath,
523 balance_bucket: 1,
524 coin_type: coin_type.clone(),
525 owner_id: TestCheckpointBuilder::derive_address(0),
526 created: true,
527 }
528 );
529 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
530 .await
531 .unwrap();
532 assert_eq!(rows_inserted, 1);
533 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
534 assert_eq!(all_balance_buckets.len(), 1);
535 let rows_pruned = CoinBalanceBuckets.prune(0, 1, &mut conn).await.unwrap();
536 assert_eq!(rows_pruned, 0);
537 }
538
539 #[tokio::test]
540 async fn test_process_coin_balance_buckets_balance_change() {
541 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
542 let mut conn = indexer.store().connect().await.unwrap();
543 let mut builder = TestCheckpointBuilder::new(0);
544 builder = builder
545 .start_transaction(0)
546 .create_sui_object(0, 10010)
547 .finish_transaction();
548 let checkpoint = builder.build_checkpoint();
549 let values = CoinBalanceBuckets
550 .process(&Arc::new(checkpoint))
551 .await
552 .unwrap();
553 assert_eq!(values.len(), 1);
554 assert_eq!(
556 values[0].change,
557 CoinBalanceBucketChangeKind::Upsert {
558 owner_kind: StoredCoinOwnerKind::Fastpath,
559 balance_bucket: 4,
560 coin_type: GAS::type_tag(),
561 owner_id: TestCheckpointBuilder::derive_address(0),
562 created: true,
563 }
564 );
565 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
566 .await
567 .unwrap();
568 assert_eq!(rows_inserted, 1);
569 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
570 assert_eq!(all_balance_buckets.len(), 1);
571
572 builder = builder
576 .start_transaction(0)
577 .transfer_coin_balance(0, 1, 1, 10)
578 .finish_transaction();
579 let checkpoint = builder.build_checkpoint();
580 let values = CoinBalanceBuckets
581 .process(&Arc::new(checkpoint))
582 .await
583 .unwrap();
584 assert_eq!(values.len(), 1);
585 assert_eq!(
587 values[0].change,
588 CoinBalanceBucketChangeKind::Upsert {
589 owner_kind: StoredCoinOwnerKind::Fastpath,
590 balance_bucket: 1,
591 coin_type: GAS::type_tag(),
592 owner_id: TestCheckpointBuilder::derive_address(1),
593 created: true
594 }
595 );
596 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
597 .await
598 .unwrap();
599 assert_eq!(rows_inserted, 1);
600 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
601 assert_eq!(all_balance_buckets.len(), 2);
602
603 let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
605 assert_eq!(rows_pruned, 0);
606
607 builder = builder
610 .start_transaction(0)
611 .transfer_coin_balance(0, 2, 1, 1)
612 .finish_transaction();
613 let checkpoint = builder.build_checkpoint();
614 let values = CoinBalanceBuckets
615 .process(&Arc::new(checkpoint))
616 .await
617 .unwrap();
618 assert_eq!(values.len(), 2);
619 assert!(values.iter().any(|v| v.change
621 == CoinBalanceBucketChangeKind::Upsert {
622 owner_kind: StoredCoinOwnerKind::Fastpath,
623 balance_bucket: 3,
624 coin_type: GAS::type_tag(),
625 owner_id: TestCheckpointBuilder::derive_address(0),
626 created: false,
627 }));
628 assert!(values.iter().any(|v| v.change
629 == CoinBalanceBucketChangeKind::Upsert {
630 owner_kind: StoredCoinOwnerKind::Fastpath,
631 balance_bucket: 0,
632 coin_type: GAS::type_tag(),
633 owner_id: TestCheckpointBuilder::derive_address(1),
634 created: true,
635 }));
636 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
637 .await
638 .unwrap();
639 assert_eq!(rows_inserted, 3);
642 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
643 assert_eq!(all_balance_buckets.len(), 4);
644
645 let rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
646 assert_eq!(rows_pruned, 2);
647 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
648 assert_eq!(all_balance_buckets.len(), 3);
649 assert_eq!(
650 all_balance_buckets[0],
651 StoredCoinBalanceBucket {
652 object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
653 cp_sequence_number: 2,
654 owner_kind: Some(StoredCoinOwnerKind::Fastpath),
655 owner_id: Some(TestCheckpointBuilder::derive_address(0).to_vec()),
656 coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
657 coin_balance_bucket: Some(3),
658 }
659 );
660 assert_eq!(
661 all_balance_buckets[1],
662 StoredCoinBalanceBucket {
663 object_id: TestCheckpointBuilder::derive_object_id(1).to_vec(),
664 cp_sequence_number: 1,
665 owner_kind: Some(StoredCoinOwnerKind::Fastpath),
666 owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
667 coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
668 coin_balance_bucket: Some(1),
669 }
670 );
671 assert_eq!(
672 all_balance_buckets[2],
673 StoredCoinBalanceBucket {
674 object_id: TestCheckpointBuilder::derive_object_id(2).to_vec(),
675 cp_sequence_number: 2,
676 owner_kind: Some(StoredCoinOwnerKind::Fastpath),
677 owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
678 coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
679 coin_balance_bucket: Some(0),
680 }
681 );
682 }
683
684 #[tokio::test]
685 async fn test_process_coin_balance_buckets_coin_deleted() {
686 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
687 let mut conn = indexer.store().connect().await.unwrap();
688 let mut builder = TestCheckpointBuilder::new(0);
689 builder = builder
690 .start_transaction(0)
691 .create_owned_object(0)
692 .finish_transaction();
693 let checkpoint = builder.build_checkpoint();
694 let values = CoinBalanceBuckets
695 .process(&Arc::new(checkpoint))
696 .await
697 .unwrap();
698 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
699 .await
700 .unwrap();
701 assert_eq!(rows_inserted, 1);
702
703 builder = builder
704 .start_transaction(0)
705 .delete_object(0)
706 .finish_transaction();
707 let checkpoint = builder.build_checkpoint();
708 let values = CoinBalanceBuckets
709 .process(&Arc::new(checkpoint))
710 .await
711 .unwrap();
712 assert_eq!(values.len(), 1);
713 assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
714 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
715 .await
716 .unwrap();
717 assert_eq!(rows_inserted, 3);
719 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
720 assert_eq!(all_balance_buckets.len(), 2);
721 assert_eq!(
722 all_balance_buckets[1],
723 StoredCoinBalanceBucket {
724 object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
725 cp_sequence_number: 1,
726 owner_kind: None,
727 owner_id: None,
728 coin_type: None,
729 coin_balance_bucket: None,
730 }
731 );
732
733 let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
734 let sentinel_rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
735 assert_eq!(rows_pruned + sentinel_rows_pruned, 4);
736 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
737 assert_eq!(all_balance_buckets.len(), 0);
738 }
739
740 #[tokio::test]
741 async fn test_process_coin_balance_buckets_owner_change() {
742 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
743 let mut conn = indexer.store().connect().await.unwrap();
744 let mut builder = TestCheckpointBuilder::new(0);
745 builder = builder
746 .start_transaction(0)
747 .create_sui_object(0, 100)
748 .finish_transaction();
749 let checkpoint = builder.build_checkpoint();
750 let values = CoinBalanceBuckets
751 .process(&Arc::new(checkpoint))
752 .await
753 .unwrap();
754 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
755 .await
756 .unwrap();
757 assert_eq!(rows_inserted, 1);
758
759 builder = builder
760 .start_transaction(0)
761 .transfer_object(0, 1)
762 .finish_transaction();
763 let checkpoint = builder.build_checkpoint();
764 let values = CoinBalanceBuckets
765 .process(&Arc::new(checkpoint))
766 .await
767 .unwrap();
768 assert_eq!(values.len(), 1);
769 assert_eq!(
770 values[0].change,
771 CoinBalanceBucketChangeKind::Upsert {
772 owner_kind: StoredCoinOwnerKind::Fastpath,
773 balance_bucket: 2,
774 coin_type: GAS::type_tag(),
775 owner_id: TestCheckpointBuilder::derive_address(1),
776 created: false,
777 }
778 );
779 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
780 .await
781 .unwrap();
782 assert_eq!(rows_inserted, 2);
783
784 let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
785 assert_eq!(rows_pruned, 2);
786 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
787 assert_eq!(all_balance_buckets.len(), 1);
788 assert_eq!(
789 all_balance_buckets[0],
790 StoredCoinBalanceBucket {
791 object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
792 cp_sequence_number: 1,
793 owner_kind: Some(StoredCoinOwnerKind::Fastpath),
794 owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
795 coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
796 coin_balance_bucket: Some(2),
797 }
798 );
799 }
800
801 #[tokio::test]
802 async fn test_process_coin_balance_buckets_object_owned() {
803 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
804 let mut conn = indexer.store().connect().await.unwrap();
805 let mut builder = TestCheckpointBuilder::new(0);
806 builder = builder
807 .start_transaction(0)
808 .create_owned_object(0)
809 .finish_transaction();
810 let checkpoint = builder.build_checkpoint();
811 let values = CoinBalanceBuckets
812 .process(&Arc::new(checkpoint))
813 .await
814 .unwrap();
815 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
816 .await
817 .unwrap();
818 assert_eq!(rows_inserted, 1);
819
820 builder = builder
822 .start_transaction(0)
823 .change_object_owner(0, Owner::ObjectOwner(dbg_addr(1)))
824 .finish_transaction();
825 let checkpoint = builder.build_checkpoint();
826 let values = CoinBalanceBuckets
827 .process(&Arc::new(checkpoint))
828 .await
829 .unwrap();
830 assert_eq!(values.len(), 1);
831 assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
832 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
833 .await
834 .unwrap();
835 assert_eq!(rows_inserted, 3);
836
837 let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
838 let sentinel_rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
839 assert_eq!(rows_pruned + sentinel_rows_pruned, 4);
840 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
841 assert_eq!(all_balance_buckets.len(), 0);
842 }
843
844 #[tokio::test]
845 async fn test_process_coin_balance_buckets_wrap_and_prune_after_unwrap() {
846 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
847 let mut conn = indexer.store().connect().await.unwrap();
848 let mut builder = TestCheckpointBuilder::new(0);
849
850 builder = builder
852 .start_transaction(0)
853 .create_sui_object(0, 100)
854 .finish_transaction();
855 let checkpoint = builder.build_checkpoint();
856 let values = CoinBalanceBuckets
857 .process(&Arc::new(checkpoint))
858 .await
859 .unwrap();
860 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
861 .await
862 .unwrap();
863 assert_eq!(rows_inserted, 1);
864
865 builder = builder
867 .start_transaction(0)
868 .wrap_object(0)
869 .finish_transaction();
870 let checkpoint = builder.build_checkpoint();
871 let values = CoinBalanceBuckets
872 .process(&Arc::new(checkpoint))
873 .await
874 .unwrap();
875 assert_eq!(values.len(), 1);
876 assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
877 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
879 .await
880 .unwrap();
881 assert_eq!(rows_inserted, 3);
882
883 builder = builder
885 .start_transaction(0)
886 .unwrap_object(0)
887 .finish_transaction();
888 let checkpoint = builder.build_checkpoint();
889 let values = CoinBalanceBuckets
890 .process(&Arc::new(checkpoint))
891 .await
892 .unwrap();
893 assert_eq!(values.len(), 1);
894 assert_eq!(
895 values[0].change,
896 CoinBalanceBucketChangeKind::Upsert {
897 owner_kind: StoredCoinOwnerKind::Fastpath,
898 balance_bucket: 2,
899 coin_type: GAS::type_tag(),
900 owner_id: TestCheckpointBuilder::derive_address(0),
901 created: true,
902 }
903 );
904 let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
905 .await
906 .unwrap();
907 assert_eq!(rows_inserted, 1);
908
909 let rows_pruned = CoinBalanceBuckets.prune(0, 3, &mut conn).await.unwrap();
911 assert_eq!(rows_pruned, 4);
912
913 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
914 assert_eq!(all_balance_buckets.len(), 1);
915 assert_eq!(all_balance_buckets[0].cp_sequence_number, 2);
916 assert!(all_balance_buckets[0].owner_kind.is_some());
917 }
918
919 #[tokio::test]
922 async fn test_process_coin_balance_buckets_out_of_order_pruning() {
923 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
924 let mut conn = indexer.store().connect().await.unwrap();
925 let mut builder = TestCheckpointBuilder::new(0);
926
927 builder = builder
929 .start_transaction(0)
930 .create_sui_object(0, 100)
931 .create_sui_object(1, 1000)
932 .create_sui_object(2, 10000)
933 .finish_transaction();
934 let checkpoint0 = builder.build_checkpoint();
935 let result = CoinBalanceBuckets
936 .process(&Arc::new(checkpoint0))
937 .await
938 .unwrap();
939 assert_eq!(result.len(), 3);
940 let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
941 .await
942 .unwrap();
943 assert_eq!(rows_inserted, 3);
944
945 builder = builder
947 .start_transaction(0)
948 .transfer_object(0, 1)
949 .transfer_object(1, 1)
950 .transfer_object(2, 1)
951 .finish_transaction();
952 let checkpoint1 = builder.build_checkpoint();
953 let result = CoinBalanceBuckets
954 .process(&Arc::new(checkpoint1))
955 .await
956 .unwrap();
957 assert_eq!(result.len(), 3);
958 let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
959 .await
960 .unwrap();
961 assert_eq!(rows_inserted, 6);
962
963 builder = builder
965 .start_transaction(1)
966 .transfer_object(0, 0)
967 .transfer_object(1, 0)
968 .transfer_object(2, 0)
969 .finish_transaction();
970 let checkpoint2 = builder.build_checkpoint();
971 let result = CoinBalanceBuckets
972 .process(&Arc::new(checkpoint2))
973 .await
974 .unwrap();
975 assert_eq!(result.len(), 3);
976 let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
977 .await
978 .unwrap();
979 assert_eq!(rows_inserted, 6);
980
981 let all_deletion_references = coin_balance_buckets_deletion_reference::table
983 .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
984 .await
985 .unwrap();
986 assert_eq!(all_deletion_references.len(), 6);
987 for reference in &all_deletion_references {
988 assert!(reference.cp_sequence_number == 1 || reference.cp_sequence_number == 2);
989 }
990
991 let rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
993 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
994 let all_deletion_references = coin_balance_buckets_deletion_reference::table
995 .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
996 .await
997 .unwrap();
998
999 for bucket in &all_balance_buckets {
1001 assert!(bucket.cp_sequence_number == 0 || bucket.cp_sequence_number == 2);
1002 }
1003 assert_eq!(rows_pruned, 6);
1004 assert_eq!(all_balance_buckets.len(), 6);
1005 for reference in &all_deletion_references {
1007 assert!(reference.cp_sequence_number != 2);
1008 }
1009
1010 let rows_pruned = CoinBalanceBuckets.prune(1, 2, &mut conn).await.unwrap();
1012 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
1013 let all_deletion_references = coin_balance_buckets_deletion_reference::table
1014 .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1015 .await
1016 .unwrap();
1017
1018 for bucket in &all_balance_buckets {
1020 assert_eq!(bucket.cp_sequence_number, 2);
1021 }
1022 assert_eq!(rows_pruned, 6);
1023 assert_eq!(all_balance_buckets.len(), 3);
1024 for reference in &all_deletion_references {
1026 assert_eq!(reference.cp_sequence_number, 0);
1027 }
1028 }
1029
1030 #[tokio::test]
1034 async fn test_process_coin_balance_buckets_concurrent_pruning() {
1035 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
1036 let mut conn = indexer.store().connect().await.unwrap();
1037 let mut builder = TestCheckpointBuilder::new(0);
1038
1039 builder = builder
1041 .start_transaction(0)
1042 .create_sui_object(0, 100)
1043 .create_sui_object(1, 1000)
1044 .create_sui_object(2, 10000)
1045 .finish_transaction();
1046 let checkpoint0 = builder.build_checkpoint();
1047 let result = CoinBalanceBuckets
1048 .process(&Arc::new(checkpoint0))
1049 .await
1050 .unwrap();
1051 CoinBalanceBuckets::commit(&result, &mut conn)
1052 .await
1053 .unwrap();
1054
1055 builder = builder
1056 .start_transaction(0)
1057 .transfer_object(0, 1)
1058 .transfer_object(1, 1)
1059 .transfer_object(2, 1)
1060 .finish_transaction();
1061 let checkpoint1 = builder.build_checkpoint();
1062 let result = CoinBalanceBuckets
1063 .process(&Arc::new(checkpoint1))
1064 .await
1065 .unwrap();
1066 CoinBalanceBuckets::commit(&result, &mut conn)
1067 .await
1068 .unwrap();
1069
1070 builder = builder
1071 .start_transaction(1)
1072 .transfer_object(0, 0)
1073 .transfer_object(1, 0)
1074 .transfer_object(2, 0)
1075 .finish_transaction();
1076 let checkpoint2 = builder.build_checkpoint();
1077 let result = CoinBalanceBuckets
1078 .process(&Arc::new(checkpoint2))
1079 .await
1080 .unwrap();
1081 CoinBalanceBuckets::commit(&result, &mut conn)
1082 .await
1083 .unwrap();
1084
1085 let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
1087 assert_eq!(all_balance_buckets.len(), 9); let all_deletion_references = coin_balance_buckets_deletion_reference::table
1089 .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1090 .await
1091 .unwrap();
1092 assert_eq!(all_deletion_references.len(), 6);
1093
1094 let mut handles = Vec::new();
1096
1097 let store = indexer.store().clone();
1099
1100 let store1 = store.clone();
1102 handles.push(tokio::spawn(async move {
1103 let mut conn = store1.connect().await.unwrap();
1104 CoinBalanceBuckets.prune(2, 3, &mut conn).await
1105 }));
1106
1107 let store2 = store.clone();
1109 handles.push(tokio::spawn(async move {
1110 let mut conn = store2.connect().await.unwrap();
1111 CoinBalanceBuckets.prune(1, 2, &mut conn).await
1112 }));
1113
1114 let store3 = store.clone();
1116 handles.push(tokio::spawn(async move {
1117 let mut conn = store3.connect().await.unwrap();
1118 CoinBalanceBuckets.prune(0, 1, &mut conn).await
1119 }));
1120
1121 let results: Vec<Result<usize, anyhow::Error>> = futures::future::join_all(handles)
1123 .await
1124 .into_iter()
1125 .map(|r| r.unwrap())
1126 .collect();
1127
1128 for result in &results {
1130 assert!(result.is_ok(), "Pruning operation failed: {:?}", result);
1131 }
1132
1133 let final_balance_buckets = get_all_balance_buckets(&mut conn).await;
1135 let final_deletion_references = coin_balance_buckets_deletion_reference::table
1136 .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1137 .await
1138 .unwrap();
1139
1140 assert_eq!(final_balance_buckets.len(), 3);
1142 for bucket in &final_balance_buckets {
1143 assert_eq!(bucket.cp_sequence_number, 2);
1144 }
1145
1146 assert_eq!(final_deletion_references.len(), 0);
1148
1149 let total_pruned: usize = results.into_iter().map(|r| r.unwrap()).sum();
1151 assert_eq!(total_pruned, 12);
1152 for bucket in &final_balance_buckets {
1153 assert_eq!(bucket.cp_sequence_number, 2);
1154 }
1155 }
1156}