sui_graphql_rpc/consistency.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_graphql::connection::CursorType;
5use serde::{Deserialize, Serialize};
6use sui_indexer::models::objects::StoredHistoryObject;
7
8use crate::raw_query::RawQuery;
9use crate::types::available_range::AvailableRange;
10use crate::types::cursor::{JsonCursor, Page, ScanLimited};
11use crate::types::object::Cursor;
12use crate::{filter, query};
13
/// Determines which versions of an object are allowed to satisfy a query over a checkpoint
/// range.
#[derive(Clone, Copy)]
pub(crate) enum View {
    /// Any version matching the filtering criteria is returned, regardless of whether a more
    /// recent version of the same object exists within the checkpoint range. Intended for
    /// lookups such as by `object_id` and `version`.
    Historical,
    /// Only versions that match the filtering criteria *and* are the most recent version of
    /// their object within the checkpoint range are returned.
    Consistent,
}
24
25/// The consistent cursor for an index into a `Vec` field is constructed from the index of the
26/// element and the checkpoint the cursor was constructed at.
27#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
28pub(crate) struct ConsistentIndexCursor {
29 #[serde(rename = "i")]
30 pub ix: usize,
31 /// The checkpoint sequence number at which the entity corresponding to this cursor was viewed at.
32 pub c: u64,
33}
34
35/// The consistent cursor for an index into a `Map` field is constructed from the name or key of the
36/// element and the checkpoint the cursor was constructed at.
37#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
38pub(crate) struct ConsistentNamedCursor {
39 #[serde(rename = "n")]
40 pub name: String,
41 /// The checkpoint sequence number at which the entity corresponding to this cursor was viewed at.
42 pub c: u64,
43}
44
/// Trait for cursors that have a checkpoint sequence number associated with them.
///
/// Every implementor must also be a `CursorType` (supertrait bound).
pub(crate) trait Checkpointed: CursorType {
    /// Returns the checkpoint sequence number associated with this cursor.
    fn checkpoint_viewed_at(&self) -> u64;
}
49
impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
    /// Reports the cursor's `c` value as the checkpoint it was viewed at.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is read straight off the `JsonCursor` wrapper, so this presumably
        // relies on `JsonCursor` dereferencing to the wrapped `ConsistentIndexCursor` — confirm.
        self.c
    }
}
55
impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
    /// Reports the cursor's `c` value as the checkpoint it was viewed at.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is read straight off the `JsonCursor` wrapper, so this presumably
        // relies on `JsonCursor` dereferencing to the wrapped `ConsistentNamedCursor` — confirm.
        self.c
    }
}
61
// Both consistent cursor kinds implement `ScanLimited` with an empty body, i.e. they override
// nothing and take whatever the trait provides by default.
impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}

impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
65
66/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history` table to fetch
67/// objects that satisfy some filtering criteria `filter_fn` within the provided checkpoint `range`.
68/// The `objects_snapshot` table contains the latest versions of objects up to a checkpoint sequence
69/// number, and `objects_history` captures changes after that, so a query to both tables is
70/// necessary to handle these object states:
71/// 1) In snapshot, not in history - occurs when a live object gets snapshotted and then has not been
72/// modified since
73/// 2) Not in snapshot, in history - occurs when a new object is created or a wrapped object is unwrapped
74/// 3) In snapshot and in history - occurs when an object is snapshotted and further modified, the modification
75/// can be wrapping or deleting.
76///
77/// Additionally, even among objects that satisfy the filtering criteria, it is possible that there
78/// is a yet more recent version of the object within the checkpoint range, such as when the owner
79/// of an object changes. The `LEFT JOIN` against the `objects_history` table handles this and
80/// scenario 3. Note that the implementation applies the `LEFT JOIN` to each inner query in
81/// conjunction with the `page`'s cursor and limit. If this was instead done once at the end, the
82/// query would be drastically inefficient as we would be dealing with a large number of rows from
83/// `objects_snapshot`, and potentially `objects_history` as the checkpoint range grows. Instead,
84/// the `LEFT JOIN` and limit applied on the inner queries work in conjunction to make the final
85/// query noticeably more efficient. The former serves as a filter, and the latter reduces the
86/// number of rows that the database needs to work with.
87///
88/// However, not all queries require this `LEFT JOIN`, such as when no filtering criteria is
89/// specified, or if the filter is a lookup at a specific `object_id` and `object_version`. This is
90/// controlled by the `view` parameter. If the `view` parameter is set to `Consistent`, this filter
91/// is applied, otherwise if the `view` parameter is set to `Historical`, this filter is not
92/// applied.
93///
94/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION ALL` instead of
95/// `UNION`; the latter incurs significant overhead as it additionally de-duplicates records from
96/// both sources. This dedupe is unnecessary, since we have the fragment `SELECT DISTINCT ON
97/// (object_id) ... ORDER BY object_id, object_version DESC`. This is also redundant for the most
98/// part, due to the invariant that the `objects_history` captures changes that occur after
99/// `objects_snapshot`, but it's a safeguard to handle any possible overlap during snapshot
100/// creation.
101pub(crate) fn build_objects_query(
102 view: View,
103 range: AvailableRange,
104 page: &Page<Cursor>,
105 filter_fn: impl Fn(RawQuery) -> RawQuery,
106 newer_criteria: impl Fn(RawQuery) -> RawQuery,
107) -> RawQuery {
108 // Subquery to be used in `LEFT JOIN` against the inner queries for more recent object versions
109 let newer = newer_criteria(filter!(
110 query!("SELECT object_id, object_version FROM objects_history"),
111 format!(
112 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
113 range.first, range.last
114 )
115 ));
116
117 let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
118 snapshot_objs_inner = filter_fn(snapshot_objs_inner);
119
120 let mut snapshot_objs = match view {
121 View::Consistent => {
122 // The `LEFT JOIN` serves as a filter to remove objects that have a more recent version
123 let mut snapshot_objs = query!(
124 r#"SELECT candidates.* FROM ({}) candidates
125 LEFT JOIN ({}) newer
126 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
127 snapshot_objs_inner,
128 newer.clone()
129 );
130 snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
131 snapshot_objs
132 }
133 View::Historical => {
134 // The cursor pagination logic refers to the table with the `candidates` alias
135 query!(
136 "SELECT candidates.* FROM ({}) candidates",
137 snapshot_objs_inner
138 )
139 }
140 };
141
142 // Always apply cursor pagination and limit to constrain the number of rows returned, ensure
143 // that the inner queries are in step, and to handle the scenario where a user provides more
144 // `objectKeys` than allowed by the maximum page size.
145 snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
146
147 // Similar to the snapshot query, construct the filtered inner query for the history table.
148 let mut history_objs_inner = query!("SELECT * FROM objects_history");
149 history_objs_inner = filter_fn(history_objs_inner);
150 history_objs_inner = filter!(history_objs_inner, "object_status = 0");
151
152 let mut history_objs = match view {
153 View::Consistent => {
154 // Additionally bound the inner `objects_history` query by the checkpoint range
155 history_objs_inner = filter!(
156 history_objs_inner,
157 format!(
158 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
159 range.first, range.last
160 )
161 );
162
163 let mut history_objs = query!(
164 r#"SELECT candidates.* FROM ({}) candidates
165 LEFT JOIN ({}) newer
166 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
167 history_objs_inner,
168 newer
169 );
170 history_objs = filter!(history_objs, "newer.object_version IS NULL");
171 history_objs
172 }
173 View::Historical => {
174 // The cursor pagination logic refers to the table with the `candidates` alias
175 query!(
176 "SELECT candidates.* FROM ({}) candidates",
177 history_objs_inner
178 )
179 }
180 };
181
182 // Always apply cursor pagination and limit to constrain the number of rows returned, ensure
183 // that the inner queries are in step, and to handle the scenario where a user provides more
184 // `objectKeys` than allowed by the maximum page size.
185 history_objs = page.apply::<StoredHistoryObject>(history_objs);
186
187 // Combine the two queries, and select the most recent version of each object. The result set is
188 // the most recent version of objects from `objects_snapshot` and `objects_history` that match
189 // the filter criteria.
190 let query = query!(
191 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
192 snapshot_objs,
193 history_objs
194 )
195 .order_by("object_id")
196 .order_by("object_version DESC");
197
198 query!("SELECT * FROM ({}) candidates", query)
199}