1use std::time::Duration;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use diesel::ExpressionMethods;
9use diesel::prelude::*;
10use diesel::sql_types::BigInt;
11use diesel::sql_types::Nullable;
12use diesel::sql_types::SingleValue;
13use diesel::sql_types::SqlType;
14use diesel_async::AsyncConnection;
15use diesel_async::RunQueryDsl;
16use scoped_futures::ScopedBoxFuture;
17use sui_indexer_alt_framework_store_traits as store;
18use sui_sql_macro::sql;
19
20use crate::Connection;
21use crate::Db;
22use crate::model::StoredWatermark;
23use crate::schema::watermarks;
24
25pub use sui_indexer_alt_framework_store_traits::Store;
26
27define_sql_function! {
28 fn coalesce<T: SqlType + SingleValue>(x: Nullable<T>, y: T) -> Nullable<T>;
29}
30
31#[async_trait]
32impl store::Connection for Connection<'_> {
33 async fn init_watermark(
34 &mut self,
35 pipeline_task: &str,
36 checkpoint_hi_inclusive: Option<u64>,
37 ) -> anyhow::Result<Option<store::InitWatermark>> {
38 let checkpoint_hi_inclusive = checkpoint_hi_inclusive.map_or(-1, |c| c as i64);
39 let stored_watermark = StoredWatermark::for_init(
40 pipeline_task,
41 checkpoint_hi_inclusive,
42 checkpoint_hi_inclusive + 1,
43 );
44
45 use diesel::pg::upsert::excluded;
46 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
47 diesel::insert_into(watermarks::table)
48 .values(&stored_watermark)
49 .on_conflict(watermarks::pipeline)
51 .do_update()
53 .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
56 .returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
57 .get_result(self)
58 .await?;
59
60 Ok(Some(store::InitWatermark {
61 checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
62 reader_lo: Some(reader_lo as u64),
63 }))
64 }
65
66 async fn accepts_chain_id(
67 &mut self,
68 pipeline_task: &str,
69 chain_id: [u8; 32],
70 ) -> anyhow::Result<bool> {
71 let stored_chain_id: Option<Vec<u8>> = diesel::update(watermarks::table)
72 .filter(watermarks::pipeline.eq(pipeline_task))
73 .set(watermarks::chain_id.eq(coalesce(watermarks::chain_id, chain_id)))
75 .returning(watermarks::chain_id)
76 .get_result(self)
77 .await?;
78
79 let stored_chain_id = stored_chain_id.context("missing chain id after update")?;
80 let stored_chain_id: [u8; 32] = stored_chain_id
81 .try_into()
82 .map_err(|v: Vec<u8>| anyhow::anyhow!("chain id has wrong length: {}", v.len()))?;
83 Ok(stored_chain_id == chain_id)
84 }
85
86 async fn committer_watermark(
87 &mut self,
88 pipeline_task: &str,
89 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
90 let (
91 epoch_hi_inclusive,
92 checkpoint_hi_inclusive,
93 tx_hi,
94 timestamp_ms_hi_inclusive,
95 reader_lo,
96 ): (i64, i64, i64, i64, i64) = watermarks::table
97 .select((
98 watermarks::epoch_hi_inclusive,
99 watermarks::checkpoint_hi_inclusive,
100 watermarks::tx_hi,
101 watermarks::timestamp_ms_hi_inclusive,
102 watermarks::reader_lo,
103 ))
104 .filter(watermarks::pipeline.eq(pipeline_task))
105 .first(self)
106 .await?;
107
108 Ok(
109 (reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
110 epoch_hi_inclusive: epoch_hi_inclusive as u64,
111 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
112 tx_hi: tx_hi as u64,
113 timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
114 }),
115 )
116 }
117
118 async fn set_committer_watermark(
119 &mut self,
120 pipeline_task: &str,
121 watermark: store::CommitterWatermark,
122 ) -> anyhow::Result<bool> {
123 Ok(diesel::update(watermarks::table)
124 .set((
125 watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
126 watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
127 watermarks::tx_hi.eq(watermark.tx_hi as i64),
128 watermarks::timestamp_ms_hi_inclusive
129 .eq(watermark.timestamp_ms_hi_inclusive as i64),
130 ))
131 .filter(watermarks::pipeline.eq(pipeline_task))
132 .filter(
133 watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
134 )
135 .execute(self)
136 .await?
137 > 0)
138 }
139}
140
141#[async_trait]
142impl store::ConcurrentConnection for Connection<'_> {
143 async fn reader_watermark(
144 &mut self,
145 pipeline: &str,
146 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
147 let (checkpoint_hi_inclusive, reader_lo): (i64, i64) = watermarks::table
148 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
149 .filter(watermarks::pipeline.eq(pipeline))
150 .first(self)
151 .await?;
152
153 Ok(
154 (reader_lo <= checkpoint_hi_inclusive).then_some(store::ReaderWatermark {
155 checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
156 reader_lo: reader_lo as u64,
157 }),
158 )
159 }
160
161 async fn pruner_watermark(
162 &mut self,
163 pipeline: &'static str,
164 delay: Duration,
165 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
166 let wait_for = sql!(as BigInt,
172 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
173 delay.as_millis() as i64,
174 );
175
176 let (wait_for_ms, pruner_hi, reader_lo): (i64, i64, i64) = watermarks::table
177 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
178 .filter(watermarks::pipeline.eq(pipeline))
179 .first(self)
180 .await?;
181
182 Ok(Some(store::PrunerWatermark {
183 wait_for_ms,
184 pruner_hi: pruner_hi as u64,
185 reader_lo: reader_lo as u64,
186 }))
187 }
188
189 async fn set_reader_watermark(
190 &mut self,
191 pipeline: &'static str,
192 reader_lo: u64,
193 ) -> anyhow::Result<bool> {
194 Ok(diesel::update(watermarks::table)
195 .set((
196 watermarks::reader_lo.eq(reader_lo as i64),
197 watermarks::pruner_timestamp.eq(diesel::dsl::now),
198 ))
199 .filter(watermarks::pipeline.eq(pipeline))
200 .filter(watermarks::reader_lo.lt(reader_lo as i64))
201 .execute(self)
202 .await?
203 > 0)
204 }
205
206 async fn set_pruner_watermark(
207 &mut self,
208 pipeline: &'static str,
209 pruner_hi: u64,
210 ) -> anyhow::Result<bool> {
211 Ok(diesel::update(watermarks::table)
212 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
213 .filter(watermarks::pipeline.eq(pipeline))
214 .filter(watermarks::pruner_hi.lt(pruner_hi as i64))
215 .execute(self)
216 .await?
217 > 0)
218 }
219}
220
221#[async_trait]
222impl store::SequentialConnection for Connection<'_> {}
223
224#[async_trait]
225impl store::Store for Db {
226 type Connection<'c> = Connection<'c>;
227
228 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
229 self.connect().await
230 }
231}
232
233#[async_trait]
234impl store::ConcurrentStore for Db {
235 type ConcurrentConnection<'c> = Connection<'c>;
236}
237
238#[async_trait]
239impl store::SequentialStore for Db {
240 type SequentialConnection<'c> = Connection<'c>;
241 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
242 where
243 R: Send + 'a,
244 F: Send + 'a,
245 F: for<'r> FnOnce(
246 &'r mut Self::Connection<'_>,
247 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
248 {
249 let mut conn = self.connect().await?;
250 AsyncConnection::transaction(&mut conn, async |conn| f(conn).await).await
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use sui_indexer_alt_framework_store_traits::concurrent_connection_tests;
257 use sui_indexer_alt_framework_store_traits::connection_tests;
258 use sui_indexer_alt_framework_store_traits::sequential_connection_tests;
259 use sui_indexer_alt_framework_store_traits::testing::Harness;
260
261 use crate::Db;
262 use crate::DbArgs;
263 use crate::MIGRATIONS;
264 use crate::temp::TempDb;
265
266 struct PgDbHarness {
267 store: Db,
268 _temp_db: TempDb,
269 }
270
271 #[async_trait::async_trait(?Send)]
272 impl Harness for PgDbHarness {
273 type Store = Db;
274
275 async fn new() -> Self {
276 let temp_db = TempDb::new().unwrap();
277 let db = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
278 .await
279 .unwrap();
280 db.run_migrations(Some(&MIGRATIONS)).await.unwrap();
281 Self {
282 store: db,
283 _temp_db: temp_db,
284 }
285 }
286
287 fn store(&self) -> &Self::Store {
288 &self.store
289 }
290 }
291
292 connection_tests!(PgDbHarness);
293 concurrent_connection_tests!(PgDbHarness);
294 sequential_connection_tests!(PgDbHarness);
295}