// sui_rpc_api/grpc/alpha/list_authenticated_events.rs

// Messages in this file are built with `Default::default()` followed by field assignments.
#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9 AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_macros::fail_point_if;
15use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
16use sui_types::base_types::SuiAddress;
17
18const MAX_PAGE_SIZE: u32 = 1000;
19const DEFAULT_PAGE_SIZE: u32 = 1000;
20const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; #[derive(serde::Serialize, serde::Deserialize)]
23struct PageToken {
24 stream_id: SuiAddress,
25 next_checkpoint: u64,
26 next_accumulator_version: u64,
27 next_transaction_idx: u32,
28 next_event_idx: u32,
29}
30
31fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
32 let mut bcs = Bcs::default();
33 bcs.value = Some(ev.contents.clone().into());
34
35 let mut event = Event::default();
36 event.package_id = Some(ev.package_id.to_canonical_string(true));
37 event.module = Some(ev.transaction_module.to_string());
38 event.sender = Some(ev.sender.to_string());
39 event.event_type = Some(ev.type_.to_canonical_string(true));
40 event.contents = Some(bcs);
41 event
42}
43
44fn to_authenticated_event(
45 stream_id: &str,
46 cp: u64,
47 accumulator_version: u64,
48 transaction_idx: u32,
49 idx: u32,
50 ev: &sui_types::event::Event,
51) -> AuthenticatedEvent {
52 let mut authenticated_event = AuthenticatedEvent::default();
53 authenticated_event.checkpoint = Some(cp);
54 authenticated_event.accumulator_version = Some(accumulator_version);
55 authenticated_event.transaction_idx = Some(transaction_idx);
56 authenticated_event.event_idx = Some(idx);
57 authenticated_event.event = Some(to_grpc_event(ev));
58 authenticated_event.stream_id = Some(stream_id.to_string());
59 authenticated_event
60}
61
62fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
63 bcs::from_bytes(page_token).map_err(|_| {
64 RpcError::new(
65 tonic::Code::InvalidArgument,
66 "invalid page_token".to_string(),
67 )
68 })
69}
70
71fn encode_page_token(page_token: PageToken) -> Bytes {
72 bcs::to_bytes(&page_token).unwrap().into()
73}
74
75#[tracing::instrument(skip(service))]
76pub fn list_authenticated_events(
77 service: &RpcService,
78 request: ListAuthenticatedEventsRequest,
79) -> Result<ListAuthenticatedEventsResponse, RpcError> {
80 if !service.config.authenticated_events_indexing() {
81 return Err(RpcError::new(
82 tonic::Code::Unimplemented,
83 "Authenticated events indexing is disabled".to_string(),
84 ));
85 }
86 let stream_id = request.stream_id.ok_or_else(|| {
87 RpcError::new(
88 tonic::Code::InvalidArgument,
89 "missing stream_id".to_string(),
90 )
91 })?;
92
93 if stream_id.trim().is_empty() {
94 return Err(RpcError::new(
95 tonic::Code::InvalidArgument,
96 "stream_id cannot be empty".to_string(),
97 ));
98 }
99
100 let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
101 RpcError::new(
102 tonic::Code::InvalidArgument,
103 format!("invalid stream_id: {e}"),
104 )
105 })?;
106
107 let page_size = request
108 .page_size
109 .map(|s| s.clamp(1, MAX_PAGE_SIZE))
110 .unwrap_or(DEFAULT_PAGE_SIZE);
111
112 let page_token = request
113 .page_token
114 .as_ref()
115 .map(|token| decode_page_token(token))
116 .transpose()?;
117
118 if let Some(token) = &page_token
119 && token.stream_id != stream_addr
120 {
121 return Err(RpcError::new(
122 tonic::Code::InvalidArgument,
123 "page_token stream_id mismatch".to_string(),
124 ));
125 }
126
127 let start = request.start_checkpoint.unwrap_or(0);
128
129 let reader = service.reader.inner();
130 let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
131
132 let highest_indexed = indexes
133 .get_highest_indexed_checkpoint_seq_number()
134 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
135 .unwrap_or(0);
136
137 let lowest_available = reader
138 .get_lowest_available_checkpoint_objects()
139 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
140
141 if start < lowest_available {
142 return Err(RpcError::new(
143 tonic::Code::InvalidArgument,
144 format!(
145 "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
146 start, lowest_available
147 ),
148 ));
149 }
150
151 if start > highest_indexed {
152 let mut response = ListAuthenticatedEventsResponse::default();
153 response.events = vec![];
154 response.highest_indexed_checkpoint = Some(highest_indexed);
155 response.next_page_token = None;
156 return Ok(response);
157 }
158
159 let (actual_start, start_accumulator_version, start_transaction_idx, start_event_idx) =
160 if let Some(token) = &page_token {
161 (
162 token.next_checkpoint,
163 Some(token.next_accumulator_version),
164 Some(token.next_transaction_idx),
165 Some(token.next_event_idx),
166 )
167 } else {
168 (start, None, None, None)
169 };
170
171 let iter = indexes
172 .authenticated_event_iter(
173 stream_addr,
174 actual_start,
175 start_accumulator_version,
176 start_transaction_idx,
177 start_event_idx,
178 highest_indexed,
179 page_size + 1,
180 )
181 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
182
183 let mut events = Vec::new();
184 let mut size_bytes = 0;
185 let mut next_page_token = None;
186
187 for (i, event_result) in iter.enumerate() {
188 let (cp, accumulator_version, transaction_idx, event_idx, ev) =
189 event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
190
191 if i >= page_size as usize {
192 next_page_token = Some(encode_page_token(PageToken {
193 stream_id: stream_addr,
194 next_checkpoint: cp,
195 next_accumulator_version: accumulator_version,
196 next_transaction_idx: transaction_idx,
197 next_event_idx: event_idx,
198 }));
199 break;
200 }
201
202 #[allow(unused_mut)]
203 let mut authenticated_event = to_authenticated_event(
204 &stream_id,
205 cp,
206 accumulator_version,
207 transaction_idx,
208 event_idx,
209 &ev,
210 );
211
212 fail_point_if!("corrupt_authenticated_event", || {
213 if let Some(event) = authenticated_event.event.as_mut() {
214 if let Some(bcs) = event.contents.as_mut() {
215 bcs.value = Some(vec![0xDE, 0xAD, 0xBE, 0xEF].into());
216 }
217 }
218 });
219
220 let event_size = authenticated_event.encoded_len();
221
222 if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
223 next_page_token = Some(encode_page_token(PageToken {
224 stream_id: stream_addr,
225 next_checkpoint: cp,
226 next_accumulator_version: accumulator_version,
227 next_transaction_idx: transaction_idx,
228 next_event_idx: event_idx,
229 }));
230 break;
231 }
232
233 size_bytes += event_size;
234 events.push(authenticated_event);
235 }
236
237 let mut response = ListAuthenticatedEventsResponse::default();
238 response.events = events;
239 response.highest_indexed_checkpoint = Some(highest_indexed);
240 response.next_page_token = next_page_token.map(|t| t.into());
241 Ok(response)
242}