sui_deepbook_indexer/
sui_deepbook_indexer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::{Error, anyhow};
5use async_trait::async_trait;
6use diesel::dsl::now;
7use diesel::{ExpressionMethods, TextExpressionMethods};
8use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
9use diesel_async::AsyncConnection;
10use diesel_async::RunQueryDsl;
11use diesel_async::scoped_futures::ScopedFutureExt;
12use sui_indexer_builder::progress::ProgressSavingPolicy;
13use sui_types::base_types::ObjectID;
14use sui_types::transaction::{Command, TransactionDataAPI};
15use tracing::info;
16
17use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent};
18use sui_indexer_builder::sui_datasource::CheckpointTxnData;
19use sui_indexer_builder::{LIVE_TASK_TARGET_CHECKPOINT, Task, Tasks};
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::event::Event;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::full_checkpoint_content::CheckpointTransaction;
24
25use crate::events::{
26    MoveBalanceEvent, MoveFlashLoanBorrowedEvent, MoveOrderCanceledEvent, MoveOrderExpiredEvent,
27    MoveOrderFilledEvent, MoveOrderModifiedEvent, MoveOrderPlacedEvent, MovePriceAddedEvent,
28    MoveProposalEvent, MoveRebateEvent, MoveStakeEvent, MoveTradeParamsUpdateEvent, MoveVoteEvent,
29};
30use crate::metrics::DeepBookIndexerMetrics;
31use crate::postgres_manager::PgPool;
32use crate::schema::progress_store::{columns, dsl};
33use crate::schema::{
34    balances, flashloans, order_fills, order_updates, pool_prices, proposals, rebates, stakes,
35    sui_error_transactions, trade_params_update, votes,
36};
37use crate::types::{
38    Balances, Flashloan, OrderFill, OrderUpdate, OrderUpdateStatus, PoolPrice, ProcessedTxnData,
39    Proposals, Rebates, Stakes, SuiTxnError, TradeParamsUpdate, Votes,
40};
41use crate::{models, schema};
42
43/// Persistent layer impl
44#[derive(Clone)]
45pub struct PgDeepbookPersistent {
46    pub pool: PgPool,
47    save_progress_policy: ProgressSavingPolicy,
48}
49
50impl PgDeepbookPersistent {
51    pub fn new(pool: PgPool, save_progress_policy: ProgressSavingPolicy) -> Self {
52        Self {
53            pool,
54            save_progress_policy,
55        }
56    }
57
58    async fn get_largest_backfill_task_target_checkpoint(
59        &self,
60        prefix: &str,
61    ) -> Result<Option<u64>, Error> {
62        let mut conn = self.pool.get().await?;
63        let cp = dsl::progress_store
64            .select(columns::target_checkpoint)
65            // TODO: using like could be error prone, change the progress store schema to stare the task name properly.
66            .filter(columns::task_name.like(format!("{prefix} - %")))
67            .filter(columns::target_checkpoint.ne(i64::MAX))
68            .order_by(columns::target_checkpoint.desc())
69            .first::<i64>(&mut conn)
70            .await
71            .optional()?;
72        Ok(cp.map(|c| c as u64))
73    }
74}
75
76#[async_trait]
77impl Persistent<ProcessedTxnData> for PgDeepbookPersistent {
78    async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
79        if data.is_empty() {
80            return Ok(());
81        }
82        use futures::future;
83
84        // Group data by type
85        let mut order_updates_batch = vec![];
86        let mut order_fills_batch = vec![];
87        let mut flashloans_batch = vec![];
88        let mut pool_prices_batch = vec![];
89        let mut balances_batch = vec![];
90        let mut proposals_batch = vec![];
91        let mut rebates_batch = vec![];
92        let mut stakes_batch = vec![];
93        let mut trade_params_update_batch = vec![];
94        let mut votes_batch = vec![];
95        let mut error_transactions_batch = vec![];
96
97        // Collect the data into batches
98        for d in data {
99            match d {
100                ProcessedTxnData::OrderUpdate(t) => order_updates_batch.push(t.to_db()),
101                ProcessedTxnData::OrderFill(t) => order_fills_batch.push(t.to_db()),
102                ProcessedTxnData::Flashloan(t) => flashloans_batch.push(t.to_db()),
103                ProcessedTxnData::PoolPrice(t) => pool_prices_batch.push(t.to_db()),
104                ProcessedTxnData::Balances(t) => balances_batch.push(t.to_db()),
105                ProcessedTxnData::Proposals(t) => proposals_batch.push(t.to_db()),
106                ProcessedTxnData::Rebates(t) => rebates_batch.push(t.to_db()),
107                ProcessedTxnData::Stakes(t) => stakes_batch.push(t.to_db()),
108                ProcessedTxnData::TradeParamsUpdate(t) => trade_params_update_batch.push(t.to_db()),
109                ProcessedTxnData::Votes(t) => votes_batch.push(t.to_db()),
110                ProcessedTxnData::Error(e) => error_transactions_batch.push(e.to_db()),
111            }
112        }
113
114        let connection = &mut self.pool.get().await?;
115        connection
116            .transaction(|conn| {
117                async move {
118                    // Create async tasks for each batch insert
119                    let mut tasks = Vec::new();
120
121                    if !order_updates_batch.is_empty() {
122                        tasks.push(
123                            diesel::insert_into(order_updates::table)
124                                .values(&order_updates_batch)
125                                .on_conflict_do_nothing()
126                                .execute(conn),
127                        );
128                    }
129                    if !order_fills_batch.is_empty() {
130                        tasks.push(
131                            diesel::insert_into(order_fills::table)
132                                .values(&order_fills_batch)
133                                .on_conflict_do_nothing()
134                                .execute(conn),
135                        );
136                    }
137                    if !flashloans_batch.is_empty() {
138                        tasks.push(
139                            diesel::insert_into(flashloans::table)
140                                .values(&flashloans_batch)
141                                .on_conflict_do_nothing()
142                                .execute(conn),
143                        );
144                    }
145                    if !pool_prices_batch.is_empty() {
146                        tasks.push(
147                            diesel::insert_into(pool_prices::table)
148                                .values(&pool_prices_batch)
149                                .on_conflict_do_nothing()
150                                .execute(conn),
151                        );
152                    }
153                    if !balances_batch.is_empty() {
154                        tasks.push(
155                            diesel::insert_into(balances::table)
156                                .values(&balances_batch)
157                                .on_conflict_do_nothing()
158                                .execute(conn),
159                        );
160                    }
161                    if !proposals_batch.is_empty() {
162                        tasks.push(
163                            diesel::insert_into(proposals::table)
164                                .values(&proposals_batch)
165                                .on_conflict_do_nothing()
166                                .execute(conn),
167                        );
168                    }
169                    if !rebates_batch.is_empty() {
170                        tasks.push(
171                            diesel::insert_into(rebates::table)
172                                .values(&rebates_batch)
173                                .on_conflict_do_nothing()
174                                .execute(conn),
175                        );
176                    }
177                    if !stakes_batch.is_empty() {
178                        tasks.push(
179                            diesel::insert_into(stakes::table)
180                                .values(&stakes_batch)
181                                .on_conflict_do_nothing()
182                                .execute(conn),
183                        );
184                    }
185                    if !trade_params_update_batch.is_empty() {
186                        tasks.push(
187                            diesel::insert_into(trade_params_update::table)
188                                .values(&trade_params_update_batch)
189                                .on_conflict_do_nothing()
190                                .execute(conn),
191                        );
192                    }
193                    if !votes_batch.is_empty() {
194                        tasks.push(
195                            diesel::insert_into(votes::table)
196                                .values(&votes_batch)
197                                .on_conflict_do_nothing()
198                                .execute(conn),
199                        );
200                    }
201                    if !error_transactions_batch.is_empty() {
202                        tasks.push(
203                            diesel::insert_into(sui_error_transactions::table)
204                                .values(&error_transactions_batch)
205                                .on_conflict_do_nothing()
206                                .execute(conn),
207                        );
208                    }
209
210                    // Execute all tasks concurrently
211                    let _: Vec<_> = future::try_join_all(tasks).await?;
212
213                    Ok(())
214                }
215                .scope_boxed()
216            })
217            .await
218    }
219}
220
221#[async_trait]
222impl IndexerProgressStore for PgDeepbookPersistent {
223    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
224        let mut conn = self.pool.get().await?;
225        let cp: Option<models::ProgressStore> = dsl::progress_store
226            .find(&task_name)
227            .select(models::ProgressStore::as_select())
228            .first(&mut conn)
229            .await
230            .optional()?;
231        Ok(cp
232            .ok_or(anyhow!("Cannot found progress for task {task_name}"))?
233            .checkpoint as u64)
234    }
235
236    async fn save_progress(
237        &mut self,
238        task: &Task,
239        checkpoint_numbers: &[u64],
240    ) -> anyhow::Result<Option<u64>> {
241        if checkpoint_numbers.is_empty() {
242            return Ok(None);
243        }
244        let task_name = task.task_name.clone();
245        if let Some(checkpoint_to_save) = self
246            .save_progress_policy
247            .cache_progress(task, checkpoint_numbers)
248        {
249            let mut conn = self.pool.get().await?;
250            diesel::insert_into(schema::progress_store::table)
251                .values(&models::ProgressStore {
252                    task_name,
253                    checkpoint: checkpoint_to_save as i64,
254                    // Target checkpoint and timestamp will only be written for new entries
255                    target_checkpoint: i64::MAX,
256                    // Timestamp is defaulted to current time in DB if None
257                    timestamp: None,
258                })
259                .on_conflict(dsl::task_name)
260                .do_update()
261                .set((
262                    columns::checkpoint.eq(checkpoint_to_save as i64),
263                    columns::timestamp.eq(now),
264                ))
265                .execute(&mut conn)
266                .await?;
267            // TODO: add metrics here
268            return Ok(Some(checkpoint_to_save));
269        }
270        Ok(None)
271    }
272
273    async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Tasks, anyhow::Error> {
274        let mut conn = self.pool.get().await?;
275        // get all unfinished tasks
276        let cp: Vec<models::ProgressStore> = dsl::progress_store
277            // TODO: using like could be error prone, change the progress store schema to stare the task name properly.
278            .filter(columns::task_name.like(format!("{prefix} - %")))
279            .filter(columns::checkpoint.lt(columns::target_checkpoint))
280            .order_by(columns::target_checkpoint.desc())
281            .load(&mut conn)
282            .await?;
283        let tasks = cp.into_iter().map(|d| d.into()).collect();
284        Ok(Tasks::new(tasks)?)
285    }
286
287    async fn get_largest_indexed_checkpoint(&self, prefix: &str) -> Result<Option<u64>, Error> {
288        let mut conn = self.pool.get().await?;
289        let cp = dsl::progress_store
290            .select(columns::checkpoint)
291            // TODO: using like could be error prone, change the progress store schema to stare the task name properly.
292            .filter(columns::task_name.like(format!("{prefix} - %")))
293            .filter(columns::target_checkpoint.eq(i64::MAX))
294            .first::<i64>(&mut conn)
295            .await
296            .optional()?;
297
298        if let Some(cp) = cp {
299            Ok(Some(cp as u64))
300        } else {
301            // Use the largest backfill target checkpoint as a fallback
302            self.get_largest_backfill_task_target_checkpoint(prefix)
303                .await
304        }
305    }
306
307    async fn register_task(
308        &mut self,
309        task_name: String,
310        checkpoint: u64,
311        target_checkpoint: u64,
312    ) -> Result<(), anyhow::Error> {
313        let mut conn = self.pool.get().await?;
314        diesel::insert_into(schema::progress_store::table)
315            .values(models::ProgressStore {
316                task_name,
317                checkpoint: checkpoint as i64,
318                target_checkpoint: target_checkpoint as i64,
319                // Timestamp is defaulted to current time in DB if None
320                timestamp: None,
321            })
322            .execute(&mut conn)
323            .await?;
324        Ok(())
325    }
326
327    /// Register a live task to progress store with a start checkpoint.
328    async fn register_live_task(
329        &mut self,
330        task_name: String,
331        start_checkpoint: u64,
332    ) -> Result<(), anyhow::Error> {
333        let mut conn = self.pool.get().await?;
334        diesel::insert_into(schema::progress_store::table)
335            .values(models::ProgressStore {
336                task_name,
337                checkpoint: start_checkpoint as i64,
338                target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
339                // Timestamp is defaulted to current time in DB if None
340                timestamp: None,
341            })
342            .execute(&mut conn)
343            .await?;
344        Ok(())
345    }
346
347    async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
348        let mut conn = self.pool.get().await?;
349        diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
350            .set((
351                columns::checkpoint.eq(task.start_checkpoint as i64),
352                columns::target_checkpoint.eq(task.target_checkpoint as i64),
353                columns::timestamp.eq(now),
354            ))
355            .execute(&mut conn)
356            .await?;
357        Ok(())
358    }
359}
360
361/// Data mapper impl
362#[derive(Clone)]
363pub struct SuiDeepBookDataMapper {
364    pub metrics: DeepBookIndexerMetrics,
365    pub package_id: ObjectID,
366}
367
368impl DataMapper<CheckpointTxnData, ProcessedTxnData> for SuiDeepBookDataMapper {
369    fn map(
370        &self,
371        (data, checkpoint_num, timestamp_ms): CheckpointTxnData,
372    ) -> Result<Vec<ProcessedTxnData>, Error> {
373        if !data.input_objects.iter().any(|obj| {
374            obj.data
375                .type_()
376                .map(|t| t.address() == self.package_id.into())
377                .unwrap_or_default()
378        }) {
379            return Ok(vec![]);
380        }
381
382        self.metrics.total_deepbook_transactions.inc();
383
384        match &data.events {
385            Some(events) => {
386                let processed_sui_events =
387                    events
388                        .data
389                        .iter()
390                        .enumerate()
391                        .try_fold(vec![], |mut result, (i, ev)| {
392                            if let Some(data) = process_sui_event(
393                                ev,
394                                i,
395                                &data,
396                                checkpoint_num,
397                                timestamp_ms,
398                                self.package_id,
399                            )? {
400                                result.push(data);
401                            }
402                            Ok::<_, anyhow::Error>(result)
403                        })?;
404                if !processed_sui_events.is_empty() {
405                    info!(
406                        "SUI: Extracted {} deepbook data entries for tx {}.",
407                        processed_sui_events.len(),
408                        data.transaction.digest()
409                    );
410                }
411                Ok(processed_sui_events)
412            }
413            None => {
414                if let ExecutionStatus::Failure { error, command } = data.effects.status() {
415                    let txn_kind = data.transaction.transaction_data().clone().into_kind();
416                    let first_command = txn_kind.iter_commands().next();
417                    let package = if let Some(Command::MoveCall(move_call)) = first_command {
418                        move_call.package.to_string()
419                    } else {
420                        "".to_string()
421                    };
422                    Ok(vec![ProcessedTxnData::Error(SuiTxnError {
423                        tx_digest: *data.transaction.digest(),
424                        sender: data.transaction.sender_address(),
425                        timestamp_ms,
426                        failure_status: error.to_string(),
427                        package,
428                        cmd_idx: command.map(|idx| idx as u64),
429                    })])
430                } else {
431                    Ok(vec![])
432                }
433            }
434        }
435    }
436}
437
438fn process_sui_event(
439    ev: &Event,
440    event_index: usize,
441    tx: &CheckpointTransaction,
442    checkpoint: u64,
443    checkpoint_timestamp_ms: u64,
444    package_id: ObjectID,
445) -> Result<Option<ProcessedTxnData>, anyhow::Error> {
446    Ok(if ev.type_.address == *package_id {
447        match ev.type_.name.as_str() {
448            "OrderPlaced" => {
449                let move_event: MoveOrderPlacedEvent = bcs::from_bytes(&ev.contents)?;
450                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
451                let first_command = txn_kind.iter_commands().next();
452                let package = if let Some(Command::MoveCall(move_call)) = first_command {
453                    move_call.package.to_string()
454                } else {
455                    "".to_string()
456                };
457                let mut event_digest = tx.transaction.digest().to_string();
458                event_digest.push_str(&event_index.to_string());
459                let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
460                    digest: tx.transaction.digest().to_string(),
461                    sender: tx.transaction.sender_address().to_string(),
462                    event_digest,
463                    checkpoint,
464                    checkpoint_timestamp_ms,
465                    package,
466                    status: OrderUpdateStatus::Placed,
467                    pool_id: move_event.pool_id.to_string(),
468                    order_id: move_event.order_id,
469                    client_order_id: move_event.client_order_id,
470                    price: move_event.price,
471                    is_bid: move_event.is_bid,
472                    onchain_timestamp: move_event.timestamp,
473                    original_quantity: move_event.placed_quantity,
474                    quantity: move_event.placed_quantity,
475                    filled_quantity: 0,
476                    trader: move_event.trader.to_string(),
477                    balance_manager_id: move_event.balance_manager_id.to_string(),
478                }));
479                info!("Observed Deepbook Order Placed {:?}", txn_data);
480
481                txn_data
482            }
483            "OrderModified" => {
484                let move_event: MoveOrderModifiedEvent = bcs::from_bytes(&ev.contents)?;
485                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
486                let first_command = txn_kind.iter_commands().next();
487                let package = if let Some(Command::MoveCall(move_call)) = first_command {
488                    move_call.package.to_string()
489                } else {
490                    "".to_string()
491                };
492                let mut event_digest = tx.transaction.digest().to_string();
493                event_digest.push_str(&event_index.to_string());
494                let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
495                    digest: tx.transaction.digest().to_string(),
496                    event_digest,
497                    sender: tx.transaction.sender_address().to_string(),
498                    checkpoint,
499                    checkpoint_timestamp_ms,
500                    package,
501                    status: OrderUpdateStatus::Modified,
502                    pool_id: move_event.pool_id.to_string(),
503                    order_id: move_event.order_id,
504                    client_order_id: move_event.client_order_id,
505                    price: move_event.price,
506                    is_bid: move_event.is_bid,
507                    onchain_timestamp: move_event.timestamp,
508                    original_quantity: move_event.previous_quantity,
509                    quantity: move_event.new_quantity,
510                    filled_quantity: move_event.filled_quantity,
511                    trader: move_event.trader.to_string(),
512                    balance_manager_id: move_event.balance_manager_id.to_string(),
513                }));
514                info!("Observed Deepbook Order Modified {:?}", txn_data);
515
516                txn_data
517            }
518            "OrderCanceled" => {
519                let move_event: MoveOrderCanceledEvent = bcs::from_bytes(&ev.contents)?;
520                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
521                let first_command = txn_kind.iter_commands().next();
522                let package = if let Some(Command::MoveCall(move_call)) = first_command {
523                    move_call.package.to_string()
524                } else {
525                    "".to_string()
526                };
527                let mut event_digest = tx.transaction.digest().to_string();
528                event_digest.push_str(&event_index.to_string());
529                let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
530                    digest: tx.transaction.digest().to_string(),
531                    event_digest,
532                    sender: tx.transaction.sender_address().to_string(),
533                    checkpoint,
534                    checkpoint_timestamp_ms,
535                    package,
536                    status: OrderUpdateStatus::Canceled,
537                    pool_id: move_event.pool_id.to_string(),
538                    order_id: move_event.order_id,
539                    client_order_id: move_event.client_order_id,
540                    price: move_event.price,
541                    is_bid: move_event.is_bid,
542                    onchain_timestamp: move_event.timestamp,
543                    original_quantity: move_event.original_quantity,
544                    quantity: move_event.base_asset_quantity_canceled,
545                    filled_quantity: move_event.original_quantity
546                        - move_event.base_asset_quantity_canceled,
547                    trader: move_event.trader.to_string(),
548                    balance_manager_id: move_event.balance_manager_id.to_string(),
549                }));
550                info!("Observed Deepbook Order Canceled {:?}", txn_data);
551
552                txn_data
553            }
554            "OrderExpired" => {
555                let move_event: MoveOrderExpiredEvent = bcs::from_bytes(&ev.contents)?;
556                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
557                let first_command = txn_kind.iter_commands().next();
558                let package = if let Some(Command::MoveCall(move_call)) = first_command {
559                    move_call.package.to_string()
560                } else {
561                    "".to_string()
562                };
563                let mut event_digest = tx.transaction.digest().to_string();
564                event_digest.push_str(&event_index.to_string());
565                let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
566                    digest: tx.transaction.digest().to_string(),
567                    event_digest,
568                    sender: tx.transaction.sender_address().to_string(),
569                    checkpoint,
570                    checkpoint_timestamp_ms,
571                    package,
572                    status: OrderUpdateStatus::Expired,
573                    pool_id: move_event.pool_id.to_string(),
574                    order_id: move_event.order_id,
575                    client_order_id: move_event.client_order_id,
576                    price: move_event.price,
577                    is_bid: move_event.is_bid,
578                    onchain_timestamp: move_event.timestamp,
579                    original_quantity: move_event.original_quantity,
580                    quantity: move_event.base_asset_quantity_canceled,
581                    filled_quantity: move_event.original_quantity
582                        - move_event.base_asset_quantity_canceled,
583                    trader: move_event.trader.to_string(),
584                    balance_manager_id: move_event.balance_manager_id.to_string(),
585                }));
586                info!("Observed Deepbook Order Expired {:?}", txn_data);
587
588                txn_data
589            }
590            "OrderFilled" => {
591                let move_event: MoveOrderFilledEvent = bcs::from_bytes(&ev.contents)?;
592                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
593                let first_command = txn_kind.iter_commands().next();
594                let package = if let Some(Command::MoveCall(move_call)) = first_command {
595                    move_call.package.to_string()
596                } else {
597                    "".to_string()
598                };
599                let mut event_digest = tx.transaction.digest().to_string();
600                event_digest.push_str(&event_index.to_string());
601                let txn_data = Some(ProcessedTxnData::OrderFill(OrderFill {
602                    digest: tx.transaction.digest().to_string(),
603                    event_digest,
604                    sender: tx.transaction.sender_address().to_string(),
605                    checkpoint,
606                    checkpoint_timestamp_ms,
607                    package,
608                    pool_id: move_event.pool_id.to_string(),
609                    maker_order_id: move_event.maker_order_id,
610                    taker_order_id: move_event.taker_order_id,
611                    maker_client_order_id: move_event.maker_client_order_id,
612                    taker_client_order_id: move_event.taker_client_order_id,
613                    price: move_event.price,
614                    taker_is_bid: move_event.taker_is_bid,
615                    taker_fee: move_event.taker_fee,
616                    taker_fee_is_deep: move_event.taker_fee_is_deep,
617                    maker_fee: move_event.maker_fee,
618                    maker_fee_is_deep: move_event.maker_fee_is_deep,
619                    base_quantity: move_event.base_quantity,
620                    quote_quantity: move_event.quote_quantity,
621                    maker_balance_manager_id: move_event.maker_balance_manager_id.to_string(),
622                    taker_balance_manager_id: move_event.taker_balance_manager_id.to_string(),
623                    onchain_timestamp: move_event.timestamp,
624                }));
625                info!("Observed Deepbook Order Filled {:?}", txn_data);
626
627                txn_data
628            }
629            "FlashLoanBorrowed" => {
630                let move_event: MoveFlashLoanBorrowedEvent = bcs::from_bytes(&ev.contents)?;
631                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
632                let first_command = txn_kind.iter_commands().next();
633                let package = if let Some(Command::MoveCall(move_call)) = first_command {
634                    move_call.package.to_string()
635                } else {
636                    "".to_string()
637                };
638                let mut event_digest = tx.transaction.digest().to_string();
639                event_digest.push_str(&event_index.to_string());
640                let txn_data = Some(ProcessedTxnData::Flashloan(Flashloan {
641                    digest: tx.transaction.digest().to_string(),
642                    event_digest,
643                    sender: tx.transaction.sender_address().to_string(),
644                    checkpoint,
645                    checkpoint_timestamp_ms,
646                    package,
647                    pool_id: move_event.pool_id.to_string(),
648                    borrow_quantity: move_event.borrow_quantity,
649                    borrow: true,
650                    type_name: move_event.type_name.to_string(),
651                }));
652                info!("Observed Deepbook Flash Loan Borrowed {:?}", txn_data);
653
654                txn_data
655            }
656            "PriceAdded" => {
657                let move_event: MovePriceAddedEvent = bcs::from_bytes(&ev.contents)?;
658                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
659                let first_command = txn_kind.iter_commands().next();
660                let package = if let Some(Command::MoveCall(move_call)) = first_command {
661                    move_call.package.to_string()
662                } else {
663                    "".to_string()
664                };
665                let mut event_digest = tx.transaction.digest().to_string();
666                event_digest.push_str(&event_index.to_string());
667                let txn_data = Some(ProcessedTxnData::PoolPrice(PoolPrice {
668                    digest: tx.transaction.digest().to_string(),
669                    event_digest,
670                    sender: tx.transaction.sender_address().to_string(),
671                    checkpoint,
672                    checkpoint_timestamp_ms,
673                    package,
674                    target_pool: move_event.target_pool.to_string(),
675                    conversion_rate: move_event.conversion_rate,
676                    reference_pool: move_event.reference_pool.to_string(),
677                }));
678                info!("Observed Deepbook Price Addition {:?}", txn_data);
679
680                txn_data
681            }
682            "BalanceEvent" => {
683                let move_event: MoveBalanceEvent = bcs::from_bytes(&ev.contents)?;
684                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
685                let first_command = txn_kind.iter_commands().next();
686                let package = if let Some(Command::MoveCall(move_call)) = first_command {
687                    move_call.package.to_string()
688                } else {
689                    "".to_string()
690                };
691                let mut event_digest = tx.transaction.digest().to_string();
692                event_digest.push_str(&event_index.to_string());
693                let txn_data = Some(ProcessedTxnData::Balances(Balances {
694                    digest: tx.transaction.digest().to_string(),
695                    event_digest,
696                    sender: tx.transaction.sender_address().to_string(),
697                    checkpoint,
698                    checkpoint_timestamp_ms,
699                    package,
700                    balance_manager_id: move_event.balance_manager_id.to_string(),
701                    asset: move_event.asset.to_string(),
702                    amount: move_event.amount,
703                    deposit: move_event.deposit,
704                }));
705                info!("Observed Deepbook Balance Event {:?}", txn_data);
706
707                txn_data
708            }
709            "ProposalEvent" => {
710                let move_event: MoveProposalEvent = bcs::from_bytes(&ev.contents)?;
711                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
712                let first_command = txn_kind.iter_commands().next();
713                let package = if let Some(Command::MoveCall(move_call)) = first_command {
714                    move_call.package.to_string()
715                } else {
716                    "".to_string()
717                };
718                let mut event_digest = tx.transaction.digest().to_string();
719                event_digest.push_str(&event_index.to_string());
720                let txn_data = Some(ProcessedTxnData::Proposals(Proposals {
721                    digest: tx.transaction.digest().to_string(),
722                    event_digest,
723                    sender: tx.transaction.sender_address().to_string(),
724                    checkpoint,
725                    checkpoint_timestamp_ms,
726                    package,
727                    pool_id: move_event.pool_id.to_string(),
728                    balance_manager_id: move_event.balance_manager_id.to_string(),
729                    epoch: move_event.epoch,
730                    taker_fee: move_event.taker_fee,
731                    maker_fee: move_event.maker_fee,
732                    stake_required: move_event.stake_required,
733                }));
734                info!("Observed Deepbook Proposal Event {:?}", txn_data);
735
736                txn_data
737            }
738            "RebateEvent" => {
739                let move_event: MoveRebateEvent = bcs::from_bytes(&ev.contents)?;
740                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
741                let first_command = txn_kind.iter_commands().next();
742                let package = if let Some(Command::MoveCall(move_call)) = first_command {
743                    move_call.package.to_string()
744                } else {
745                    "".to_string()
746                };
747                let mut event_digest = tx.transaction.digest().to_string();
748                event_digest.push_str(&event_index.to_string());
749                let txn_data = Some(ProcessedTxnData::Rebates(Rebates {
750                    digest: tx.transaction.digest().to_string(),
751                    event_digest,
752                    sender: tx.transaction.sender_address().to_string(),
753                    checkpoint,
754                    checkpoint_timestamp_ms,
755                    package,
756                    pool_id: move_event.pool_id.to_string(),
757                    balance_manager_id: move_event.balance_manager_id.to_string(),
758                    epoch: move_event.epoch,
759                    claim_amount: move_event.claim_amount,
760                }));
761                info!("Observed Deepbook Rebate Event {:?}", txn_data);
762
763                txn_data
764            }
765            "StakeEvent" => {
766                let move_event: MoveStakeEvent = bcs::from_bytes(&ev.contents)?;
767                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
768                let first_command = txn_kind.iter_commands().next();
769                let package = if let Some(Command::MoveCall(move_call)) = first_command {
770                    move_call.package.to_string()
771                } else {
772                    "".to_string()
773                };
774                let mut event_digest = tx.transaction.digest().to_string();
775                event_digest.push_str(&event_index.to_string());
776                let txn_data = Some(ProcessedTxnData::Stakes(Stakes {
777                    digest: tx.transaction.digest().to_string(),
778                    event_digest,
779                    sender: tx.transaction.sender_address().to_string(),
780                    checkpoint,
781                    checkpoint_timestamp_ms,
782                    package,
783                    pool_id: move_event.pool_id.to_string(),
784                    balance_manager_id: move_event.balance_manager_id.to_string(),
785                    epoch: move_event.epoch,
786                    amount: move_event.amount,
787                    stake: move_event.stake,
788                }));
789                info!("Observed Deepbook Stake Event {:?}", txn_data);
790
791                txn_data
792            }
793            "TradeParamsUpdateEvent" => {
794                let move_event: MoveTradeParamsUpdateEvent = bcs::from_bytes(&ev.contents)?;
795                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
796                let first_command = txn_kind.iter_commands().next();
797                let package = if let Some(Command::MoveCall(move_call)) = first_command {
798                    move_call.package.to_string()
799                } else {
800                    "".to_string()
801                };
802                let mut event_digest = tx.transaction.digest().to_string();
803                event_digest.push_str(&event_index.to_string());
804                let shared_objects = &tx.input_objects;
805                let mut pool_id = "0x0".to_string();
806                for obj in shared_objects.iter() {
807                    if let Some(obj_type) = obj.data.type_()
808                        && obj_type.module().to_string().eq("pool")
809                        && obj_type.address() == *package_id
810                    {
811                        pool_id = obj_type.address().to_string();
812                        break;
813                    }
814                }
815                let txn_data = Some(ProcessedTxnData::TradeParamsUpdate(TradeParamsUpdate {
816                    digest: tx.transaction.digest().to_string(),
817                    event_digest,
818                    sender: tx.transaction.sender_address().to_string(),
819                    checkpoint,
820                    checkpoint_timestamp_ms,
821                    package,
822                    pool_id,
823                    taker_fee: move_event.taker_fee,
824                    maker_fee: move_event.maker_fee,
825                    stake_required: move_event.stake_required,
826                }));
827                info!("Observed Deepbook Trade Params Update Event {:?}", txn_data);
828
829                txn_data
830            }
831            "VoteEvent" => {
832                let move_event: MoveVoteEvent = bcs::from_bytes(&ev.contents)?;
833                let txn_kind = tx.transaction.transaction_data().clone().into_kind();
834                let first_command = txn_kind.iter_commands().next();
835                let package = if let Some(Command::MoveCall(move_call)) = first_command {
836                    move_call.package.to_string()
837                } else {
838                    "".to_string()
839                };
840                let mut event_digest = tx.transaction.digest().to_string();
841                event_digest.push_str(&event_index.to_string());
842                let txn_data = Some(ProcessedTxnData::Votes(Votes {
843                    digest: tx.transaction.digest().to_string(),
844                    event_digest,
845                    sender: tx.transaction.sender_address().to_string(),
846                    checkpoint,
847                    checkpoint_timestamp_ms,
848                    package,
849                    pool_id: move_event.pool_id.to_string(),
850                    balance_manager_id: move_event.balance_manager_id.to_string(),
851                    epoch: move_event.epoch,
852                    from_proposal_id: move_event.from_proposal_id.map(|id| id.to_string()),
853                    to_proposal_id: move_event.to_proposal_id.to_string(),
854                    stake: move_event.stake,
855                }));
856                info!("Observed Deepbook Vote Event {:?}", txn_data);
857
858                txn_data
859            }
860            _ => {
861                // todo: metrics.total_sui_bridge_txn_other.inc();
862                None
863            }
864        }
865    } else {
866        None
867    })
868}