1use anyhow::{Error, anyhow};
5use async_trait::async_trait;
6use diesel::dsl::now;
7use diesel::{ExpressionMethods, TextExpressionMethods};
8use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
9use diesel_async::AsyncConnection;
10use diesel_async::RunQueryDsl;
11use diesel_async::scoped_futures::ScopedFutureExt;
12use sui_indexer_builder::progress::ProgressSavingPolicy;
13use sui_types::base_types::ObjectID;
14use sui_types::transaction::{Command, TransactionDataAPI};
15use tracing::info;
16
17use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent};
18use sui_indexer_builder::sui_datasource::CheckpointTxnData;
19use sui_indexer_builder::{LIVE_TASK_TARGET_CHECKPOINT, Task, Tasks};
20use sui_types::effects::TransactionEffectsAPI;
21use sui_types::event::Event;
22use sui_types::execution_status::ExecutionStatus;
23use sui_types::full_checkpoint_content::CheckpointTransaction;
24
25use crate::events::{
26 MoveBalanceEvent, MoveFlashLoanBorrowedEvent, MoveOrderCanceledEvent, MoveOrderExpiredEvent,
27 MoveOrderFilledEvent, MoveOrderModifiedEvent, MoveOrderPlacedEvent, MovePriceAddedEvent,
28 MoveProposalEvent, MoveRebateEvent, MoveStakeEvent, MoveTradeParamsUpdateEvent, MoveVoteEvent,
29};
30use crate::metrics::DeepBookIndexerMetrics;
31use crate::postgres_manager::PgPool;
32use crate::schema::progress_store::{columns, dsl};
33use crate::schema::{
34 balances, flashloans, order_fills, order_updates, pool_prices, proposals, rebates, stakes,
35 sui_error_transactions, trade_params_update, votes,
36};
37use crate::types::{
38 Balances, Flashloan, OrderFill, OrderUpdate, OrderUpdateStatus, PoolPrice, ProcessedTxnData,
39 Proposals, Rebates, Stakes, SuiTxnError, TradeParamsUpdate, Votes,
40};
41use crate::{models, schema};
42
43#[derive(Clone)]
45pub struct PgDeepbookPersistent {
46 pub pool: PgPool,
47 save_progress_policy: ProgressSavingPolicy,
48}
49
50impl PgDeepbookPersistent {
51 pub fn new(pool: PgPool, save_progress_policy: ProgressSavingPolicy) -> Self {
52 Self {
53 pool,
54 save_progress_policy,
55 }
56 }
57
58 async fn get_largest_backfill_task_target_checkpoint(
59 &self,
60 prefix: &str,
61 ) -> Result<Option<u64>, Error> {
62 let mut conn = self.pool.get().await?;
63 let cp = dsl::progress_store
64 .select(columns::target_checkpoint)
65 .filter(columns::task_name.like(format!("{prefix} - %")))
67 .filter(columns::target_checkpoint.ne(i64::MAX))
68 .order_by(columns::target_checkpoint.desc())
69 .first::<i64>(&mut conn)
70 .await
71 .optional()?;
72 Ok(cp.map(|c| c as u64))
73 }
74}
75
76#[async_trait]
77impl Persistent<ProcessedTxnData> for PgDeepbookPersistent {
78 async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
79 if data.is_empty() {
80 return Ok(());
81 }
82 use futures::future;
83
84 let mut order_updates_batch = vec![];
86 let mut order_fills_batch = vec![];
87 let mut flashloans_batch = vec![];
88 let mut pool_prices_batch = vec![];
89 let mut balances_batch = vec![];
90 let mut proposals_batch = vec![];
91 let mut rebates_batch = vec![];
92 let mut stakes_batch = vec![];
93 let mut trade_params_update_batch = vec![];
94 let mut votes_batch = vec![];
95 let mut error_transactions_batch = vec![];
96
97 for d in data {
99 match d {
100 ProcessedTxnData::OrderUpdate(t) => order_updates_batch.push(t.to_db()),
101 ProcessedTxnData::OrderFill(t) => order_fills_batch.push(t.to_db()),
102 ProcessedTxnData::Flashloan(t) => flashloans_batch.push(t.to_db()),
103 ProcessedTxnData::PoolPrice(t) => pool_prices_batch.push(t.to_db()),
104 ProcessedTxnData::Balances(t) => balances_batch.push(t.to_db()),
105 ProcessedTxnData::Proposals(t) => proposals_batch.push(t.to_db()),
106 ProcessedTxnData::Rebates(t) => rebates_batch.push(t.to_db()),
107 ProcessedTxnData::Stakes(t) => stakes_batch.push(t.to_db()),
108 ProcessedTxnData::TradeParamsUpdate(t) => trade_params_update_batch.push(t.to_db()),
109 ProcessedTxnData::Votes(t) => votes_batch.push(t.to_db()),
110 ProcessedTxnData::Error(e) => error_transactions_batch.push(e.to_db()),
111 }
112 }
113
114 let connection = &mut self.pool.get().await?;
115 connection
116 .transaction(|conn| {
117 async move {
118 let mut tasks = Vec::new();
120
121 if !order_updates_batch.is_empty() {
122 tasks.push(
123 diesel::insert_into(order_updates::table)
124 .values(&order_updates_batch)
125 .on_conflict_do_nothing()
126 .execute(conn),
127 );
128 }
129 if !order_fills_batch.is_empty() {
130 tasks.push(
131 diesel::insert_into(order_fills::table)
132 .values(&order_fills_batch)
133 .on_conflict_do_nothing()
134 .execute(conn),
135 );
136 }
137 if !flashloans_batch.is_empty() {
138 tasks.push(
139 diesel::insert_into(flashloans::table)
140 .values(&flashloans_batch)
141 .on_conflict_do_nothing()
142 .execute(conn),
143 );
144 }
145 if !pool_prices_batch.is_empty() {
146 tasks.push(
147 diesel::insert_into(pool_prices::table)
148 .values(&pool_prices_batch)
149 .on_conflict_do_nothing()
150 .execute(conn),
151 );
152 }
153 if !balances_batch.is_empty() {
154 tasks.push(
155 diesel::insert_into(balances::table)
156 .values(&balances_batch)
157 .on_conflict_do_nothing()
158 .execute(conn),
159 );
160 }
161 if !proposals_batch.is_empty() {
162 tasks.push(
163 diesel::insert_into(proposals::table)
164 .values(&proposals_batch)
165 .on_conflict_do_nothing()
166 .execute(conn),
167 );
168 }
169 if !rebates_batch.is_empty() {
170 tasks.push(
171 diesel::insert_into(rebates::table)
172 .values(&rebates_batch)
173 .on_conflict_do_nothing()
174 .execute(conn),
175 );
176 }
177 if !stakes_batch.is_empty() {
178 tasks.push(
179 diesel::insert_into(stakes::table)
180 .values(&stakes_batch)
181 .on_conflict_do_nothing()
182 .execute(conn),
183 );
184 }
185 if !trade_params_update_batch.is_empty() {
186 tasks.push(
187 diesel::insert_into(trade_params_update::table)
188 .values(&trade_params_update_batch)
189 .on_conflict_do_nothing()
190 .execute(conn),
191 );
192 }
193 if !votes_batch.is_empty() {
194 tasks.push(
195 diesel::insert_into(votes::table)
196 .values(&votes_batch)
197 .on_conflict_do_nothing()
198 .execute(conn),
199 );
200 }
201 if !error_transactions_batch.is_empty() {
202 tasks.push(
203 diesel::insert_into(sui_error_transactions::table)
204 .values(&error_transactions_batch)
205 .on_conflict_do_nothing()
206 .execute(conn),
207 );
208 }
209
210 let _: Vec<_> = future::try_join_all(tasks).await?;
212
213 Ok(())
214 }
215 .scope_boxed()
216 })
217 .await
218 }
219}
220
221#[async_trait]
222impl IndexerProgressStore for PgDeepbookPersistent {
223 async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
224 let mut conn = self.pool.get().await?;
225 let cp: Option<models::ProgressStore> = dsl::progress_store
226 .find(&task_name)
227 .select(models::ProgressStore::as_select())
228 .first(&mut conn)
229 .await
230 .optional()?;
231 Ok(cp
232 .ok_or(anyhow!("Cannot found progress for task {task_name}"))?
233 .checkpoint as u64)
234 }
235
236 async fn save_progress(
237 &mut self,
238 task: &Task,
239 checkpoint_numbers: &[u64],
240 ) -> anyhow::Result<Option<u64>> {
241 if checkpoint_numbers.is_empty() {
242 return Ok(None);
243 }
244 let task_name = task.task_name.clone();
245 if let Some(checkpoint_to_save) = self
246 .save_progress_policy
247 .cache_progress(task, checkpoint_numbers)
248 {
249 let mut conn = self.pool.get().await?;
250 diesel::insert_into(schema::progress_store::table)
251 .values(&models::ProgressStore {
252 task_name,
253 checkpoint: checkpoint_to_save as i64,
254 target_checkpoint: i64::MAX,
256 timestamp: None,
258 })
259 .on_conflict(dsl::task_name)
260 .do_update()
261 .set((
262 columns::checkpoint.eq(checkpoint_to_save as i64),
263 columns::timestamp.eq(now),
264 ))
265 .execute(&mut conn)
266 .await?;
267 return Ok(Some(checkpoint_to_save));
269 }
270 Ok(None)
271 }
272
273 async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Tasks, anyhow::Error> {
274 let mut conn = self.pool.get().await?;
275 let cp: Vec<models::ProgressStore> = dsl::progress_store
277 .filter(columns::task_name.like(format!("{prefix} - %")))
279 .filter(columns::checkpoint.lt(columns::target_checkpoint))
280 .order_by(columns::target_checkpoint.desc())
281 .load(&mut conn)
282 .await?;
283 let tasks = cp.into_iter().map(|d| d.into()).collect();
284 Ok(Tasks::new(tasks)?)
285 }
286
287 async fn get_largest_indexed_checkpoint(&self, prefix: &str) -> Result<Option<u64>, Error> {
288 let mut conn = self.pool.get().await?;
289 let cp = dsl::progress_store
290 .select(columns::checkpoint)
291 .filter(columns::task_name.like(format!("{prefix} - %")))
293 .filter(columns::target_checkpoint.eq(i64::MAX))
294 .first::<i64>(&mut conn)
295 .await
296 .optional()?;
297
298 if let Some(cp) = cp {
299 Ok(Some(cp as u64))
300 } else {
301 self.get_largest_backfill_task_target_checkpoint(prefix)
303 .await
304 }
305 }
306
307 async fn register_task(
308 &mut self,
309 task_name: String,
310 checkpoint: u64,
311 target_checkpoint: u64,
312 ) -> Result<(), anyhow::Error> {
313 let mut conn = self.pool.get().await?;
314 diesel::insert_into(schema::progress_store::table)
315 .values(models::ProgressStore {
316 task_name,
317 checkpoint: checkpoint as i64,
318 target_checkpoint: target_checkpoint as i64,
319 timestamp: None,
321 })
322 .execute(&mut conn)
323 .await?;
324 Ok(())
325 }
326
327 async fn register_live_task(
329 &mut self,
330 task_name: String,
331 start_checkpoint: u64,
332 ) -> Result<(), anyhow::Error> {
333 let mut conn = self.pool.get().await?;
334 diesel::insert_into(schema::progress_store::table)
335 .values(models::ProgressStore {
336 task_name,
337 checkpoint: start_checkpoint as i64,
338 target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
339 timestamp: None,
341 })
342 .execute(&mut conn)
343 .await?;
344 Ok(())
345 }
346
347 async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
348 let mut conn = self.pool.get().await?;
349 diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
350 .set((
351 columns::checkpoint.eq(task.start_checkpoint as i64),
352 columns::target_checkpoint.eq(task.target_checkpoint as i64),
353 columns::timestamp.eq(now),
354 ))
355 .execute(&mut conn)
356 .await?;
357 Ok(())
358 }
359}
360
361#[derive(Clone)]
363pub struct SuiDeepBookDataMapper {
364 pub metrics: DeepBookIndexerMetrics,
365 pub package_id: ObjectID,
366}
367
368impl DataMapper<CheckpointTxnData, ProcessedTxnData> for SuiDeepBookDataMapper {
369 fn map(
370 &self,
371 (data, checkpoint_num, timestamp_ms): CheckpointTxnData,
372 ) -> Result<Vec<ProcessedTxnData>, Error> {
373 if !data.input_objects.iter().any(|obj| {
374 obj.data
375 .type_()
376 .map(|t| t.address() == self.package_id.into())
377 .unwrap_or_default()
378 }) {
379 return Ok(vec![]);
380 }
381
382 self.metrics.total_deepbook_transactions.inc();
383
384 match &data.events {
385 Some(events) => {
386 let processed_sui_events =
387 events
388 .data
389 .iter()
390 .enumerate()
391 .try_fold(vec![], |mut result, (i, ev)| {
392 if let Some(data) = process_sui_event(
393 ev,
394 i,
395 &data,
396 checkpoint_num,
397 timestamp_ms,
398 self.package_id,
399 )? {
400 result.push(data);
401 }
402 Ok::<_, anyhow::Error>(result)
403 })?;
404 if !processed_sui_events.is_empty() {
405 info!(
406 "SUI: Extracted {} deepbook data entries for tx {}.",
407 processed_sui_events.len(),
408 data.transaction.digest()
409 );
410 }
411 Ok(processed_sui_events)
412 }
413 None => {
414 if let ExecutionStatus::Failure { error, command } = data.effects.status() {
415 let txn_kind = data.transaction.transaction_data().clone().into_kind();
416 let first_command = txn_kind.iter_commands().next();
417 let package = if let Some(Command::MoveCall(move_call)) = first_command {
418 move_call.package.to_string()
419 } else {
420 "".to_string()
421 };
422 Ok(vec![ProcessedTxnData::Error(SuiTxnError {
423 tx_digest: *data.transaction.digest(),
424 sender: data.transaction.sender_address(),
425 timestamp_ms,
426 failure_status: error.to_string(),
427 package,
428 cmd_idx: command.map(|idx| idx as u64),
429 })])
430 } else {
431 Ok(vec![])
432 }
433 }
434 }
435 }
436}
437
438fn process_sui_event(
439 ev: &Event,
440 event_index: usize,
441 tx: &CheckpointTransaction,
442 checkpoint: u64,
443 checkpoint_timestamp_ms: u64,
444 package_id: ObjectID,
445) -> Result<Option<ProcessedTxnData>, anyhow::Error> {
446 Ok(if ev.type_.address == *package_id {
447 match ev.type_.name.as_str() {
448 "OrderPlaced" => {
449 let move_event: MoveOrderPlacedEvent = bcs::from_bytes(&ev.contents)?;
450 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
451 let first_command = txn_kind.iter_commands().next();
452 let package = if let Some(Command::MoveCall(move_call)) = first_command {
453 move_call.package.to_string()
454 } else {
455 "".to_string()
456 };
457 let mut event_digest = tx.transaction.digest().to_string();
458 event_digest.push_str(&event_index.to_string());
459 let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
460 digest: tx.transaction.digest().to_string(),
461 sender: tx.transaction.sender_address().to_string(),
462 event_digest,
463 checkpoint,
464 checkpoint_timestamp_ms,
465 package,
466 status: OrderUpdateStatus::Placed,
467 pool_id: move_event.pool_id.to_string(),
468 order_id: move_event.order_id,
469 client_order_id: move_event.client_order_id,
470 price: move_event.price,
471 is_bid: move_event.is_bid,
472 onchain_timestamp: move_event.timestamp,
473 original_quantity: move_event.placed_quantity,
474 quantity: move_event.placed_quantity,
475 filled_quantity: 0,
476 trader: move_event.trader.to_string(),
477 balance_manager_id: move_event.balance_manager_id.to_string(),
478 }));
479 info!("Observed Deepbook Order Placed {:?}", txn_data);
480
481 txn_data
482 }
483 "OrderModified" => {
484 let move_event: MoveOrderModifiedEvent = bcs::from_bytes(&ev.contents)?;
485 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
486 let first_command = txn_kind.iter_commands().next();
487 let package = if let Some(Command::MoveCall(move_call)) = first_command {
488 move_call.package.to_string()
489 } else {
490 "".to_string()
491 };
492 let mut event_digest = tx.transaction.digest().to_string();
493 event_digest.push_str(&event_index.to_string());
494 let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
495 digest: tx.transaction.digest().to_string(),
496 event_digest,
497 sender: tx.transaction.sender_address().to_string(),
498 checkpoint,
499 checkpoint_timestamp_ms,
500 package,
501 status: OrderUpdateStatus::Modified,
502 pool_id: move_event.pool_id.to_string(),
503 order_id: move_event.order_id,
504 client_order_id: move_event.client_order_id,
505 price: move_event.price,
506 is_bid: move_event.is_bid,
507 onchain_timestamp: move_event.timestamp,
508 original_quantity: move_event.previous_quantity,
509 quantity: move_event.new_quantity,
510 filled_quantity: move_event.filled_quantity,
511 trader: move_event.trader.to_string(),
512 balance_manager_id: move_event.balance_manager_id.to_string(),
513 }));
514 info!("Observed Deepbook Order Modified {:?}", txn_data);
515
516 txn_data
517 }
518 "OrderCanceled" => {
519 let move_event: MoveOrderCanceledEvent = bcs::from_bytes(&ev.contents)?;
520 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
521 let first_command = txn_kind.iter_commands().next();
522 let package = if let Some(Command::MoveCall(move_call)) = first_command {
523 move_call.package.to_string()
524 } else {
525 "".to_string()
526 };
527 let mut event_digest = tx.transaction.digest().to_string();
528 event_digest.push_str(&event_index.to_string());
529 let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
530 digest: tx.transaction.digest().to_string(),
531 event_digest,
532 sender: tx.transaction.sender_address().to_string(),
533 checkpoint,
534 checkpoint_timestamp_ms,
535 package,
536 status: OrderUpdateStatus::Canceled,
537 pool_id: move_event.pool_id.to_string(),
538 order_id: move_event.order_id,
539 client_order_id: move_event.client_order_id,
540 price: move_event.price,
541 is_bid: move_event.is_bid,
542 onchain_timestamp: move_event.timestamp,
543 original_quantity: move_event.original_quantity,
544 quantity: move_event.base_asset_quantity_canceled,
545 filled_quantity: move_event.original_quantity
546 - move_event.base_asset_quantity_canceled,
547 trader: move_event.trader.to_string(),
548 balance_manager_id: move_event.balance_manager_id.to_string(),
549 }));
550 info!("Observed Deepbook Order Canceled {:?}", txn_data);
551
552 txn_data
553 }
554 "OrderExpired" => {
555 let move_event: MoveOrderExpiredEvent = bcs::from_bytes(&ev.contents)?;
556 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
557 let first_command = txn_kind.iter_commands().next();
558 let package = if let Some(Command::MoveCall(move_call)) = first_command {
559 move_call.package.to_string()
560 } else {
561 "".to_string()
562 };
563 let mut event_digest = tx.transaction.digest().to_string();
564 event_digest.push_str(&event_index.to_string());
565 let txn_data = Some(ProcessedTxnData::OrderUpdate(OrderUpdate {
566 digest: tx.transaction.digest().to_string(),
567 event_digest,
568 sender: tx.transaction.sender_address().to_string(),
569 checkpoint,
570 checkpoint_timestamp_ms,
571 package,
572 status: OrderUpdateStatus::Expired,
573 pool_id: move_event.pool_id.to_string(),
574 order_id: move_event.order_id,
575 client_order_id: move_event.client_order_id,
576 price: move_event.price,
577 is_bid: move_event.is_bid,
578 onchain_timestamp: move_event.timestamp,
579 original_quantity: move_event.original_quantity,
580 quantity: move_event.base_asset_quantity_canceled,
581 filled_quantity: move_event.original_quantity
582 - move_event.base_asset_quantity_canceled,
583 trader: move_event.trader.to_string(),
584 balance_manager_id: move_event.balance_manager_id.to_string(),
585 }));
586 info!("Observed Deepbook Order Expired {:?}", txn_data);
587
588 txn_data
589 }
590 "OrderFilled" => {
591 let move_event: MoveOrderFilledEvent = bcs::from_bytes(&ev.contents)?;
592 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
593 let first_command = txn_kind.iter_commands().next();
594 let package = if let Some(Command::MoveCall(move_call)) = first_command {
595 move_call.package.to_string()
596 } else {
597 "".to_string()
598 };
599 let mut event_digest = tx.transaction.digest().to_string();
600 event_digest.push_str(&event_index.to_string());
601 let txn_data = Some(ProcessedTxnData::OrderFill(OrderFill {
602 digest: tx.transaction.digest().to_string(),
603 event_digest,
604 sender: tx.transaction.sender_address().to_string(),
605 checkpoint,
606 checkpoint_timestamp_ms,
607 package,
608 pool_id: move_event.pool_id.to_string(),
609 maker_order_id: move_event.maker_order_id,
610 taker_order_id: move_event.taker_order_id,
611 maker_client_order_id: move_event.maker_client_order_id,
612 taker_client_order_id: move_event.taker_client_order_id,
613 price: move_event.price,
614 taker_is_bid: move_event.taker_is_bid,
615 taker_fee: move_event.taker_fee,
616 taker_fee_is_deep: move_event.taker_fee_is_deep,
617 maker_fee: move_event.maker_fee,
618 maker_fee_is_deep: move_event.maker_fee_is_deep,
619 base_quantity: move_event.base_quantity,
620 quote_quantity: move_event.quote_quantity,
621 maker_balance_manager_id: move_event.maker_balance_manager_id.to_string(),
622 taker_balance_manager_id: move_event.taker_balance_manager_id.to_string(),
623 onchain_timestamp: move_event.timestamp,
624 }));
625 info!("Observed Deepbook Order Filled {:?}", txn_data);
626
627 txn_data
628 }
629 "FlashLoanBorrowed" => {
630 let move_event: MoveFlashLoanBorrowedEvent = bcs::from_bytes(&ev.contents)?;
631 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
632 let first_command = txn_kind.iter_commands().next();
633 let package = if let Some(Command::MoveCall(move_call)) = first_command {
634 move_call.package.to_string()
635 } else {
636 "".to_string()
637 };
638 let mut event_digest = tx.transaction.digest().to_string();
639 event_digest.push_str(&event_index.to_string());
640 let txn_data = Some(ProcessedTxnData::Flashloan(Flashloan {
641 digest: tx.transaction.digest().to_string(),
642 event_digest,
643 sender: tx.transaction.sender_address().to_string(),
644 checkpoint,
645 checkpoint_timestamp_ms,
646 package,
647 pool_id: move_event.pool_id.to_string(),
648 borrow_quantity: move_event.borrow_quantity,
649 borrow: true,
650 type_name: move_event.type_name.to_string(),
651 }));
652 info!("Observed Deepbook Flash Loan Borrowed {:?}", txn_data);
653
654 txn_data
655 }
656 "PriceAdded" => {
657 let move_event: MovePriceAddedEvent = bcs::from_bytes(&ev.contents)?;
658 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
659 let first_command = txn_kind.iter_commands().next();
660 let package = if let Some(Command::MoveCall(move_call)) = first_command {
661 move_call.package.to_string()
662 } else {
663 "".to_string()
664 };
665 let mut event_digest = tx.transaction.digest().to_string();
666 event_digest.push_str(&event_index.to_string());
667 let txn_data = Some(ProcessedTxnData::PoolPrice(PoolPrice {
668 digest: tx.transaction.digest().to_string(),
669 event_digest,
670 sender: tx.transaction.sender_address().to_string(),
671 checkpoint,
672 checkpoint_timestamp_ms,
673 package,
674 target_pool: move_event.target_pool.to_string(),
675 conversion_rate: move_event.conversion_rate,
676 reference_pool: move_event.reference_pool.to_string(),
677 }));
678 info!("Observed Deepbook Price Addition {:?}", txn_data);
679
680 txn_data
681 }
682 "BalanceEvent" => {
683 let move_event: MoveBalanceEvent = bcs::from_bytes(&ev.contents)?;
684 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
685 let first_command = txn_kind.iter_commands().next();
686 let package = if let Some(Command::MoveCall(move_call)) = first_command {
687 move_call.package.to_string()
688 } else {
689 "".to_string()
690 };
691 let mut event_digest = tx.transaction.digest().to_string();
692 event_digest.push_str(&event_index.to_string());
693 let txn_data = Some(ProcessedTxnData::Balances(Balances {
694 digest: tx.transaction.digest().to_string(),
695 event_digest,
696 sender: tx.transaction.sender_address().to_string(),
697 checkpoint,
698 checkpoint_timestamp_ms,
699 package,
700 balance_manager_id: move_event.balance_manager_id.to_string(),
701 asset: move_event.asset.to_string(),
702 amount: move_event.amount,
703 deposit: move_event.deposit,
704 }));
705 info!("Observed Deepbook Balance Event {:?}", txn_data);
706
707 txn_data
708 }
709 "ProposalEvent" => {
710 let move_event: MoveProposalEvent = bcs::from_bytes(&ev.contents)?;
711 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
712 let first_command = txn_kind.iter_commands().next();
713 let package = if let Some(Command::MoveCall(move_call)) = first_command {
714 move_call.package.to_string()
715 } else {
716 "".to_string()
717 };
718 let mut event_digest = tx.transaction.digest().to_string();
719 event_digest.push_str(&event_index.to_string());
720 let txn_data = Some(ProcessedTxnData::Proposals(Proposals {
721 digest: tx.transaction.digest().to_string(),
722 event_digest,
723 sender: tx.transaction.sender_address().to_string(),
724 checkpoint,
725 checkpoint_timestamp_ms,
726 package,
727 pool_id: move_event.pool_id.to_string(),
728 balance_manager_id: move_event.balance_manager_id.to_string(),
729 epoch: move_event.epoch,
730 taker_fee: move_event.taker_fee,
731 maker_fee: move_event.maker_fee,
732 stake_required: move_event.stake_required,
733 }));
734 info!("Observed Deepbook Proposal Event {:?}", txn_data);
735
736 txn_data
737 }
738 "RebateEvent" => {
739 let move_event: MoveRebateEvent = bcs::from_bytes(&ev.contents)?;
740 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
741 let first_command = txn_kind.iter_commands().next();
742 let package = if let Some(Command::MoveCall(move_call)) = first_command {
743 move_call.package.to_string()
744 } else {
745 "".to_string()
746 };
747 let mut event_digest = tx.transaction.digest().to_string();
748 event_digest.push_str(&event_index.to_string());
749 let txn_data = Some(ProcessedTxnData::Rebates(Rebates {
750 digest: tx.transaction.digest().to_string(),
751 event_digest,
752 sender: tx.transaction.sender_address().to_string(),
753 checkpoint,
754 checkpoint_timestamp_ms,
755 package,
756 pool_id: move_event.pool_id.to_string(),
757 balance_manager_id: move_event.balance_manager_id.to_string(),
758 epoch: move_event.epoch,
759 claim_amount: move_event.claim_amount,
760 }));
761 info!("Observed Deepbook Rebate Event {:?}", txn_data);
762
763 txn_data
764 }
765 "StakeEvent" => {
766 let move_event: MoveStakeEvent = bcs::from_bytes(&ev.contents)?;
767 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
768 let first_command = txn_kind.iter_commands().next();
769 let package = if let Some(Command::MoveCall(move_call)) = first_command {
770 move_call.package.to_string()
771 } else {
772 "".to_string()
773 };
774 let mut event_digest = tx.transaction.digest().to_string();
775 event_digest.push_str(&event_index.to_string());
776 let txn_data = Some(ProcessedTxnData::Stakes(Stakes {
777 digest: tx.transaction.digest().to_string(),
778 event_digest,
779 sender: tx.transaction.sender_address().to_string(),
780 checkpoint,
781 checkpoint_timestamp_ms,
782 package,
783 pool_id: move_event.pool_id.to_string(),
784 balance_manager_id: move_event.balance_manager_id.to_string(),
785 epoch: move_event.epoch,
786 amount: move_event.amount,
787 stake: move_event.stake,
788 }));
789 info!("Observed Deepbook Stake Event {:?}", txn_data);
790
791 txn_data
792 }
793 "TradeParamsUpdateEvent" => {
794 let move_event: MoveTradeParamsUpdateEvent = bcs::from_bytes(&ev.contents)?;
795 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
796 let first_command = txn_kind.iter_commands().next();
797 let package = if let Some(Command::MoveCall(move_call)) = first_command {
798 move_call.package.to_string()
799 } else {
800 "".to_string()
801 };
802 let mut event_digest = tx.transaction.digest().to_string();
803 event_digest.push_str(&event_index.to_string());
804 let shared_objects = &tx.input_objects;
805 let mut pool_id = "0x0".to_string();
806 for obj in shared_objects.iter() {
807 if let Some(obj_type) = obj.data.type_()
808 && obj_type.module().to_string().eq("pool")
809 && obj_type.address() == *package_id
810 {
811 pool_id = obj_type.address().to_string();
812 break;
813 }
814 }
815 let txn_data = Some(ProcessedTxnData::TradeParamsUpdate(TradeParamsUpdate {
816 digest: tx.transaction.digest().to_string(),
817 event_digest,
818 sender: tx.transaction.sender_address().to_string(),
819 checkpoint,
820 checkpoint_timestamp_ms,
821 package,
822 pool_id,
823 taker_fee: move_event.taker_fee,
824 maker_fee: move_event.maker_fee,
825 stake_required: move_event.stake_required,
826 }));
827 info!("Observed Deepbook Trade Params Update Event {:?}", txn_data);
828
829 txn_data
830 }
831 "VoteEvent" => {
832 let move_event: MoveVoteEvent = bcs::from_bytes(&ev.contents)?;
833 let txn_kind = tx.transaction.transaction_data().clone().into_kind();
834 let first_command = txn_kind.iter_commands().next();
835 let package = if let Some(Command::MoveCall(move_call)) = first_command {
836 move_call.package.to_string()
837 } else {
838 "".to_string()
839 };
840 let mut event_digest = tx.transaction.digest().to_string();
841 event_digest.push_str(&event_index.to_string());
842 let txn_data = Some(ProcessedTxnData::Votes(Votes {
843 digest: tx.transaction.digest().to_string(),
844 event_digest,
845 sender: tx.transaction.sender_address().to_string(),
846 checkpoint,
847 checkpoint_timestamp_ms,
848 package,
849 pool_id: move_event.pool_id.to_string(),
850 balance_manager_id: move_event.balance_manager_id.to_string(),
851 epoch: move_event.epoch,
852 from_proposal_id: move_event.from_proposal_id.map(|id| id.to_string()),
853 to_proposal_id: move_event.to_proposal_id.to_string(),
854 stake: move_event.stake,
855 }));
856 info!("Observed Deepbook Vote Event {:?}", txn_data);
857
858 txn_data
859 }
860 _ => {
861 None
863 }
864 }
865 } else {
866 None
867 })
868}