1use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::NaiveDateTime;
8use chrono::Utc;
9use diesel::ExpressionMethods;
10use diesel::OptionalExtension;
11use diesel::prelude::*;
12use diesel::sql_types::BigInt;
13use diesel_async::AsyncConnection;
14use diesel_async::RunQueryDsl;
15use scoped_futures::ScopedBoxFuture;
16use sui_indexer_alt_framework_store_traits as store;
17use sui_sql_macro::sql;
18
19use crate::Connection;
20use crate::Db;
21use crate::model::StoredWatermark;
22use crate::schema::watermarks;
23
24pub use sui_indexer_alt_framework_store_traits::Store;
25
26#[async_trait]
27impl store::Connection for Connection<'_> {
28 async fn init_watermark(
29 &mut self,
30 pipeline_task: &str,
31 default_next_checkpoint: u64,
32 ) -> anyhow::Result<Option<u64>> {
33 let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
34 return Ok(self
36 .committer_watermark(pipeline_task)
37 .await?
38 .map(|w| w.checkpoint_hi_inclusive));
39 };
40
41 let stored_watermark = StoredWatermark {
42 pipeline: pipeline_task.to_string(),
43 epoch_hi_inclusive: 0,
44 checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
45 tx_hi: 0,
46 timestamp_ms_hi_inclusive: 0,
47 reader_lo: default_next_checkpoint as i64,
48 pruner_timestamp: Utc::now().naive_utc(),
49 pruner_hi: default_next_checkpoint as i64,
50 };
51
52 use diesel::pg::upsert::excluded;
53 let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
54 .values(&stored_watermark)
55 .on_conflict(watermarks::pipeline)
57 .do_update()
59 .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
62 .returning(watermarks::checkpoint_hi_inclusive)
63 .get_result(self)
64 .await?;
65
66 Ok(Some(checkpoint_hi_inclusive as u64))
67 }
68
69 async fn committer_watermark(
70 &mut self,
71 pipeline_task: &str,
72 ) -> anyhow::Result<Option<store::CommitterWatermark>> {
73 let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
74 .select((
75 watermarks::epoch_hi_inclusive,
76 watermarks::checkpoint_hi_inclusive,
77 watermarks::tx_hi,
78 watermarks::timestamp_ms_hi_inclusive,
79 ))
80 .filter(watermarks::pipeline.eq(pipeline_task))
81 .first(self)
82 .await
83 .optional()?;
84
85 if let Some(watermark) = watermark {
86 Ok(Some(store::CommitterWatermark {
87 epoch_hi_inclusive: watermark.0 as u64,
88 checkpoint_hi_inclusive: watermark.1 as u64,
89 tx_hi: watermark.2 as u64,
90 timestamp_ms_hi_inclusive: watermark.3 as u64,
91 }))
92 } else {
93 Ok(None)
94 }
95 }
96
97 async fn reader_watermark(
98 &mut self,
99 pipeline: &'static str,
100 ) -> anyhow::Result<Option<store::ReaderWatermark>> {
101 let watermark: Option<(i64, i64)> = watermarks::table
102 .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
103 .filter(watermarks::pipeline.eq(pipeline))
104 .first(self)
105 .await
106 .optional()?;
107
108 if let Some(watermark) = watermark {
109 Ok(Some(store::ReaderWatermark {
110 checkpoint_hi_inclusive: watermark.0 as u64,
111 reader_lo: watermark.1 as u64,
112 }))
113 } else {
114 Ok(None)
115 }
116 }
117
118 async fn pruner_watermark(
119 &mut self,
120 pipeline: &'static str,
121 delay: Duration,
122 ) -> anyhow::Result<Option<store::PrunerWatermark>> {
123 let wait_for = sql!(as BigInt,
129 "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
130 delay.as_millis() as i64,
131 );
132
133 let watermark: Option<(i64, i64, i64)> = watermarks::table
134 .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
135 .filter(watermarks::pipeline.eq(pipeline))
136 .first(self)
137 .await
138 .optional()?;
139
140 if let Some(watermark) = watermark {
141 Ok(Some(store::PrunerWatermark {
142 wait_for_ms: watermark.0,
143 pruner_hi: watermark.1 as u64,
144 reader_lo: watermark.2 as u64,
145 }))
146 } else {
147 Ok(None)
148 }
149 }
150
151 async fn set_committer_watermark(
152 &mut self,
153 pipeline_task: &str,
154 watermark: store::CommitterWatermark,
155 ) -> anyhow::Result<bool> {
156 let stored_watermark = StoredWatermark {
158 pipeline: pipeline_task.to_string(),
159 epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
160 checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
161 tx_hi: watermark.tx_hi as i64,
162 timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
163 reader_lo: 0,
164 pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
165 pruner_hi: 0,
166 };
167
168 use diesel::query_dsl::methods::FilterDsl;
169 Ok(diesel::insert_into(watermarks::table)
170 .values(&stored_watermark)
171 .on_conflict(watermarks::pipeline)
173 .do_update()
174 .set((
175 watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
176 watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
177 watermarks::tx_hi.eq(stored_watermark.tx_hi),
178 watermarks::timestamp_ms_hi_inclusive
179 .eq(stored_watermark.timestamp_ms_hi_inclusive),
180 ))
181 .filter(
182 watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
183 )
184 .execute(self)
185 .await?
186 > 0)
187 }
188
189 async fn set_reader_watermark(
190 &mut self,
191 pipeline: &'static str,
192 reader_lo: u64,
193 ) -> anyhow::Result<bool> {
194 Ok(diesel::update(watermarks::table)
195 .set((
196 watermarks::reader_lo.eq(reader_lo as i64),
197 watermarks::pruner_timestamp.eq(diesel::dsl::now),
198 ))
199 .filter(watermarks::pipeline.eq(pipeline))
200 .filter(watermarks::reader_lo.lt(reader_lo as i64))
201 .execute(self)
202 .await?
203 > 0)
204 }
205
206 async fn set_pruner_watermark(
207 &mut self,
208 pipeline: &'static str,
209 pruner_hi: u64,
210 ) -> anyhow::Result<bool> {
211 Ok(diesel::update(watermarks::table)
212 .set(watermarks::pruner_hi.eq(pruner_hi as i64))
213 .filter(watermarks::pipeline.eq(pipeline))
214 .execute(self)
215 .await?
216 > 0)
217 }
218}
219
220#[async_trait]
221impl store::Store for Db {
222 type Connection<'c> = Connection<'c>;
223
224 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
225 Ok(Connection(self.0.get().await?))
226 }
227}
228
229#[async_trait]
230impl store::TransactionalStore for Db {
231 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
232 where
233 R: Send + 'a,
234 F: Send + 'a,
235 F: for<'r> FnOnce(
236 &'r mut Self::Connection<'_>,
237 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
238 {
239 let mut conn = self.connect().await?;
240 AsyncConnection::transaction(&mut conn, |conn| f(conn)).await
241 }
242}