1use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::NaiveDateTime;
8use diesel::ExpressionMethods;
9use diesel::OptionalExtension;
10use diesel::prelude::*;
11use diesel::sql_types::BigInt;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::model::StoredWatermark;
18use crate::schema::watermarks;
19use crate::{Connection, Db};
20
21pub use sui_indexer_alt_framework_store_traits::Store;
22
23#[async_trait]
24impl store::Connection for Connection<'_> {
25 async fn committer_watermark(
26 &mut self,
27 pipeline: &'static str,
28 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
29 let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
30 .select((
31 watermarks::epoch_hi_inclusive,
32 watermarks::checkpoint_hi_inclusive,
33 watermarks::tx_hi,
34 watermarks::timestamp_ms_hi_inclusive,
35 ))
36 .filter(watermarks::pipeline.eq(pipeline))
37 .first(self)
38 .await
39 .optional()?;
40
41 if let Some(watermark) = watermark {
42 Ok(Some(store::CommitterWatermark {
43 epoch_hi_inclusive: watermark.0 as u64,
44 checkpoint_hi_inclusive: watermark.1 as u64,
45 tx_hi: watermark.2 as u64,
46 timestamp_ms_hi_inclusive: watermark.3 as u64,
47 }))
48 } else {
49 Ok(None)
50 }
51 }
52
53 async fn reader_watermark(
54 &mut self,
55 pipeline: &'static str,
56 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
57 let watermark: Option<(i64, i64)> = watermarks::table
58 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
59 .filter(watermarks::pipeline.eq(pipeline))
60 .first(self)
61 .await
62 .optional()?;
63
64 if let Some(watermark) = watermark {
65 Ok(Some(store::ReaderWatermark {
66 checkpoint_hi_inclusive: watermark.0 as u64,
67 reader_lo: watermark.1 as u64,
68 }))
69 } else {
70 Ok(None)
71 }
72 }
73
74 async fn pruner_watermark(
75 &mut self,
76 pipeline: &'static str,
77 delay: Duration,
78 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
79 let wait_for = sql!(as BigInt,
85 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
86 delay.as_millis() as i64,
87 );
88
89 let watermark: Option<(i64, i64, i64)> = watermarks::table
90 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
91 .filter(watermarks::pipeline.eq(pipeline))
92 .first(self)
93 .await
94 .optional()?;
95
96 if let Some(watermark) = watermark {
97 Ok(Some(store::PrunerWatermark {
98 wait_for_ms: watermark.0,
99 pruner_hi: watermark.1 as u64,
100 reader_lo: watermark.2 as u64,
101 }))
102 } else {
103 Ok(None)
104 }
105 }
106
107 async fn set_committer_watermark(
108 &mut self,
109 pipeline: &'static str,
110 watermark: store::CommitterWatermark,
111 ) -> anyhow::Result<bool> {
112 let stored_watermark = StoredWatermark {
114 pipeline: pipeline.to_string(),
115 epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
116 checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
117 tx_hi: watermark.tx_hi as i64,
118 timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
119 reader_lo: 0,
120 pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
121 pruner_hi: 0,
122 };
123
124 use diesel::query_dsl::methods::FilterDsl;
125 Ok(diesel::insert_into(watermarks::table)
126 .values(&stored_watermark)
127 .on_conflict(watermarks::pipeline)
129 .do_update()
130 .set((
131 watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
132 watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
133 watermarks::tx_hi.eq(stored_watermark.tx_hi),
134 watermarks::timestamp_ms_hi_inclusive
135 .eq(stored_watermark.timestamp_ms_hi_inclusive),
136 ))
137 .filter(
138 watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
139 )
140 .execute(self)
141 .await?
142 > 0)
143 }
144
145 async fn set_reader_watermark(
146 &mut self,
147 pipeline: &'static str,
148 reader_lo: u64,
149 ) -> anyhow::Result<bool> {
150 Ok(diesel::update(watermarks::table)
151 .set((
152 watermarks::reader_lo.eq(reader_lo as i64),
153 watermarks::pruner_timestamp.eq(diesel::dsl::now),
154 ))
155 .filter(watermarks::pipeline.eq(pipeline))
156 .filter(watermarks::reader_lo.lt(reader_lo as i64))
157 .execute(self)
158 .await?
159 > 0)
160 }
161
162 async fn set_pruner_watermark(
163 &mut self,
164 pipeline: &'static str,
165 pruner_hi: u64,
166 ) -> anyhow::Result<bool> {
167 Ok(diesel::update(watermarks::table)
168 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
169 .filter(watermarks::pipeline.eq(pipeline))
170 .execute(self)
171 .await?
172 > 0)
173 }
174}
175
176#[async_trait]
177impl store::Store for Db {
178 type Connection<'c> = Connection<'c>;
179
180 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
181 Ok(Connection(self.0.get().await?))
182 }
183}
184
185#[async_trait]
186impl store::TransactionalStore for Db {
187 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
188 where
189 R: Send + 'a,
190 F: Send + 'a,
191 F: for<'r> FnOnce(
192 &'r mut Self::Connection<'_>,
193 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
194 {
195 let mut conn = self.connect().await?;
196 AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
197 }
198}