sui_rpc_api/grpc/alpha/
list_authenticated_events.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9    AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; // 512KiB
20
21#[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23    stream_id: SuiAddress,
24    start_checkpoint: u64,
25    last_event_checkpoint: u64,
26    last_event_transaction_idx: u32,
27    last_event_index: u32,
28}
29
30fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
31    let mut bcs = Bcs::default();
32    bcs.value = Some(ev.contents.clone().into());
33
34    let mut event = Event::default();
35    event.package_id = Some(ev.package_id.to_canonical_string(true));
36    event.module = Some(ev.transaction_module.to_string());
37    event.sender = Some(ev.sender.to_string());
38    event.event_type = Some(ev.type_.to_canonical_string(true));
39    event.contents = Some(bcs);
40    event
41}
42
43fn to_authenticated_event(
44    stream_id: &str,
45    cp: u64,
46    transaction_idx: u32,
47    idx: u32,
48    ev: &sui_types::event::Event,
49) -> AuthenticatedEvent {
50    let mut authenticated_event = AuthenticatedEvent::default();
51    authenticated_event.checkpoint = Some(cp);
52    authenticated_event.transaction_idx = Some(transaction_idx);
53    authenticated_event.event_idx = Some(idx);
54    authenticated_event.event = Some(to_grpc_event(ev));
55    authenticated_event.stream_id = Some(stream_id.to_string());
56    authenticated_event
57}
58
59fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
60    bcs::from_bytes(page_token).map_err(|_| {
61        RpcError::new(
62            tonic::Code::InvalidArgument,
63            "invalid page_token".to_string(),
64        )
65    })
66}
67
68fn encode_page_token(page_token: PageToken) -> Bytes {
69    bcs::to_bytes(&page_token).unwrap().into()
70}
71
72#[tracing::instrument(skip(service))]
73pub fn list_authenticated_events(
74    service: &RpcService,
75    request: ListAuthenticatedEventsRequest,
76) -> Result<ListAuthenticatedEventsResponse, RpcError> {
77    if !service.config.authenticated_events_indexing() {
78        return Err(RpcError::new(
79            tonic::Code::Unimplemented,
80            "Authenticated events indexing is disabled".to_string(),
81        ));
82    }
83    let stream_id = request.stream_id.ok_or_else(|| {
84        RpcError::new(
85            tonic::Code::InvalidArgument,
86            "missing stream_id".to_string(),
87        )
88    })?;
89
90    if stream_id.trim().is_empty() {
91        return Err(RpcError::new(
92            tonic::Code::InvalidArgument,
93            "stream_id cannot be empty".to_string(),
94        ));
95    }
96
97    let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
98        RpcError::new(
99            tonic::Code::InvalidArgument,
100            format!("invalid stream_id: {e}"),
101        )
102    })?;
103
104    let page_size = request
105        .page_size
106        .map(|s| s.clamp(1, MAX_PAGE_SIZE))
107        .unwrap_or(DEFAULT_PAGE_SIZE);
108
109    let page_token = request
110        .page_token
111        .as_ref()
112        .map(|token| decode_page_token(token))
113        .transpose()?;
114
115    if let Some(token) = &page_token
116        && token.stream_id != stream_addr
117    {
118        return Err(RpcError::new(
119            tonic::Code::InvalidArgument,
120            "page_token stream_id mismatch".to_string(),
121        ));
122    }
123
124    let start = request.start_checkpoint.unwrap_or(0);
125
126    let reader = service.reader.inner();
127    let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
128
129    let highest_indexed = indexes
130        .get_highest_indexed_checkpoint_seq_number()
131        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
132        .unwrap_or(0);
133
134    let lowest_available = reader
135        .get_lowest_available_checkpoint_objects()
136        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
137
138    if start < lowest_available {
139        return Err(RpcError::new(
140            tonic::Code::InvalidArgument,
141            format!(
142                "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
143                start, lowest_available
144            ),
145        ));
146    }
147
148    if start > highest_indexed {
149        let mut response = ListAuthenticatedEventsResponse::default();
150        response.events = vec![];
151        response.highest_indexed_checkpoint = Some(highest_indexed);
152        response.next_page_token = None;
153        return Ok(response);
154    }
155
156    let start_transaction_idx = page_token.as_ref().map(|t| t.last_event_transaction_idx);
157    let start_event_idx = page_token.as_ref().map(|t| t.last_event_index);
158
159    let iter = indexes
160        .authenticated_event_iter(
161            stream_addr,
162            start,
163            start_transaction_idx,
164            start_event_idx,
165            highest_indexed,
166            page_size,
167        )
168        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
169
170    let mut events = Vec::new();
171    let mut size_bytes = 0;
172    let mut events_processed: u32 = 0;
173    let mut last_event_info: Option<(u64, u32, u32)> = None;
174
175    for event_result in iter {
176        let (cp, transaction_idx, event_idx, ev) =
177            event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
178
179        // Check if we've reached our size limit
180        if size_bytes >= MAX_PAGE_SIZE_BYTES {
181            break;
182        }
183
184        let authenticated_event =
185            to_authenticated_event(&stream_id, cp, transaction_idx, event_idx, &ev);
186        size_bytes += authenticated_event.encoded_len();
187        events.push(authenticated_event);
188        last_event_info = Some((cp, transaction_idx, event_idx));
189        events_processed += 1;
190    }
191
192    let next_page_token = if events_processed == page_size {
193        last_event_info.map(|(last_cp, last_tx_idx, last_ev_idx)| {
194            encode_page_token(PageToken {
195                stream_id: stream_addr,
196                start_checkpoint: start,
197                last_event_checkpoint: last_cp,
198                last_event_transaction_idx: last_tx_idx,
199                last_event_index: last_ev_idx,
200            })
201        })
202    } else {
203        None
204    };
205
206    let mut response = ListAuthenticatedEventsResponse::default();
207    response.events = events;
208    response.highest_indexed_checkpoint = Some(highest_indexed);
209    response.next_page_token = next_page_token.map(|token| token.to_vec());
210    Ok(response)
211}