sui_rpc_api/grpc/alpha/
list_authenticated_events.rs1#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9 AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; #[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23 stream_id: SuiAddress,
24 next_checkpoint: u64,
25 next_accumulator_version: u64,
26 next_transaction_idx: u32,
27 next_event_idx: u32,
28}
29
30fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
31 let mut bcs = Bcs::default();
32 bcs.value = Some(ev.contents.clone().into());
33
34 let mut event = Event::default();
35 event.package_id = Some(ev.package_id.to_canonical_string(true));
36 event.module = Some(ev.transaction_module.to_string());
37 event.sender = Some(ev.sender.to_string());
38 event.event_type = Some(ev.type_.to_canonical_string(true));
39 event.contents = Some(bcs);
40 event
41}
42
43fn to_authenticated_event(
44 stream_id: &str,
45 cp: u64,
46 accumulator_version: u64,
47 transaction_idx: u32,
48 idx: u32,
49 ev: &sui_types::event::Event,
50) -> AuthenticatedEvent {
51 let mut authenticated_event = AuthenticatedEvent::default();
52 authenticated_event.checkpoint = Some(cp);
53 authenticated_event.accumulator_version = Some(accumulator_version);
54 authenticated_event.transaction_idx = Some(transaction_idx);
55 authenticated_event.event_idx = Some(idx);
56 authenticated_event.event = Some(to_grpc_event(ev));
57 authenticated_event.stream_id = Some(stream_id.to_string());
58 authenticated_event
59}
60
61fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
62 bcs::from_bytes(page_token).map_err(|_| {
63 RpcError::new(
64 tonic::Code::InvalidArgument,
65 "invalid page_token".to_string(),
66 )
67 })
68}
69
70fn encode_page_token(page_token: PageToken) -> Bytes {
71 bcs::to_bytes(&page_token).unwrap().into()
72}
73
74#[tracing::instrument(skip(service))]
75pub fn list_authenticated_events(
76 service: &RpcService,
77 request: ListAuthenticatedEventsRequest,
78) -> Result<ListAuthenticatedEventsResponse, RpcError> {
79 if !service.config.authenticated_events_indexing() {
80 return Err(RpcError::new(
81 tonic::Code::Unimplemented,
82 "Authenticated events indexing is disabled".to_string(),
83 ));
84 }
85 let stream_id = request.stream_id.ok_or_else(|| {
86 RpcError::new(
87 tonic::Code::InvalidArgument,
88 "missing stream_id".to_string(),
89 )
90 })?;
91
92 if stream_id.trim().is_empty() {
93 return Err(RpcError::new(
94 tonic::Code::InvalidArgument,
95 "stream_id cannot be empty".to_string(),
96 ));
97 }
98
99 let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
100 RpcError::new(
101 tonic::Code::InvalidArgument,
102 format!("invalid stream_id: {e}"),
103 )
104 })?;
105
106 let page_size = request
107 .page_size
108 .map(|s| s.clamp(1, MAX_PAGE_SIZE))
109 .unwrap_or(DEFAULT_PAGE_SIZE);
110
111 let page_token = request
112 .page_token
113 .as_ref()
114 .map(|token| decode_page_token(token))
115 .transpose()?;
116
117 if let Some(token) = &page_token
118 && token.stream_id != stream_addr
119 {
120 return Err(RpcError::new(
121 tonic::Code::InvalidArgument,
122 "page_token stream_id mismatch".to_string(),
123 ));
124 }
125
126 let start = request.start_checkpoint.unwrap_or(0);
127
128 let reader = service.reader.inner();
129 let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
130
131 let highest_indexed = indexes
132 .get_highest_indexed_checkpoint_seq_number()
133 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
134 .unwrap_or(0);
135
136 let lowest_available = reader
137 .get_lowest_available_checkpoint_objects()
138 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
139
140 if start < lowest_available {
141 return Err(RpcError::new(
142 tonic::Code::InvalidArgument,
143 format!(
144 "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
145 start, lowest_available
146 ),
147 ));
148 }
149
150 if start > highest_indexed {
151 let mut response = ListAuthenticatedEventsResponse::default();
152 response.events = vec![];
153 response.highest_indexed_checkpoint = Some(highest_indexed);
154 response.next_page_token = None;
155 return Ok(response);
156 }
157
158 let (actual_start, start_accumulator_version, start_transaction_idx, start_event_idx) =
159 if let Some(token) = &page_token {
160 (
161 token.next_checkpoint,
162 Some(token.next_accumulator_version),
163 Some(token.next_transaction_idx),
164 Some(token.next_event_idx),
165 )
166 } else {
167 (start, None, None, None)
168 };
169
170 let iter = indexes
171 .authenticated_event_iter(
172 stream_addr,
173 actual_start,
174 start_accumulator_version,
175 start_transaction_idx,
176 start_event_idx,
177 highest_indexed,
178 page_size + 1,
179 )
180 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
181
182 let mut events = Vec::new();
183 let mut size_bytes = 0;
184 let mut next_page_token = None;
185
186 for (i, event_result) in iter.enumerate() {
187 let (cp, accumulator_version, transaction_idx, event_idx, ev) =
188 event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
189
190 if i >= page_size as usize {
191 next_page_token = Some(encode_page_token(PageToken {
192 stream_id: stream_addr,
193 next_checkpoint: cp,
194 next_accumulator_version: accumulator_version,
195 next_transaction_idx: transaction_idx,
196 next_event_idx: event_idx,
197 }));
198 break;
199 }
200
201 let authenticated_event = to_authenticated_event(
202 &stream_id,
203 cp,
204 accumulator_version,
205 transaction_idx,
206 event_idx,
207 &ev,
208 );
209 let event_size = authenticated_event.encoded_len();
210
211 if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
212 next_page_token = Some(encode_page_token(PageToken {
213 stream_id: stream_addr,
214 next_checkpoint: cp,
215 next_accumulator_version: accumulator_version,
216 next_transaction_idx: transaction_idx,
217 next_event_idx: event_idx,
218 }));
219 break;
220 }
221
222 size_bytes += event_size;
223 events.push(authenticated_event);
224 }
225
226 let mut response = ListAuthenticatedEventsResponse::default();
227 response.events = events;
228 response.highest_indexed_checkpoint = Some(highest_indexed);
229 response.next_page_token = next_page_token.map(|t| t.into());
230 Ok(response)
231}