1use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::{NaiveDateTime, Utc};
8use diesel::ExpressionMethods;
9use diesel::OptionalExtension;
10use diesel::prelude::*;
11use diesel::sql_types::BigInt;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use scoped_futures::ScopedBoxFuture;
14use sui_indexer_alt_framework_store_traits as store;
15use sui_sql_macro::sql;
16
17use crate::model::StoredWatermark;
18use crate::schema::watermarks;
19use crate::{Connection, Db};
20
21pub use sui_indexer_alt_framework_store_traits::Store;
22
23#[async_trait]
24impl store::Connection for Connection<'_> {
25 async fn init_watermark(
26 &mut self,
27 pipeline_task: &str,
28 default_next_checkpoint: u64,
29 ) -> anyhow::Result<Option<u64>> {
30 let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
31 return Ok(self
33 .committer_watermark(pipeline_task)
34 .await?
35 .map(|w| w.checkpoint_hi_inclusive));
36 };
37
38 let stored_watermark = StoredWatermark {
39 pipeline: pipeline_task.to_string(),
40 epoch_hi_inclusive: 0,
41 checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
42 tx_hi: 0,
43 timestamp_ms_hi_inclusive: 0,
44 reader_lo: default_next_checkpoint as i64,
45 pruner_timestamp: Utc::now().naive_utc(),
46 pruner_hi: default_next_checkpoint as i64,
47 };
48
49 use diesel::pg::upsert::excluded;
50 let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
51 .values(&stored_watermark)
52 .on_conflict(watermarks::pipeline)
54 .do_update()
56 .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
59 .returning(watermarks::checkpoint_hi_inclusive)
60 .get_result(self)
61 .await?;
62
63 Ok(Some(checkpoint_hi_inclusive as u64))
64 }
65
66 async fn committer_watermark(
67 &mut self,
68 pipeline_task: &str,
69 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
70 let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
71 .select((
72 watermarks::epoch_hi_inclusive,
73 watermarks::checkpoint_hi_inclusive,
74 watermarks::tx_hi,
75 watermarks::timestamp_ms_hi_inclusive,
76 ))
77 .filter(watermarks::pipeline.eq(pipeline_task))
78 .first(self)
79 .await
80 .optional()?;
81
82 if let Some(watermark) = watermark {
83 Ok(Some(store::CommitterWatermark {
84 epoch_hi_inclusive: watermark.0 as u64,
85 checkpoint_hi_inclusive: watermark.1 as u64,
86 tx_hi: watermark.2 as u64,
87 timestamp_ms_hi_inclusive: watermark.3 as u64,
88 }))
89 } else {
90 Ok(None)
91 }
92 }
93
94 async fn reader_watermark(
95 &mut self,
96 pipeline: &'static str,
97 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
98 let watermark: Option<(i64, i64)> = watermarks::table
99 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
100 .filter(watermarks::pipeline.eq(pipeline))
101 .first(self)
102 .await
103 .optional()?;
104
105 if let Some(watermark) = watermark {
106 Ok(Some(store::ReaderWatermark {
107 checkpoint_hi_inclusive: watermark.0 as u64,
108 reader_lo: watermark.1 as u64,
109 }))
110 } else {
111 Ok(None)
112 }
113 }
114
115 async fn pruner_watermark(
116 &mut self,
117 pipeline: &'static str,
118 delay: Duration,
119 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
120 let wait_for = sql!(as BigInt,
126 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
127 delay.as_millis() as i64,
128 );
129
130 let watermark: Option<(i64, i64, i64)> = watermarks::table
131 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
132 .filter(watermarks::pipeline.eq(pipeline))
133 .first(self)
134 .await
135 .optional()?;
136
137 if let Some(watermark) = watermark {
138 Ok(Some(store::PrunerWatermark {
139 wait_for_ms: watermark.0,
140 pruner_hi: watermark.1 as u64,
141 reader_lo: watermark.2 as u64,
142 }))
143 } else {
144 Ok(None)
145 }
146 }
147
148 async fn set_committer_watermark(
149 &mut self,
150 pipeline_task: &str,
151 watermark: store::CommitterWatermark,
152 ) -> anyhow::Result<bool> {
153 let stored_watermark = StoredWatermark {
155 pipeline: pipeline_task.to_string(),
156 epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
157 checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
158 tx_hi: watermark.tx_hi as i64,
159 timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
160 reader_lo: 0,
161 pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
162 pruner_hi: 0,
163 };
164
165 use diesel::query_dsl::methods::FilterDsl;
166 Ok(diesel::insert_into(watermarks::table)
167 .values(&stored_watermark)
168 .on_conflict(watermarks::pipeline)
170 .do_update()
171 .set((
172 watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
173 watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
174 watermarks::tx_hi.eq(stored_watermark.tx_hi),
175 watermarks::timestamp_ms_hi_inclusive
176 .eq(stored_watermark.timestamp_ms_hi_inclusive),
177 ))
178 .filter(
179 watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
180 )
181 .execute(self)
182 .await?
183 > 0)
184 }
185
186 async fn set_reader_watermark(
187 &mut self,
188 pipeline: &'static str,
189 reader_lo: u64,
190 ) -> anyhow::Result<bool> {
191 Ok(diesel::update(watermarks::table)
192 .set((
193 watermarks::reader_lo.eq(reader_lo as i64),
194 watermarks::pruner_timestamp.eq(diesel::dsl::now),
195 ))
196 .filter(watermarks::pipeline.eq(pipeline))
197 .filter(watermarks::reader_lo.lt(reader_lo as i64))
198 .execute(self)
199 .await?
200 > 0)
201 }
202
203 async fn set_pruner_watermark(
204 &mut self,
205 pipeline: &'static str,
206 pruner_hi: u64,
207 ) -> anyhow::Result<bool> {
208 Ok(diesel::update(watermarks::table)
209 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
210 .filter(watermarks::pipeline.eq(pipeline))
211 .execute(self)
212 .await?
213 > 0)
214 }
215}
216
217#[async_trait]
218impl store::Store for Db {
219 type Connection<'c> = Connection<'c>;
220
221 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
222 Ok(Connection(self.0.get().await?))
223 }
224}
225
226#[async_trait]
227impl store::TransactionalStore for Db {
228 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
229 where
230 R: Send + 'a,
231 F: Send + 'a,
232 F: for<'r> FnOnce(
233 &'r mut Self::Connection<'_>,
234 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
235 {
236 let mut conn = self.connect().await?;
237 AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
238 }
239}