sui_indexer_alt/handlers/
coin_balance_buckets.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc};
5
6use anyhow::{Result, anyhow, bail, ensure};
7use diesel::prelude::QueryableByName;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10    FieldCount,
11    pipeline::Processor,
12    postgres::{Connection, handler::Handler},
13    types::{
14        TypeTag,
15        base_types::{ObjectID, SuiAddress},
16        full_checkpoint_content::Checkpoint,
17        object::{Object, Owner},
18    },
19};
20use sui_indexer_alt_schema::{
21    objects::{
22        StoredCoinBalanceBucket, StoredCoinBalanceBucketDeletionReference, StoredCoinOwnerKind,
23    },
24    schema::{coin_balance_buckets, coin_balance_buckets_deletion_reference},
25};
26
27use super::checkpoint_input_objects;
28use async_trait::async_trait;
29
/// Handler that tracks the balance buckets of coins owned by a single address.
///
/// A coin's balance bucket is the integer part of log10 of its balance (a zero balance is placed
/// in bucket 0). Whenever a coin object's presence, owner or balance bucket changes in a
/// checkpoint, a new row is inserted into the `coin_balance_buckets` table.
/// A `Delete` record is inserted when a coin object is no longer present or is no longer owned by
/// a single address.
pub(crate) struct CoinBalanceBuckets;
36
/// A single balance-bucket change for one coin object, produced by processing one checkpoint.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct ProcessedCoinBalanceBucket {
    /// ID of the coin object the change applies to.
    pub object_id: ObjectID,
    /// Sequence number of the checkpoint in which the change was observed.
    pub cp_sequence_number: u64,
    /// Whether the coin's row is being (re)written or marked as deleted.
    pub change: CoinBalanceBucketChangeKind,
}
43
/// The kind of change recorded for a coin in a checkpoint.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum CoinBalanceBucketChangeKind {
    /// The coin is owned by a single address after the checkpoint, and its owner or balance
    /// bucket differs from what it was before the checkpoint. This includes coins that had no
    /// single-address owner before the checkpoint.
    Upsert {
        /// Whether the address owns the coin through fast-path or consensus ownership.
        owner_kind: StoredCoinOwnerKind,
        /// Address that owns the coin after the checkpoint.
        owner_id: SuiAddress,
        /// Type tag of the coin.
        coin_type: TypeTag,
        /// Integer part of log10 of the coin's balance after the checkpoint (0 for a zero
        /// balance).
        balance_bucket: i16,
        /// True when the coin had no single-address owner before this checkpoint, e.g. because it
        /// was created or unwrapped in it. No deletion reference is recorded for such a change
        /// when it is committed.
        created: bool,
    },
    /// The coin was owned by a single address before the checkpoint, and afterwards it is either
    /// absent from the checkpoint's live output objects or no longer owned by a single address.
    Delete,
}
56
57#[async_trait]
58impl Processor for CoinBalanceBuckets {
59    const NAME: &'static str = "coin_balance_buckets";
60    type Value = ProcessedCoinBalanceBucket;
61
62    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
63        let cp_sequence_number = checkpoint.summary.sequence_number;
64        let checkpoint_input_objects = checkpoint_input_objects(checkpoint)?;
65        let latest_live_output_objects = checkpoint.latest_live_output_objects();
66
67        let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
68        for (object_id, input_object) in checkpoint_input_objects.iter() {
69            // This loop processes all coins that were owned by a single address prior to the checkpoint,
70            // but is now deleted or wrapped after the checkpoint.
71            if !input_object.is_coin() {
72                continue;
73            }
74            if get_coin_owner(input_object).is_none() {
75                continue;
76            }
77            if latest_live_output_objects.contains_key(object_id) {
78                continue;
79            }
80            values.insert(
81                *object_id,
82                ProcessedCoinBalanceBucket {
83                    object_id: *object_id,
84                    cp_sequence_number,
85                    change: CoinBalanceBucketChangeKind::Delete,
86                },
87            );
88        }
89        for (object_id, output_object) in latest_live_output_objects.iter() {
90            let Some(coin_type) = output_object.coin_type_maybe() else {
91                continue;
92            };
93
94            let (input_bucket, input_owner) = match checkpoint_input_objects.get(object_id) {
95                Some(input_object) => {
96                    let bucket = get_coin_balance_bucket(input_object)?;
97                    let owner = get_coin_owner(input_object);
98                    (Some(bucket), owner)
99                }
100                None => (None, None),
101            };
102
103            let output_balance_bucket = get_coin_balance_bucket(output_object)?;
104            let output_owner = get_coin_owner(output_object);
105
106            match (input_owner, output_owner) {
107                (Some(_), None) => {
108                    // In this case, the coin was owned by a single address prior to the checkpoint,
109                    // but is now either shared or immutable after the checkpoint. We treat this the same
110                    // as if the coin was deleted, from the perspective of the balance bucket.
111                    values.insert(
112                        *object_id,
113                        ProcessedCoinBalanceBucket {
114                            object_id: *object_id,
115                            cp_sequence_number,
116                            change: CoinBalanceBucketChangeKind::Delete,
117                        },
118                    );
119                }
120                (_, Some(new_owner))
121                    if input_owner != output_owner
122                        || input_bucket != Some(output_balance_bucket) =>
123                {
124                    // In this case, the coin is still owned by a single address after the checkpoint,
125                    // but either the owner or the balance bucket has changed. This also includes the case
126                    // where the coin did not exist prior to the checkpoint, and is now created/unwrapped.
127                    values.insert(
128                        *object_id,
129                        ProcessedCoinBalanceBucket {
130                            object_id: *object_id,
131                            cp_sequence_number,
132                            change: CoinBalanceBucketChangeKind::Upsert {
133                                owner_kind: new_owner.0,
134                                owner_id: new_owner.1,
135                                coin_type,
136                                balance_bucket: output_balance_bucket,
137                                created: input_owner.is_none(),
138                            },
139                        },
140                    );
141                }
142                _ => {}
143            }
144        }
145
146        Ok(values.into_values().collect())
147    }
148}
149
150#[async_trait]
151impl Handler for CoinBalanceBuckets {
152    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
153        let stored = values
154            .iter()
155            .map(|v| v.try_into())
156            .collect::<Result<Vec<StoredCoinBalanceBucket>>>()?;
157
158        let mut references = Vec::new();
159        for value in values {
160            match &value.change {
161                CoinBalanceBucketChangeKind::Upsert { created, .. } => {
162                    if !created {
163                        references.push(StoredCoinBalanceBucketDeletionReference {
164                            object_id: value.object_id.to_vec(),
165                            cp_sequence_number: value.cp_sequence_number as i64,
166                        });
167                    }
168                }
169                // Store record of current version to delete previous version, and another to delete
170                // itself. When pruning, the deletion record will not be pruned in the
171                // `value.cp_sequence_number` checkpoint, but the next one.
172                CoinBalanceBucketChangeKind::Delete => {
173                    references.push(StoredCoinBalanceBucketDeletionReference {
174                        object_id: value.object_id.to_vec(),
175                        cp_sequence_number: value.cp_sequence_number as i64,
176                    });
177                    references.push(StoredCoinBalanceBucketDeletionReference {
178                        object_id: value.object_id.to_vec(),
179                        cp_sequence_number: value.cp_sequence_number as i64 + 1,
180                    });
181                }
182            }
183        }
184
185        let count = diesel::insert_into(coin_balance_buckets::table)
186            .values(&stored)
187            .on_conflict_do_nothing()
188            .execute(conn)
189            .await?;
190        let deleted_refs = if !references.is_empty() {
191            diesel::insert_into(coin_balance_buckets_deletion_reference::table)
192                .values(&references)
193                .on_conflict_do_nothing()
194                .execute(conn)
195                .await?
196        } else {
197            0
198        };
199
200        Ok(count + deleted_refs)
201    }
202
203    // TODO: Add tests for this function.
204    async fn prune<'a>(
205        &self,
206        from: u64,
207        to_exclusive: u64,
208        conn: &mut Connection<'a>,
209    ) -> anyhow::Result<usize> {
210        // This query first deletes from coin_balance_buckets_deletion_reference and computes
211        // predecessors, then deletes from coin_balance_buckets using the precomputed predecessor
212        // information. The inline compute avoids HashAggregate operations and the ensuing
213        // materialization overhead.
214        //
215        // This works best on under 1.5 million object changes, roughly 15k checkpoints. Performance
216        // degrades sharply beyond this, since the planner switches to hash joins and full table
217        // scans. A HashAggregate approach interestingly becomes more performant in this scenario.
218        //
219        // If the first call to prune succeeds, subsequent calls will find no records to delete from
220        // coin_balance_buckets_deletion_reference, and consequently no records to delete from the
221        // main table. Pruning is thus idempotent after the initial run.
222        //
223        // TODO: use sui_sql_macro's query!
224        let query = format!(
225            "
226            -- Delete reference records and return immediate predecessor refs to the main table.
227            WITH deletion_refs AS (
228                DELETE FROM
229                    coin_balance_buckets_deletion_reference dr
230                WHERE
231                    {} <= cp_sequence_number AND cp_sequence_number < {}
232                RETURNING
233                    object_id, (
234                    SELECT
235                        cb.cp_sequence_number
236                    FROM
237                        coin_balance_buckets cb
238                    WHERE
239                        dr.object_id = cb.object_id
240                    AND cb.cp_sequence_number < dr.cp_sequence_number
241                    ORDER BY
242                        cb.cp_sequence_number DESC
243                    LIMIT
244                        1
245                    ) AS cp_sequence_number
246            ),
247            deleted_coins AS (
248                DELETE FROM
249                    coin_balance_buckets cb
250                USING
251                    deletion_refs dr
252                WHERE
253                    cb.object_id = dr.object_id
254                AND cb.cp_sequence_number = dr.cp_sequence_number
255                RETURNING
256                    cb.object_id
257            )
258            SELECT
259                (SELECT COUNT(*) FROM deleted_coins) AS deleted_coins,
260                (SELECT COUNT(*) FROM deletion_refs) AS deleted_refs
261            ",
262            from, to_exclusive
263        );
264
265        #[derive(QueryableByName)]
266        struct CountResult {
267            #[diesel(sql_type = diesel::sql_types::BigInt)]
268            deleted_coins: i64,
269            #[diesel(sql_type = diesel::sql_types::BigInt)]
270            deleted_refs: i64,
271        }
272
273        let CountResult {
274            deleted_coins,
275            deleted_refs,
276        } = diesel::sql_query(query)
277            .get_result::<CountResult>(conn)
278            .await?;
279
280        ensure!(
281            deleted_coins == deleted_refs,
282            "Deleted coins count ({deleted_coins}) does not match deleted refs count ({deleted_refs})",
283        );
284
285        Ok((deleted_coins + deleted_refs) as usize)
286    }
287}
288
289impl FieldCount for ProcessedCoinBalanceBucket {
290    const FIELD_COUNT: usize = StoredCoinBalanceBucket::FIELD_COUNT;
291}
292
293impl TryInto<StoredCoinBalanceBucket> for &ProcessedCoinBalanceBucket {
294    type Error = anyhow::Error;
295
296    fn try_into(self) -> Result<StoredCoinBalanceBucket> {
297        match &self.change {
298            CoinBalanceBucketChangeKind::Upsert {
299                owner_kind,
300                owner_id,
301                coin_type,
302                balance_bucket,
303                created: _,
304            } => {
305                let serialized_coin_type = bcs::to_bytes(&coin_type)
306                    .map_err(|_| anyhow!("Failed to serialize type for {}", self.object_id))?;
307                Ok(StoredCoinBalanceBucket {
308                    object_id: self.object_id.to_vec(),
309                    cp_sequence_number: self.cp_sequence_number as i64,
310                    owner_kind: Some(*owner_kind),
311                    owner_id: Some(owner_id.to_vec()),
312                    coin_type: Some(serialized_coin_type),
313                    coin_balance_bucket: Some(*balance_bucket),
314                })
315            }
316            CoinBalanceBucketChangeKind::Delete => Ok(StoredCoinBalanceBucket {
317                object_id: self.object_id.to_vec(),
318                cp_sequence_number: self.cp_sequence_number as i64,
319                owner_kind: None,
320                owner_id: None,
321                coin_type: None,
322                coin_balance_bucket: None,
323            }),
324        }
325    }
326}
327
328/// Get the owner kind and address of a coin, if it is owned by a single address,
329/// either through fast-path ownership or consensus ownership.
330pub(crate) fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> {
331    match object.owner() {
332        Owner::AddressOwner(owner_id) => Some((StoredCoinOwnerKind::Fastpath, *owner_id)),
333        Owner::ConsensusAddressOwner { owner, .. } => {
334            Some((StoredCoinOwnerKind::Consensus, *owner))
335        }
336        Owner::Immutable | Owner::ObjectOwner(_) | Owner::Shared { .. } => None,
337    }
338}
339
340pub(crate) fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result<i16> {
341    let Some(coin) = coin.as_coin_maybe() else {
342        // TODO: We should make this an invariant violation.
343        bail!("Failed to deserialize Coin for {}", coin.id());
344    };
345    let balance = coin.balance.value();
346    if balance == 0 {
347        return Ok(0);
348    }
349    let bucket = balance.ilog10() as i16;
350    Ok(bucket)
351}
352
353#[cfg(test)]
354mod tests {
355    use std::str::FromStr;
356
357    use super::*;
358    use diesel::QueryDsl;
359    use sui_indexer_alt_framework::{
360        Indexer,
361        types::{
362            base_types::{MoveObjectType, ObjectID, SequenceNumber, SuiAddress, dbg_addr},
363            digests::TransactionDigest,
364            gas_coin::GAS,
365            object::{MoveObject, Object},
366            test_checkpoint_data_builder::TestCheckpointBuilder,
367        },
368    };
369    use sui_indexer_alt_schema::MIGRATIONS;
370    use sui_protocol_config::ProtocolConfig;
371
372    // Get all balance buckets from the database, sorted by object_id and cp_sequence_number.
373    async fn get_all_balance_buckets(conn: &mut Connection<'_>) -> Vec<StoredCoinBalanceBucket> {
374        coin_balance_buckets::table
375            .order_by((
376                coin_balance_buckets::object_id,
377                coin_balance_buckets::cp_sequence_number,
378            ))
379            .load(conn)
380            .await
381            .unwrap()
382    }
383
384    #[test]
385    fn test_get_coin_balance_bucket() {
386        let id = ObjectID::random();
387
388        // Test coin with 0 balance
389        let zero_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 0);
390        assert_eq!(get_coin_balance_bucket(&zero_coin).unwrap(), 0);
391
392        // Test coin with balance 1 (10^0)
393        let one_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 1);
394        assert_eq!(get_coin_balance_bucket(&one_coin).unwrap(), 0);
395
396        // Test coin with balance 100 (10^2)
397        let hundred_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 100);
398        assert_eq!(get_coin_balance_bucket(&hundred_coin).unwrap(), 2);
399
400        // Test coin with balance 1000000 (10^6)
401        let million_coin = Object::with_id_owner_gas_for_testing(id, SuiAddress::ZERO, 1000000);
402        assert_eq!(get_coin_balance_bucket(&million_coin).unwrap(), 6);
403
404        // The type of this object is a staked SUI, not a coin.
405        let invalid_coin = unsafe {
406            Object::new_move(
407                MoveObject::new_from_execution(
408                    MoveObjectType::staked_sui(),
409                    false,
410                    SequenceNumber::new(),
411                    bcs::to_bytes(&Object::new_gas_for_testing()).unwrap(),
412                    &ProtocolConfig::get_for_max_version_UNSAFE(),
413                    /* system_mutation */ false,
414                )
415                .unwrap(),
416                Owner::AddressOwner(SuiAddress::ZERO),
417                TransactionDigest::ZERO,
418            )
419        };
420        assert!(get_coin_balance_bucket(&invalid_coin).is_err());
421    }
422
423    #[test]
424    fn test_get_coin_owner() {
425        let id = ObjectID::random();
426        let addr1 = SuiAddress::random_for_testing_only();
427        let addr_owned = Object::with_id_owner_for_testing(id, addr1);
428        assert_eq!(
429            get_coin_owner(&addr_owned),
430            Some((StoredCoinOwnerKind::Fastpath, addr1))
431        );
432
433        // Test object owner (should return None)
434        let obj_owned = Object::with_object_owner_for_testing(id, addr1.into());
435        assert_eq!(get_coin_owner(&obj_owned), None);
436
437        // Test shared owner (should return None)
438        let shared = Object::shared_for_testing();
439        assert_eq!(get_coin_owner(&shared), None);
440
441        // Test immutable owner (should return None)
442        let immutable = Object::immutable_with_id_for_testing(id);
443        assert_eq!(get_coin_owner(&immutable), None);
444
445        let consensus_v2 = Object::with_id_owner_version_for_testing(
446            id,
447            SequenceNumber::new(),
448            Owner::ConsensusAddressOwner {
449                start_version: SequenceNumber::new(),
450                owner: addr1,
451            },
452        );
453        assert_eq!(
454            get_coin_owner(&consensus_v2),
455            Some((StoredCoinOwnerKind::Consensus, addr1))
456        );
457    }
458
459    #[tokio::test]
460    async fn test_process_coin_balance_buckets_new_sui_coin() {
461        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
462        let mut conn = indexer.store().connect().await.unwrap();
463        let mut builder = TestCheckpointBuilder::new(0);
464        builder = builder
465            .start_transaction(0)
466            .create_sui_object(0, 0)
467            .create_sui_object(1, 100)
468            .finish_transaction();
469        let checkpoint = builder.build_checkpoint();
470        let values = CoinBalanceBuckets
471            .process(&Arc::new(checkpoint))
472            .await
473            .unwrap();
474        assert_eq!(values.len(), 2);
475        assert!(values.iter().any(|v| matches!(
476            v.change,
477            CoinBalanceBucketChangeKind::Upsert {
478                owner_kind: StoredCoinOwnerKind::Fastpath,
479                balance_bucket: 0,
480                created: true,
481                ..
482            }
483        )));
484        assert!(values.iter().any(|v| matches!(
485            v.change,
486            CoinBalanceBucketChangeKind::Upsert {
487                owner_kind: StoredCoinOwnerKind::Fastpath,
488                balance_bucket: 2,
489                created: true,
490                ..
491            }
492        )));
493        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
494            .await
495            .unwrap();
496        assert_eq!(rows_inserted, 2);
497        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
498        assert_eq!(all_balance_buckets.len(), 2);
499        let rows_pruned = CoinBalanceBuckets.prune(0, 1, &mut conn).await.unwrap();
500        assert_eq!(rows_pruned, 0);
501    }
502
503    #[tokio::test]
504    async fn test_process_coin_balance_buckets_new_other_coin() {
505        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
506        let mut conn = indexer.store().connect().await.unwrap();
507        let mut builder = TestCheckpointBuilder::new(0);
508        let coin_type = TypeTag::from_str("0x0::a::b").unwrap();
509        builder = builder
510            .start_transaction(0)
511            .create_coin_object(0, 0, 10, coin_type.clone())
512            .finish_transaction();
513        let checkpoint = builder.build_checkpoint();
514        let values = CoinBalanceBuckets
515            .process(&Arc::new(checkpoint))
516            .await
517            .unwrap();
518        assert_eq!(values.len(), 1);
519        assert_eq!(
520            &values[0].change,
521            &CoinBalanceBucketChangeKind::Upsert {
522                owner_kind: StoredCoinOwnerKind::Fastpath,
523                balance_bucket: 1,
524                coin_type: coin_type.clone(),
525                owner_id: TestCheckpointBuilder::derive_address(0),
526                created: true,
527            }
528        );
529        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
530            .await
531            .unwrap();
532        assert_eq!(rows_inserted, 1);
533        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
534        assert_eq!(all_balance_buckets.len(), 1);
535        let rows_pruned = CoinBalanceBuckets.prune(0, 1, &mut conn).await.unwrap();
536        assert_eq!(rows_pruned, 0);
537    }
538
539    #[tokio::test]
540    async fn test_process_coin_balance_buckets_balance_change() {
541        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
542        let mut conn = indexer.store().connect().await.unwrap();
543        let mut builder = TestCheckpointBuilder::new(0);
544        builder = builder
545            .start_transaction(0)
546            .create_sui_object(0, 10010)
547            .finish_transaction();
548        let checkpoint = builder.build_checkpoint();
549        let values = CoinBalanceBuckets
550            .process(&Arc::new(checkpoint))
551            .await
552            .unwrap();
553        assert_eq!(values.len(), 1);
554        // Checkpoint 0 creates coin object 0.
555        assert_eq!(
556            values[0].change,
557            CoinBalanceBucketChangeKind::Upsert {
558                owner_kind: StoredCoinOwnerKind::Fastpath,
559                balance_bucket: 4,
560                coin_type: GAS::type_tag(),
561                owner_id: TestCheckpointBuilder::derive_address(0),
562                created: true,
563            }
564        );
565        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
566            .await
567            .unwrap();
568        assert_eq!(rows_inserted, 1);
569        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
570        assert_eq!(all_balance_buckets.len(), 1);
571
572        // Transfer 10 MIST, balance goes from 10010 to 10000.
573        // The balance bucket for the original coin does not change.
574        // We should only see the creation of the new coin in the processed results.
575        builder = builder
576            .start_transaction(0)
577            .transfer_coin_balance(0, 1, 1, 10)
578            .finish_transaction();
579        let checkpoint = builder.build_checkpoint();
580        let values = CoinBalanceBuckets
581            .process(&Arc::new(checkpoint))
582            .await
583            .unwrap();
584        assert_eq!(values.len(), 1);
585        // Checkpoint 1 creates coin object 1.
586        assert_eq!(
587            values[0].change,
588            CoinBalanceBucketChangeKind::Upsert {
589                owner_kind: StoredCoinOwnerKind::Fastpath,
590                balance_bucket: 1,
591                coin_type: GAS::type_tag(),
592                owner_id: TestCheckpointBuilder::derive_address(1),
593                created: true
594            }
595        );
596        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
597            .await
598            .unwrap();
599        assert_eq!(rows_inserted, 1);
600        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
601        assert_eq!(all_balance_buckets.len(), 2);
602
603        // Nothing to prune because the two coins in the table have not been updated since creation.
604        let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
605        assert_eq!(rows_pruned, 0);
606
607        // Transfer 1 MIST, balance goes from 10000 to 9999.
608        // The balance bucket changes, we should see a change, both for the old owner and the new owner.
609        builder = builder
610            .start_transaction(0)
611            .transfer_coin_balance(0, 2, 1, 1)
612            .finish_transaction();
613        let checkpoint = builder.build_checkpoint();
614        let values = CoinBalanceBuckets
615            .process(&Arc::new(checkpoint))
616            .await
617            .unwrap();
618        assert_eq!(values.len(), 2);
619        // Checkpoint 2 creates coin object 2, and mutates coin object 0.
620        assert!(values.iter().any(|v| v.change
621            == CoinBalanceBucketChangeKind::Upsert {
622                owner_kind: StoredCoinOwnerKind::Fastpath,
623                balance_bucket: 3,
624                coin_type: GAS::type_tag(),
625                owner_id: TestCheckpointBuilder::derive_address(0),
626                created: false,
627            }));
628        assert!(values.iter().any(|v| v.change
629            == CoinBalanceBucketChangeKind::Upsert {
630                owner_kind: StoredCoinOwnerKind::Fastpath,
631                balance_bucket: 0,
632                coin_type: GAS::type_tag(),
633                owner_id: TestCheckpointBuilder::derive_address(1),
634                created: true,
635            }));
636        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
637            .await
638            .unwrap();
639        // 2 inserts to main table, only 1 from the transfer - creations don't emit rows on ref
640        // table.
641        assert_eq!(rows_inserted, 3);
642        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
643        assert_eq!(all_balance_buckets.len(), 4);
644
645        let rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
646        assert_eq!(rows_pruned, 2);
647        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
648        assert_eq!(all_balance_buckets.len(), 3);
649        assert_eq!(
650            all_balance_buckets[0],
651            StoredCoinBalanceBucket {
652                object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
653                cp_sequence_number: 2,
654                owner_kind: Some(StoredCoinOwnerKind::Fastpath),
655                owner_id: Some(TestCheckpointBuilder::derive_address(0).to_vec()),
656                coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
657                coin_balance_bucket: Some(3),
658            }
659        );
660        assert_eq!(
661            all_balance_buckets[1],
662            StoredCoinBalanceBucket {
663                object_id: TestCheckpointBuilder::derive_object_id(1).to_vec(),
664                cp_sequence_number: 1,
665                owner_kind: Some(StoredCoinOwnerKind::Fastpath),
666                owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
667                coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
668                coin_balance_bucket: Some(1),
669            }
670        );
671        assert_eq!(
672            all_balance_buckets[2],
673            StoredCoinBalanceBucket {
674                object_id: TestCheckpointBuilder::derive_object_id(2).to_vec(),
675                cp_sequence_number: 2,
676                owner_kind: Some(StoredCoinOwnerKind::Fastpath),
677                owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
678                coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
679                coin_balance_bucket: Some(0),
680            }
681        );
682    }
683
684    #[tokio::test]
685    async fn test_process_coin_balance_buckets_coin_deleted() {
686        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
687        let mut conn = indexer.store().connect().await.unwrap();
688        let mut builder = TestCheckpointBuilder::new(0);
689        builder = builder
690            .start_transaction(0)
691            .create_owned_object(0)
692            .finish_transaction();
693        let checkpoint = builder.build_checkpoint();
694        let values = CoinBalanceBuckets
695            .process(&Arc::new(checkpoint))
696            .await
697            .unwrap();
698        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
699            .await
700            .unwrap();
701        assert_eq!(rows_inserted, 1);
702
703        builder = builder
704            .start_transaction(0)
705            .delete_object(0)
706            .finish_transaction();
707        let checkpoint = builder.build_checkpoint();
708        let values = CoinBalanceBuckets
709            .process(&Arc::new(checkpoint))
710            .await
711            .unwrap();
712        assert_eq!(values.len(), 1);
713        assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
714        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
715            .await
716            .unwrap();
717        // 1 insertion to main table, 2 to ref table because of delete.
718        assert_eq!(rows_inserted, 3);
719        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
720        assert_eq!(all_balance_buckets.len(), 2);
721        assert_eq!(
722            all_balance_buckets[1],
723            StoredCoinBalanceBucket {
724                object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
725                cp_sequence_number: 1,
726                owner_kind: None,
727                owner_id: None,
728                coin_type: None,
729                coin_balance_bucket: None,
730            }
731        );
732
733        let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
734        let sentinel_rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
735        assert_eq!(rows_pruned + sentinel_rows_pruned, 4);
736        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
737        assert_eq!(all_balance_buckets.len(), 0);
738    }
739
740    #[tokio::test]
741    async fn test_process_coin_balance_buckets_owner_change() {
742        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
743        let mut conn = indexer.store().connect().await.unwrap();
744        let mut builder = TestCheckpointBuilder::new(0);
745        builder = builder
746            .start_transaction(0)
747            .create_sui_object(0, 100)
748            .finish_transaction();
749        let checkpoint = builder.build_checkpoint();
750        let values = CoinBalanceBuckets
751            .process(&Arc::new(checkpoint))
752            .await
753            .unwrap();
754        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
755            .await
756            .unwrap();
757        assert_eq!(rows_inserted, 1);
758
759        builder = builder
760            .start_transaction(0)
761            .transfer_object(0, 1)
762            .finish_transaction();
763        let checkpoint = builder.build_checkpoint();
764        let values = CoinBalanceBuckets
765            .process(&Arc::new(checkpoint))
766            .await
767            .unwrap();
768        assert_eq!(values.len(), 1);
769        assert_eq!(
770            values[0].change,
771            CoinBalanceBucketChangeKind::Upsert {
772                owner_kind: StoredCoinOwnerKind::Fastpath,
773                balance_bucket: 2,
774                coin_type: GAS::type_tag(),
775                owner_id: TestCheckpointBuilder::derive_address(1),
776                created: false,
777            }
778        );
779        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
780            .await
781            .unwrap();
782        assert_eq!(rows_inserted, 2);
783
784        let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
785        assert_eq!(rows_pruned, 2);
786        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
787        assert_eq!(all_balance_buckets.len(), 1);
788        assert_eq!(
789            all_balance_buckets[0],
790            StoredCoinBalanceBucket {
791                object_id: TestCheckpointBuilder::derive_object_id(0).to_vec(),
792                cp_sequence_number: 1,
793                owner_kind: Some(StoredCoinOwnerKind::Fastpath),
794                owner_id: Some(TestCheckpointBuilder::derive_address(1).to_vec()),
795                coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
796                coin_balance_bucket: Some(2),
797            }
798        );
799    }
800
801    #[tokio::test]
802    async fn test_process_coin_balance_buckets_object_owned() {
803        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
804        let mut conn = indexer.store().connect().await.unwrap();
805        let mut builder = TestCheckpointBuilder::new(0);
806        builder = builder
807            .start_transaction(0)
808            .create_owned_object(0)
809            .finish_transaction();
810        let checkpoint = builder.build_checkpoint();
811        let values = CoinBalanceBuckets
812            .process(&Arc::new(checkpoint))
813            .await
814            .unwrap();
815        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
816            .await
817            .unwrap();
818        assert_eq!(rows_inserted, 1);
819
820        // So this is considered as a delete.
821        builder = builder
822            .start_transaction(0)
823            .change_object_owner(0, Owner::ObjectOwner(dbg_addr(1)))
824            .finish_transaction();
825        let checkpoint = builder.build_checkpoint();
826        let values = CoinBalanceBuckets
827            .process(&Arc::new(checkpoint))
828            .await
829            .unwrap();
830        assert_eq!(values.len(), 1);
831        assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
832        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
833            .await
834            .unwrap();
835        assert_eq!(rows_inserted, 3);
836
837        let rows_pruned = CoinBalanceBuckets.prune(0, 2, &mut conn).await.unwrap();
838        let sentinel_rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
839        assert_eq!(rows_pruned + sentinel_rows_pruned, 4);
840        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
841        assert_eq!(all_balance_buckets.len(), 0);
842    }
843
844    #[tokio::test]
845    async fn test_process_coin_balance_buckets_wrap_and_prune_after_unwrap() {
846        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
847        let mut conn = indexer.store().connect().await.unwrap();
848        let mut builder = TestCheckpointBuilder::new(0);
849
850        // Create a coin in checkpoint 0
851        builder = builder
852            .start_transaction(0)
853            .create_sui_object(0, 100)
854            .finish_transaction();
855        let checkpoint = builder.build_checkpoint();
856        let values = CoinBalanceBuckets
857            .process(&Arc::new(checkpoint))
858            .await
859            .unwrap();
860        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
861            .await
862            .unwrap();
863        assert_eq!(rows_inserted, 1);
864
865        // Wrap the coin in checkpoint 1
866        builder = builder
867            .start_transaction(0)
868            .wrap_object(0)
869            .finish_transaction();
870        let checkpoint = builder.build_checkpoint();
871        let values = CoinBalanceBuckets
872            .process(&Arc::new(checkpoint))
873            .await
874            .unwrap();
875        assert_eq!(values.len(), 1);
876        assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
877        // 1 insertion to main table, 2 to ref table because of wrap.
878        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
879            .await
880            .unwrap();
881        assert_eq!(rows_inserted, 3);
882
883        // Unwrap the coin in checkpoint 2
884        builder = builder
885            .start_transaction(0)
886            .unwrap_object(0)
887            .finish_transaction();
888        let checkpoint = builder.build_checkpoint();
889        let values = CoinBalanceBuckets
890            .process(&Arc::new(checkpoint))
891            .await
892            .unwrap();
893        assert_eq!(values.len(), 1);
894        assert_eq!(
895            values[0].change,
896            CoinBalanceBucketChangeKind::Upsert {
897                owner_kind: StoredCoinOwnerKind::Fastpath,
898                balance_bucket: 2,
899                coin_type: GAS::type_tag(),
900                owner_id: TestCheckpointBuilder::derive_address(0),
901                created: true,
902            }
903        );
904        let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
905            .await
906            .unwrap();
907        assert_eq!(rows_inserted, 1);
908
909        // Prune after unwrap
910        let rows_pruned = CoinBalanceBuckets.prune(0, 3, &mut conn).await.unwrap();
911        assert_eq!(rows_pruned, 4);
912
913        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
914        assert_eq!(all_balance_buckets.len(), 1);
915        assert_eq!(all_balance_buckets[0].cp_sequence_number, 2);
916        assert!(all_balance_buckets[0].owner_kind.is_some());
917    }
918
919    /// Three coins are created in checkpoint 0. All are transferred in checkpoint 1, and
920    /// transferred back in checkpoint 2. Prune `[2, 3)` first, then `[1, 2)`, finally `[0, 1)`.
921    #[tokio::test]
922    async fn test_process_coin_balance_buckets_out_of_order_pruning() {
923        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
924        let mut conn = indexer.store().connect().await.unwrap();
925        let mut builder = TestCheckpointBuilder::new(0);
926
927        // Create three coins in checkpoint 0
928        builder = builder
929            .start_transaction(0)
930            .create_sui_object(0, 100)
931            .create_sui_object(1, 1000)
932            .create_sui_object(2, 10000)
933            .finish_transaction();
934        let checkpoint0 = builder.build_checkpoint();
935        let result = CoinBalanceBuckets
936            .process(&Arc::new(checkpoint0))
937            .await
938            .unwrap();
939        assert_eq!(result.len(), 3);
940        let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
941            .await
942            .unwrap();
943        assert_eq!(rows_inserted, 3);
944
945        // Transfer all coins to address 1 in checkpoint 1
946        builder = builder
947            .start_transaction(0)
948            .transfer_object(0, 1)
949            .transfer_object(1, 1)
950            .transfer_object(2, 1)
951            .finish_transaction();
952        let checkpoint1 = builder.build_checkpoint();
953        let result = CoinBalanceBuckets
954            .process(&Arc::new(checkpoint1))
955            .await
956            .unwrap();
957        assert_eq!(result.len(), 3);
958        let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
959            .await
960            .unwrap();
961        assert_eq!(rows_inserted, 6);
962
963        // Transfer all coins back to address 0 in checkpoint 2
964        builder = builder
965            .start_transaction(1)
966            .transfer_object(0, 0)
967            .transfer_object(1, 0)
968            .transfer_object(2, 0)
969            .finish_transaction();
970        let checkpoint2 = builder.build_checkpoint();
971        let result = CoinBalanceBuckets
972            .process(&Arc::new(checkpoint2))
973            .await
974            .unwrap();
975        assert_eq!(result.len(), 3);
976        let rows_inserted = CoinBalanceBuckets::commit(&result, &mut conn)
977            .await
978            .unwrap();
979        assert_eq!(rows_inserted, 6);
980
981        // Each of the 3 coins will have two deletion references, one at cp_sequence_number 1, another at 2.
982        let all_deletion_references = coin_balance_buckets_deletion_reference::table
983            .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
984            .await
985            .unwrap();
986        assert_eq!(all_deletion_references.len(), 6);
987        for reference in &all_deletion_references {
988            assert!(reference.cp_sequence_number == 1 || reference.cp_sequence_number == 2);
989        }
990
991        // Prune [2, 3) first (reverse order)
992        let rows_pruned = CoinBalanceBuckets.prune(2, 3, &mut conn).await.unwrap();
993        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
994        let all_deletion_references = coin_balance_buckets_deletion_reference::table
995            .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
996            .await
997            .unwrap();
998
999        // Each coin should have two entries, with cp_sequence_number being either 0 or 2.
1000        for bucket in &all_balance_buckets {
1001            assert!(bucket.cp_sequence_number == 0 || bucket.cp_sequence_number == 2);
1002        }
1003        assert_eq!(rows_pruned, 6);
1004        assert_eq!(all_balance_buckets.len(), 6);
1005        // References at cp_sequence_number 2 should be pruned.
1006        for reference in &all_deletion_references {
1007            assert!(reference.cp_sequence_number != 2);
1008        }
1009
1010        // Prune [1, 2) next
1011        let rows_pruned = CoinBalanceBuckets.prune(1, 2, &mut conn).await.unwrap();
1012        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
1013        let all_deletion_references = coin_balance_buckets_deletion_reference::table
1014            .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1015            .await
1016            .unwrap();
1017
1018        // Each coin should have a single entry with cp_sequence_number 2.
1019        for bucket in &all_balance_buckets {
1020            assert_eq!(bucket.cp_sequence_number, 2);
1021        }
1022        assert_eq!(rows_pruned, 6);
1023        assert_eq!(all_balance_buckets.len(), 3);
1024        // References at cp_sequence_number 1 should be pruned.
1025        for reference in &all_deletion_references {
1026            assert_eq!(reference.cp_sequence_number, 0);
1027        }
1028    }
1029
1030    /// Test concurrent pruning operations to ensure thread safety and data consistency.
1031    /// This test creates the same scenario as test_process_coin_balance_buckets_out_of_order_pruning but runs
1032    /// multiple pruning operations concurrently.
1033    #[tokio::test]
1034    async fn test_process_coin_balance_buckets_concurrent_pruning() {
1035        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
1036        let mut conn = indexer.store().connect().await.unwrap();
1037        let mut builder = TestCheckpointBuilder::new(0);
1038
1039        // Create the same scenario as the out-of-order test
1040        builder = builder
1041            .start_transaction(0)
1042            .create_sui_object(0, 100)
1043            .create_sui_object(1, 1000)
1044            .create_sui_object(2, 10000)
1045            .finish_transaction();
1046        let checkpoint0 = builder.build_checkpoint();
1047        let result = CoinBalanceBuckets
1048            .process(&Arc::new(checkpoint0))
1049            .await
1050            .unwrap();
1051        CoinBalanceBuckets::commit(&result, &mut conn)
1052            .await
1053            .unwrap();
1054
1055        builder = builder
1056            .start_transaction(0)
1057            .transfer_object(0, 1)
1058            .transfer_object(1, 1)
1059            .transfer_object(2, 1)
1060            .finish_transaction();
1061        let checkpoint1 = builder.build_checkpoint();
1062        let result = CoinBalanceBuckets
1063            .process(&Arc::new(checkpoint1))
1064            .await
1065            .unwrap();
1066        CoinBalanceBuckets::commit(&result, &mut conn)
1067            .await
1068            .unwrap();
1069
1070        builder = builder
1071            .start_transaction(1)
1072            .transfer_object(0, 0)
1073            .transfer_object(1, 0)
1074            .transfer_object(2, 0)
1075            .finish_transaction();
1076        let checkpoint2 = builder.build_checkpoint();
1077        let result = CoinBalanceBuckets
1078            .process(&Arc::new(checkpoint2))
1079            .await
1080            .unwrap();
1081        CoinBalanceBuckets::commit(&result, &mut conn)
1082            .await
1083            .unwrap();
1084
1085        // Verify initial state
1086        let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
1087        assert_eq!(all_balance_buckets.len(), 9); // 3 coins × 3 checkpoints
1088        let all_deletion_references = coin_balance_buckets_deletion_reference::table
1089            .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1090            .await
1091            .unwrap();
1092        assert_eq!(all_deletion_references.len(), 6);
1093
1094        // Run concurrent pruning operations
1095        let mut handles = Vec::new();
1096
1097        // Clone the store so each spawned task can own its own connection
1098        let store = indexer.store().clone();
1099
1100        // Spawn pruning [2, 3)
1101        let store1 = store.clone();
1102        handles.push(tokio::spawn(async move {
1103            let mut conn = store1.connect().await.unwrap();
1104            CoinBalanceBuckets.prune(2, 3, &mut conn).await
1105        }));
1106
1107        // Spawn pruning [1, 2)
1108        let store2 = store.clone();
1109        handles.push(tokio::spawn(async move {
1110            let mut conn = store2.connect().await.unwrap();
1111            CoinBalanceBuckets.prune(1, 2, &mut conn).await
1112        }));
1113
1114        // Spawn pruning [0, 1)
1115        let store3 = store.clone();
1116        handles.push(tokio::spawn(async move {
1117            let mut conn = store3.connect().await.unwrap();
1118            CoinBalanceBuckets.prune(0, 1, &mut conn).await
1119        }));
1120
1121        // Wait for all pruning operations to complete
1122        let results: Vec<Result<usize, anyhow::Error>> = futures::future::join_all(handles)
1123            .await
1124            .into_iter()
1125            .map(|r| r.unwrap())
1126            .collect();
1127
1128        // Verify all pruning operations succeeded
1129        for result in &results {
1130            assert!(result.is_ok(), "Pruning operation failed: {:?}", result);
1131        }
1132
1133        // Verify final state is consistent
1134        let final_balance_buckets = get_all_balance_buckets(&mut conn).await;
1135        let final_deletion_references = coin_balance_buckets_deletion_reference::table
1136            .load::<StoredCoinBalanceBucketDeletionReference>(&mut conn)
1137            .await
1138            .unwrap();
1139
1140        // After all pruning, we should have only the latest versions (cp_sequence_number = 2)
1141        assert_eq!(final_balance_buckets.len(), 3);
1142        for bucket in &final_balance_buckets {
1143            assert_eq!(bucket.cp_sequence_number, 2);
1144        }
1145
1146        // All deletion references should be cleaned up
1147        assert_eq!(final_deletion_references.len(), 0);
1148
1149        // Verify the total number of pruned rows matches expectations
1150        let total_pruned: usize = results.into_iter().map(|r| r.unwrap()).sum();
1151        assert_eq!(total_pruned, 12);
1152        for bucket in &final_balance_buckets {
1153            assert_eq!(bucket.cp_sequence_number, 2);
1154        }
1155    }
1156}