sui_graphql_rpc/
consistency.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_graphql::connection::CursorType;
5use serde::{Deserialize, Serialize};
6use sui_indexer::models::objects::StoredHistoryObject;
7
8use crate::raw_query::RawQuery;
9use crate::types::available_range::AvailableRange;
10use crate::types::cursor::{JsonCursor, Page, ScanLimited};
11use crate::types::object::Cursor;
12use crate::{filter, query};
13
/// Selects how `build_objects_query` treats an object that has several versions within the
/// checkpoint range.
///
/// Derives `Debug`, `PartialEq` and `Eq` (in addition to `Copy` and `Clone`) so that values can be
/// logged and compared, matching the other data types declared in this module.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum View {
    /// Return objects that fulfill the filtering criteria, even if there are more recent versions
    /// of the object within the checkpoint range. This is used for lookups such as by `object_id`
    /// and `version`.
    Historical,
    /// Return objects that fulfill the filtering criteria and are the most recent version within
    /// the checkpoint range.
    Consistent,
}
24
/// The consistent cursor for an index into a `Vec` field is constructed from the index of the
/// element and the checkpoint the cursor was constructed at.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct ConsistentIndexCursor {
    /// Index of the element within the `Vec` field. Serialized under the short key `i`.
    #[serde(rename = "i")]
    pub ix: usize,
    /// The checkpoint sequence number at which the entity corresponding to this cursor was
    /// viewed.
    pub c: u64,
}
34
/// The consistent cursor for an index into a `Map` field is constructed from the name or key of the
/// element and the checkpoint the cursor was constructed at.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct ConsistentNamedCursor {
    /// Name or key of the element within the `Map` field. Serialized under the short key `n`.
    #[serde(rename = "n")]
    pub name: String,
    /// The checkpoint sequence number at which the entity corresponding to this cursor was
    /// viewed.
    pub c: u64,
}
44
/// Trait for cursors that have a checkpoint sequence number associated with them.
pub(crate) trait Checkpointed: CursorType {
    /// Returns the checkpoint sequence number associated with this cursor.
    fn checkpoint_viewed_at(&self) -> u64;
}
49
impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
    /// Returns the `c` value of the cursor, i.e. the checkpoint it was viewed at.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is declared on `ConsistentIndexCursor`, not on `JsonCursor`;
        // presumably this resolves through `Deref` to the wrapped cursor - confirm against
        // `JsonCursor`'s definition.
        self.c
    }
}
55
impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
    /// Returns the `c` value of the cursor, i.e. the checkpoint it was viewed at.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is declared on `ConsistentNamedCursor`, not on `JsonCursor`;
        // presumably this resolves through `Deref` to the wrapped cursor - confirm against
        // `JsonCursor`'s definition.
        self.c
    }
}
61
// Empty impl: no `ScanLimited` items are overridden for index cursors.
impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
63
// Empty impl: no `ScanLimited` items are overridden for named cursors.
impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
65
66/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history` table to fetch
67/// objects that satisfy some filtering criteria `filter_fn` within the provided checkpoint `range`.
68/// The `objects_snapshot` table contains the latest versions of objects up to a checkpoint sequence
69/// number, and `objects_history` captures changes after that, so a query to both tables is
70/// necessary to handle these object states:
71/// 1) In snapshot, not in history - occurs when a live object gets snapshotted and then has not been
72///    modified since
73/// 2) Not in snapshot, in history - occurs when a new object is created or a wrapped object is unwrapped
74/// 3) In snapshot and in history - occurs when an object is snapshotted and further modified, the modification
75///    can be wrapping or deleting.
76///
77/// Additionally, even among objects that satisfy the filtering criteria, it is possible that there
78/// is a yet more recent version of the object within the checkpoint range, such as when the owner
79/// of an object changes. The `LEFT JOIN` against the `objects_history` table handles this and
80/// scenario 3. Note that the implementation applies the `LEFT JOIN` to each inner query in
81/// conjunction with the `page`'s cursor and limit. If this was instead done once at the end, the
82/// query would be drastically inefficient as we would be dealing with a large number of rows from
83/// `objects_snapshot`, and potentially `objects_history` as the checkpoint range grows. Instead,
84/// the `LEFT JOIN` and limit applied on the inner queries work in conjunction to make the final
85/// query noticeably more efficient. The former serves as a filter, and the latter reduces the
86/// number of rows that the database needs to work with.
87///
88/// However, not all queries require this `LEFT JOIN`, such as when no filtering criteria is
89/// specified, or if the filter is a lookup at a specific `object_id` and `object_version`. This is
90/// controlled by the `view` parameter. If the `view` parameter is set to `Consistent`, this filter
91/// is applied, otherwise if the `view` parameter is set to `Historical`, this filter is not
92/// applied.
93///
94/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION ALL` instead of
95/// `UNION`; the latter incurs significant overhead as it additionally de-duplicates records from
96/// both sources. This dedupe is unnecessary, since we have the fragment `SELECT DISTINCT ON
97/// (object_id) ... ORDER BY object_id, object_version DESC`. This is also redundant for the most
98/// part, due to the invariant that the `objects_history` captures changes that occur after
99/// `objects_snapshot`, but it's a safeguard to handle any possible overlap during snapshot
100/// creation.
101pub(crate) fn build_objects_query(
102    view: View,
103    range: AvailableRange,
104    page: &Page<Cursor>,
105    filter_fn: impl Fn(RawQuery) -> RawQuery,
106    newer_criteria: impl Fn(RawQuery) -> RawQuery,
107) -> RawQuery {
108    // Subquery to be used in `LEFT JOIN` against the inner queries for more recent object versions
109    let newer = newer_criteria(filter!(
110        query!("SELECT object_id, object_version FROM objects_history"),
111        format!(
112            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
113            range.first, range.last
114        )
115    ));
116
117    let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
118    snapshot_objs_inner = filter_fn(snapshot_objs_inner);
119
120    let mut snapshot_objs = match view {
121        View::Consistent => {
122            // The `LEFT JOIN` serves as a filter to remove objects that have a more recent version
123            let mut snapshot_objs = query!(
124                r#"SELECT candidates.* FROM ({}) candidates
125                    LEFT JOIN ({}) newer
126                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
127                snapshot_objs_inner,
128                newer.clone()
129            );
130            snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
131            snapshot_objs
132        }
133        View::Historical => {
134            // The cursor pagination logic refers to the table with the `candidates` alias
135            query!(
136                "SELECT candidates.* FROM ({}) candidates",
137                snapshot_objs_inner
138            )
139        }
140    };
141
142    // Always apply cursor pagination and limit to constrain the number of rows returned, ensure
143    // that the inner queries are in step, and to handle the scenario where a user provides more
144    // `objectKeys` than allowed by the maximum page size.
145    snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
146
147    // Similar to the snapshot query, construct the filtered inner query for the history table.
148    let mut history_objs_inner = query!("SELECT * FROM objects_history");
149    history_objs_inner = filter_fn(history_objs_inner);
150    history_objs_inner = filter!(history_objs_inner, "object_status = 0");
151
152    let mut history_objs = match view {
153        View::Consistent => {
154            // Additionally bound the inner `objects_history` query by the checkpoint range
155            history_objs_inner = filter!(
156                history_objs_inner,
157                format!(
158                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
159                    range.first, range.last
160                )
161            );
162
163            let mut history_objs = query!(
164                r#"SELECT candidates.* FROM ({}) candidates
165                    LEFT JOIN ({}) newer
166                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
167                history_objs_inner,
168                newer
169            );
170            history_objs = filter!(history_objs, "newer.object_version IS NULL");
171            history_objs
172        }
173        View::Historical => {
174            // The cursor pagination logic refers to the table with the `candidates` alias
175            query!(
176                "SELECT candidates.* FROM ({}) candidates",
177                history_objs_inner
178            )
179        }
180    };
181
182    // Always apply cursor pagination and limit to constrain the number of rows returned, ensure
183    // that the inner queries are in step, and to handle the scenario where a user provides more
184    // `objectKeys` than allowed by the maximum page size.
185    history_objs = page.apply::<StoredHistoryObject>(history_objs);
186
187    // Combine the two queries, and select the most recent version of each object. The result set is
188    // the most recent version of objects from `objects_snapshot` and `objects_history` that match
189    // the filter criteria.
190    let query = query!(
191        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
192        snapshot_objs,
193        history_objs
194    )
195    .order_by("object_id")
196    .order_by("object_version DESC");
197
198    query!("SELECT * FROM ({}) candidates", query)
199}