sui_rpc_api/grpc/alpha/
list_authenticated_events.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9    AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_macros::fail_point_if;
15use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
16use sui_types::base_types::SuiAddress;
17
18const MAX_PAGE_SIZE: u32 = 1000;
19const DEFAULT_PAGE_SIZE: u32 = 1000;
20const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; // 512KiB
21
22#[derive(serde::Serialize, serde::Deserialize)]
23struct PageToken {
24    stream_id: SuiAddress,
25    next_checkpoint: u64,
26    next_accumulator_version: u64,
27    next_transaction_idx: u32,
28    next_event_idx: u32,
29}
30
31fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
32    let mut bcs = Bcs::default();
33    bcs.value = Some(ev.contents.clone().into());
34
35    let mut event = Event::default();
36    event.package_id = Some(ev.package_id.to_canonical_string(true));
37    event.module = Some(ev.transaction_module.to_string());
38    event.sender = Some(ev.sender.to_string());
39    event.event_type = Some(ev.type_.to_canonical_string(true));
40    event.contents = Some(bcs);
41    event
42}
43
44fn to_authenticated_event(
45    stream_id: &str,
46    cp: u64,
47    accumulator_version: u64,
48    transaction_idx: u32,
49    idx: u32,
50    ev: &sui_types::event::Event,
51) -> AuthenticatedEvent {
52    let mut authenticated_event = AuthenticatedEvent::default();
53    authenticated_event.checkpoint = Some(cp);
54    authenticated_event.accumulator_version = Some(accumulator_version);
55    authenticated_event.transaction_idx = Some(transaction_idx);
56    authenticated_event.event_idx = Some(idx);
57    authenticated_event.event = Some(to_grpc_event(ev));
58    authenticated_event.stream_id = Some(stream_id.to_string());
59    authenticated_event
60}
61
62fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
63    bcs::from_bytes(page_token).map_err(|_| {
64        RpcError::new(
65            tonic::Code::InvalidArgument,
66            "invalid page_token".to_string(),
67        )
68    })
69}
70
71fn encode_page_token(page_token: PageToken) -> Bytes {
72    bcs::to_bytes(&page_token).unwrap().into()
73}
74
75#[tracing::instrument(skip(service))]
76pub fn list_authenticated_events(
77    service: &RpcService,
78    request: ListAuthenticatedEventsRequest,
79) -> Result<ListAuthenticatedEventsResponse, RpcError> {
80    if !service.config.authenticated_events_indexing() {
81        return Err(RpcError::new(
82            tonic::Code::Unimplemented,
83            "Authenticated events indexing is disabled".to_string(),
84        ));
85    }
86    let stream_id = request.stream_id.ok_or_else(|| {
87        RpcError::new(
88            tonic::Code::InvalidArgument,
89            "missing stream_id".to_string(),
90        )
91    })?;
92
93    if stream_id.trim().is_empty() {
94        return Err(RpcError::new(
95            tonic::Code::InvalidArgument,
96            "stream_id cannot be empty".to_string(),
97        ));
98    }
99
100    let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
101        RpcError::new(
102            tonic::Code::InvalidArgument,
103            format!("invalid stream_id: {e}"),
104        )
105    })?;
106
107    let page_size = request
108        .page_size
109        .map(|s| s.clamp(1, MAX_PAGE_SIZE))
110        .unwrap_or(DEFAULT_PAGE_SIZE);
111
112    let page_token = request
113        .page_token
114        .as_ref()
115        .map(|token| decode_page_token(token))
116        .transpose()?;
117
118    if let Some(token) = &page_token
119        && token.stream_id != stream_addr
120    {
121        return Err(RpcError::new(
122            tonic::Code::InvalidArgument,
123            "page_token stream_id mismatch".to_string(),
124        ));
125    }
126
127    let start = request.start_checkpoint.unwrap_or(0);
128
129    let reader = service.reader.inner();
130    let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
131
132    let highest_indexed = indexes
133        .get_highest_indexed_checkpoint_seq_number()
134        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
135        .unwrap_or(0);
136
137    let lowest_available = reader
138        .get_lowest_available_checkpoint_objects()
139        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
140
141    if start < lowest_available {
142        return Err(RpcError::new(
143            tonic::Code::InvalidArgument,
144            format!(
145                "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
146                start, lowest_available
147            ),
148        ));
149    }
150
151    if start > highest_indexed {
152        let mut response = ListAuthenticatedEventsResponse::default();
153        response.events = vec![];
154        response.highest_indexed_checkpoint = Some(highest_indexed);
155        response.next_page_token = None;
156        return Ok(response);
157    }
158
159    let (actual_start, start_accumulator_version, start_transaction_idx, start_event_idx) =
160        if let Some(token) = &page_token {
161            (
162                token.next_checkpoint,
163                Some(token.next_accumulator_version),
164                Some(token.next_transaction_idx),
165                Some(token.next_event_idx),
166            )
167        } else {
168            (start, None, None, None)
169        };
170
171    let iter = indexes
172        .authenticated_event_iter(
173            stream_addr,
174            actual_start,
175            start_accumulator_version,
176            start_transaction_idx,
177            start_event_idx,
178            highest_indexed,
179            page_size + 1,
180        )
181        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
182
183    let mut events = Vec::new();
184    let mut size_bytes = 0;
185    let mut next_page_token = None;
186
187    for (i, event_result) in iter.enumerate() {
188        let (cp, accumulator_version, transaction_idx, event_idx, ev) =
189            event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
190
191        if i >= page_size as usize {
192            next_page_token = Some(encode_page_token(PageToken {
193                stream_id: stream_addr,
194                next_checkpoint: cp,
195                next_accumulator_version: accumulator_version,
196                next_transaction_idx: transaction_idx,
197                next_event_idx: event_idx,
198            }));
199            break;
200        }
201
202        #[allow(unused_mut)]
203        let mut authenticated_event = to_authenticated_event(
204            &stream_id,
205            cp,
206            accumulator_version,
207            transaction_idx,
208            event_idx,
209            &ev,
210        );
211
212        fail_point_if!("corrupt_authenticated_event", || {
213            if let Some(event) = authenticated_event.event.as_mut() {
214                if let Some(bcs) = event.contents.as_mut() {
215                    bcs.value = Some(vec![0xDE, 0xAD, 0xBE, 0xEF].into());
216                }
217            }
218        });
219
220        let event_size = authenticated_event.encoded_len();
221
222        if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
223            next_page_token = Some(encode_page_token(PageToken {
224                stream_id: stream_addr,
225                next_checkpoint: cp,
226                next_accumulator_version: accumulator_version,
227                next_transaction_idx: transaction_idx,
228                next_event_idx: event_idx,
229            }));
230            break;
231        }
232
233        size_bytes += event_size;
234        events.push(authenticated_event);
235    }
236
237    let mut response = ListAuthenticatedEventsResponse::default();
238    response.events = events;
239    response.highest_indexed_checkpoint = Some(highest_indexed);
240    response.next_page_token = next_page_token.map(|t| t.into());
241    Ok(response)
242}