sui_graphql_rpc/consistency.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use async_graphql::connection::CursorType;
use serde::{Deserialize, Serialize};
use sui_indexer::models::objects::StoredHistoryObject;
use crate::raw_query::RawQuery;
use crate::types::available_range::AvailableRange;
use crate::types::cursor::{JsonCursor, Page, ScanLimited};
use crate::types::object::Cursor;
use crate::{filter, query};
#[derive(Copy, Clone)]
pub(crate) enum View {
/// Return objects that fulfill the filtering criteria, even if there are more recent versions
/// of the object within the checkpoint range. This is used for lookups such as by `object_id`
/// and `version`.
Historical,
/// Return objects that fulfill the filtering criteria and are the most recent version within
/// the checkpoint range.
Consistent,
}
/// The consistent cursor for an index into a `Vec` field is constructed from the index of the
/// element and the checkpoint the cursor was constructed at.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct ConsistentIndexCursor {
#[serde(rename = "i")]
pub ix: usize,
/// The checkpoint sequence number at which the entity corresponding to this cursor was viewed at.
pub c: u64,
}
/// The consistent cursor for an index into a `Map` field is constructed from the name or key of the
/// element and the checkpoint the cursor was constructed at.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct ConsistentNamedCursor {
#[serde(rename = "n")]
pub name: String,
/// The checkpoint sequence number at which the entity corresponding to this cursor was viewed at.
pub c: u64,
}
/// Trait for cursors that have a checkpoint sequence number associated with them.
pub(crate) trait Checkpointed: CursorType {
fn checkpoint_viewed_at(&self) -> u64;
}
impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
fn checkpoint_viewed_at(&self) -> u64 {
self.c
}
}
impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
fn checkpoint_viewed_at(&self) -> u64 {
self.c
}
}
impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history` table to fetch
/// objects that satisfy some filtering criteria `filter_fn` within the provided checkpoint `range`.
/// The `objects_snapshot` table contains the latest versions of objects up to a checkpoint sequence
/// number, and `objects_history` captures changes after that, so a query to both tables is
/// necessary to handle these object states:
/// 1) In snapshot, not in history - occurs when a live object gets snapshotted and then has not been
/// modified since
/// 2) Not in snapshot, in history - occurs when a new object is created or a wrapped object is unwrapped
/// 3) In snapshot and in history - occurs when an object is snapshotted and further modified, the modification
/// can be wrapping or deleting.
///
/// Additionally, even among objects that satisfy the filtering criteria, it is possible that there
/// is a yet more recent version of the object within the checkpoint range, such as when the owner
/// of an object changes. The `LEFT JOIN` against the `objects_history` table handles this and
/// scenario 3. Note that the implementation applies the `LEFT JOIN` to each inner query in
/// conjunction with the `page`'s cursor and limit. If this was instead done once at the end, the
/// query would be drastically inefficient as we would be dealing with a large number of rows from
/// `objects_snapshot`, and potentially `objects_history` as the checkpoint range grows. Instead,
/// the `LEFT JOIN` and limit applied on the inner queries work in conjunction to make the final
/// query noticeably more efficient. The former serves as a filter, and the latter reduces the
/// number of rows that the database needs to work with.
///
/// However, not all queries require this `LEFT JOIN`, such as when no filtering criteria is
/// specified, or if the filter is a lookup at a specific `object_id` and `object_version`. This is
/// controlled by the `view` parameter. If the `view` parameter is set to `Consistent`, this filter
/// is applied, otherwise if the `view` parameter is set to `Historical`, this filter is not
/// applied.
///
/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION ALL` instead of
/// `UNION`; the latter incurs significant overhead as it additionally de-duplicates records from
/// both sources. This dedupe is unnecessary, since we have the fragment `SELECT DISTINCT ON
/// (object_id) ... ORDER BY object_id, object_version DESC`. This is also redundant for the most
/// part, due to the invariant that the `objects_history` captures changes that occur after
/// `objects_snapshot`, but it's a safeguard to handle any possible overlap during snapshot
/// creation.
pub(crate) fn build_objects_query(
view: View,
range: AvailableRange,
page: &Page<Cursor>,
filter_fn: impl Fn(RawQuery) -> RawQuery,
newer_criteria: impl Fn(RawQuery) -> RawQuery,
) -> RawQuery {
// Subquery to be used in `LEFT JOIN` against the inner queries for more recent object versions
let newer = newer_criteria(filter!(
query!("SELECT object_id, object_version FROM objects_history"),
format!(
r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
range.first, range.last
)
));
let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
snapshot_objs_inner = filter_fn(snapshot_objs_inner);
let mut snapshot_objs = match view {
View::Consistent => {
// The `LEFT JOIN` serves as a filter to remove objects that have a more recent version
let mut snapshot_objs = query!(
r#"SELECT candidates.* FROM ({}) candidates
LEFT JOIN ({}) newer
ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
snapshot_objs_inner,
newer.clone()
);
snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
snapshot_objs
}
View::Historical => {
// The cursor pagination logic refers to the table with the `candidates` alias
query!(
"SELECT candidates.* FROM ({}) candidates",
snapshot_objs_inner
)
}
};
// Always apply cursor pagination and limit to constrain the number of rows returned, ensure
// that the inner queries are in step, and to handle the scenario where a user provides more
// `objectKeys` than allowed by the maximum page size.
snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
// Similar to the snapshot query, construct the filtered inner query for the history table.
let mut history_objs_inner = query!("SELECT * FROM objects_history");
history_objs_inner = filter_fn(history_objs_inner);
history_objs_inner = filter!(history_objs_inner, "object_status = 0");
let mut history_objs = match view {
View::Consistent => {
// Additionally bound the inner `objects_history` query by the checkpoint range
history_objs_inner = filter!(
history_objs_inner,
format!(
r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
range.first, range.last
)
);
let mut history_objs = query!(
r#"SELECT candidates.* FROM ({}) candidates
LEFT JOIN ({}) newer
ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
history_objs_inner,
newer
);
history_objs = filter!(history_objs, "newer.object_version IS NULL");
history_objs
}
View::Historical => {
// The cursor pagination logic refers to the table with the `candidates` alias
query!(
"SELECT candidates.* FROM ({}) candidates",
history_objs_inner
)
}
};
// Always apply cursor pagination and limit to constrain the number of rows returned, ensure
// that the inner queries are in step, and to handle the scenario where a user provides more
// `objectKeys` than allowed by the maximum page size.
history_objs = page.apply::<StoredHistoryObject>(history_objs);
// Combine the two queries, and select the most recent version of each object. The result set is
// the most recent version of objects from `objects_snapshot` and `objects_history` that match
// the filter criteria.
let query = query!(
r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
snapshot_objs,
history_objs
)
.order_by("object_id")
.order_by("object_version DESC");
query!("SELECT * FROM ({}) candidates", query)
}