sui_rpc/light_client/events/
client.rs1use std::time::Duration;
5
6use futures::Stream;
7use futures::StreamExt;
8use sui_sdk_types::framework::EventStreamHead;
9use sui_sdk_types::framework::derive_event_stream_head_object_id;
10use tokio::sync::mpsc;
11
12use super::config::AuthenticatedEventsConfig;
13use super::envelope::AuthenticatedEvent;
14use super::state::StreamState;
15use super::state::buffer_response_batch;
16use super::state::extract_event_stream_head;
17use super::state::fold_and_reconcile;
18use crate::light_client::CheckpointObjectProof;
19use crate::light_client::LightClient;
20use crate::light_client::error::LightClientError;
21use crate::proto::sui::rpc::v2alpha::AffectedObjectFilter;
22use crate::proto::sui::rpc::v2alpha::EventFilter;
23use crate::proto::sui::rpc::v2alpha::EventLiteral;
24use crate::proto::sui::rpc::v2alpha::EventPredicate;
25use crate::proto::sui::rpc::v2alpha::EventStreamHeadFilter;
26use crate::proto::sui::rpc::v2alpha::EventTerm;
27use crate::proto::sui::rpc::v2alpha::ListEventsRequest;
28use crate::proto::sui::rpc::v2alpha::ListTransactionsRequest;
29use crate::proto::sui::rpc::v2alpha::QueryEndReason;
30use crate::proto::sui::rpc::v2alpha::QueryOptions;
31use crate::proto::sui::rpc::v2alpha::TransactionFilter;
32use crate::proto::sui::rpc::v2alpha::TransactionLiteral;
33use crate::proto::sui::rpc::v2alpha::TransactionPredicate;
34use crate::proto::sui::rpc::v2alpha::TransactionTerm;
35use crate::proto::sui::rpc::v2alpha::event_literal;
36use crate::proto::sui::rpc::v2alpha::event_predicate;
37use crate::proto::sui::rpc::v2alpha::list_events_response;
38use crate::proto::sui::rpc::v2alpha::list_transactions_response;
39use crate::proto::sui::rpc::v2alpha::transaction_literal;
40use crate::proto::sui::rpc::v2alpha::transaction_predicate;
41
42pub struct AuthenticatedEventsClient {
55 light: LightClient,
56 config: AuthenticatedEventsConfig,
57}
58
59impl AuthenticatedEventsClient {
60 pub fn new(light: LightClient, config: AuthenticatedEventsConfig) -> Self {
62 Self { light, config }
63 }
64
65 pub fn stream(
73 self,
74 ) -> impl Stream<Item = Result<AuthenticatedEvent, LightClientError>> + Send + 'static {
75 let (tx, rx) = mpsc::channel(self.config.channel_capacity);
76 tokio::spawn(run_stream_task(self.light, self.config, tx));
77 futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) })
78 }
79}
80
81async fn run_stream_task(
98 mut light: LightClient,
99 config: AuthenticatedEventsConfig,
100 tx: mpsc::Sender<Result<AuthenticatedEvent, LightClientError>>,
101) {
102 let start = match initial_state(&mut light, &config).await {
105 Ok(start) => start,
106 Err(e) => {
107 let _ = tx.send(Err(e)).await;
108 return;
109 }
110 };
111
112 let mut state = StreamState::new(start.initial_head, start.start_checkpoint);
113 let mut next_checkpoint = start
114 .start_checkpoint
115 .checked_add(1)
116 .unwrap_or(start.start_checkpoint);
117 let mut next_cursor: Option<prost::bytes::Bytes> = None;
118 let mut consecutive_failures = 0u32;
119 let mut last_head_check = std::time::Instant::now();
120
121 let stream_head_object_id = derive_event_stream_head_object_id(config.stream_id);
122 let filter = build_filter(&config.stream_id.to_string());
123
124 loop {
125 let should_reconcile = last_head_check.elapsed() >= config.head_check_interval
132 || !state.buffer.is_empty()
133 && next_cursor.is_none()
134 && page_drain_done(&state, next_checkpoint);
135
136 if should_reconcile {
137 match reconcile_once(&mut light, &mut state, &stream_head_object_id, &config).await {
138 Ok(released) => {
139 consecutive_failures = 0;
140 last_head_check = std::time::Instant::now();
141 for ev in released {
142 if tx.send(Ok(ev)).await.is_err() {
143 return;
144 }
145 }
146 }
147 Err(e) if e_is_retryable(&e) => {
148 if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, e).await {
149 return;
150 }
151 }
152 Err(e) => {
153 let _ = tx.send(Err(e)).await;
154 return;
155 }
156 }
157 continue;
158 }
159
160 let request = ListEventsRequest {
162 read_mask: None,
163 start_checkpoint: Some(next_checkpoint),
164 end_checkpoint: None,
165 filter: Some(filter.clone()),
166 options: Some(QueryOptions {
167 limit_items: Some(config.page_size),
168 after: next_cursor.clone(),
169 before: None,
170 ordering: 0, }),
172 };
173
174 match fetch_one_page(&mut light, request).await {
175 Ok(page) => {
176 let PageResult {
177 events,
178 end_cursor,
179 end_reason,
180 watermark_hi,
181 partial_error,
182 } = page;
183
184 buffer_response_batch(&mut state, events, watermark_hi);
185
186 if let Some(err) = partial_error {
196 if e_is_retryable(&err) {
197 next_cursor = end_cursor;
198 if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, err).await {
199 return;
200 }
201 } else {
202 let _ = tx.send(Err(err)).await;
203 return;
204 }
205 continue;
206 }
207
208 consecutive_failures = 0;
209 match end_reason {
210 Some(QueryEndReason::ItemLimit | QueryEndReason::ScanLimit) => {
217 next_cursor = end_cursor;
218 }
219 Some(_) => {
220 next_cursor = None;
231 next_checkpoint = state
232 .events_scanned_through
233 .checked_add(1)
234 .unwrap_or(next_checkpoint)
235 .max(next_checkpoint);
236 tokio::time::sleep(config.retry_backoff).await;
238 }
239 None => {
240 next_cursor = end_cursor;
246 tokio::time::sleep(config.retry_backoff).await;
247 }
248 }
249 }
250 Err(e) if e_is_retryable(&e) => {
251 if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, e).await {
252 return;
253 }
254 }
255 Err(e) => {
256 let _ = tx.send(Err(e)).await;
257 return;
258 }
259 }
260 }
261}
262
263struct InitialState {
264 initial_head: EventStreamHead,
265 start_checkpoint: u64,
266}
267
268async fn initial_state(
285 light: &mut LightClient,
286 config: &AuthenticatedEventsConfig,
287) -> Result<InitialState, LightClientError> {
288 let latest_tip = light.latest_checkpoint_seq().await?;
289 let stream_head_object_id = derive_event_stream_head_object_id(config.stream_id);
290
291 let proof = light
292 .prove_object_at_checkpoint(&stream_head_object_id, latest_tip)
293 .await?;
294
295 let (initial_head, start_checkpoint) = match proof {
296 CheckpointObjectProof::Inclusion {
297 object: Some(object),
298 ..
299 } => {
300 let head = extract_event_stream_head(&object)?;
301 let cp = head.checkpoint_seq;
302 (head, cp)
303 }
304 CheckpointObjectProof::Inclusion { object: None, .. } => {
305 return Err(LightClientError::UnexpectedObjectShape {
310 reason: "event stream head was deleted or wrapped at the initial tip",
311 });
312 }
313 CheckpointObjectProof::NonInclusion => (
314 EventStreamHead::default(),
315 config.start_checkpoint.unwrap_or(latest_tip),
316 ),
317 };
318
319 let start_checkpoint = config.start_checkpoint.unwrap_or(start_checkpoint);
320 Ok(InitialState {
321 initial_head,
322 start_checkpoint,
323 })
324}
325
326struct PageResult {
327 events: Vec<AuthenticatedEvent>,
328 end_cursor: Option<prost::bytes::Bytes>,
333 end_reason: Option<QueryEndReason>,
334 watermark_hi: Option<u64>,
339 partial_error: Option<LightClientError>,
347}
348
349async fn fetch_one_page(
350 light: &mut LightClient,
351 request: ListEventsRequest,
352) -> Result<PageResult, LightClientError> {
353 let mut stream = light
354 .rpc()
355 .ledger_client_alpha()
356 .list_events(request)
357 .await?
358 .into_inner();
359
360 let mut events = Vec::new();
361 let mut end_cursor: Option<prost::bytes::Bytes> = None;
362 let mut end_reason = None;
363 let mut watermark_hi: Option<u64> = None;
364 let mut partial_error: Option<LightClientError> = None;
365
366 while let Some(frame) = stream.next().await {
367 let frame = match frame {
368 Ok(f) => f,
369 Err(status) => {
370 partial_error = Some(status.into());
371 break;
372 }
373 };
374 match frame.response {
375 Some(list_events_response::Response::Item(item)) => {
376 if let Some(w) = item.watermark.as_ref() {
377 if let Some(c) = w.cursor.clone() {
378 end_cursor = Some(c);
379 }
380 if let Some(hi) = w.checkpoint_hi {
381 watermark_hi = Some(watermark_hi.map_or(hi, |prev| prev.max(hi)));
382 }
383 }
384 let ev = AuthenticatedEvent::try_from(&item)?;
385 events.push(ev);
386 }
387 Some(list_events_response::Response::Watermark(w)) => {
388 if let Some(c) = w.cursor {
389 end_cursor = Some(c);
390 }
391 if let Some(hi) = w.checkpoint_hi {
392 watermark_hi = Some(watermark_hi.map_or(hi, |prev| prev.max(hi)));
393 }
394 }
395 Some(list_events_response::Response::End(end)) => {
396 end_reason = QueryEndReason::try_from(end.reason).ok();
397 break;
398 }
399 None => break,
400 }
401 }
402
403 Ok(PageResult {
404 events,
405 end_cursor,
406 end_reason,
407 watermark_hi,
408 partial_error,
409 })
410}
411
412async fn reconcile_once(
422 light: &mut LightClient,
423 state: &mut StreamState,
424 stream_head_object_id: &sui_sdk_types::Address,
425 config: &AuthenticatedEventsConfig,
426) -> Result<Vec<AuthenticatedEvent>, LightClientError> {
427 let settlement_upper_inclusive = state.events_scanned_through;
431 if settlement_upper_inclusive <= state.confirmed_through {
432 return Ok(Vec::new());
433 }
434
435 let settlements = fetch_settlements_for_range(
436 light,
437 stream_head_object_id,
438 state.confirmed_through.saturating_add(1),
439 settlement_upper_inclusive.saturating_add(1),
440 config.page_size,
441 )
442 .await?;
443
444 let Some((reconcile_cp, _)) = settlements.last().copied() else {
445 return Ok(Vec::new());
450 };
451
452 let proof = light
453 .prove_object_at_checkpoint(stream_head_object_id, reconcile_cp)
454 .await?;
455 let chain_head = match proof {
456 CheckpointObjectProof::Inclusion {
457 object: Some(object),
458 ..
459 } => extract_event_stream_head(&object)?,
460 CheckpointObjectProof::Inclusion { object: None, .. } => {
461 return Err(LightClientError::UnexpectedObjectShape {
466 reason: "event stream head was deleted or wrapped at the reconciliation tip",
467 });
468 }
469 CheckpointObjectProof::NonInclusion => {
470 return Err(LightClientError::UnexpectedObjectShape {
475 reason: "settlement transaction listed at checkpoint but OCS proof reports \
476 the event stream head was not modified",
477 });
478 }
479 };
480
481 fold_and_reconcile(state, &settlements, chain_head, reconcile_cp)
482}
483
484async fn fetch_settlements_for_range(
494 light: &mut LightClient,
495 stream_head_object_id: &sui_sdk_types::Address,
496 start_checkpoint: u64,
497 end_checkpoint_exclusive: u64,
498 page_size: u32,
499) -> Result<Vec<(u64, u64)>, LightClientError> {
500 if end_checkpoint_exclusive <= start_checkpoint {
501 return Ok(Vec::new());
502 }
503
504 let filter = build_affected_object_filter(stream_head_object_id);
505 let mut settlements: Vec<(u64, u64)> = Vec::new();
506 let mut cursor: Option<prost::bytes::Bytes> = None;
507
508 loop {
509 let request = ListTransactionsRequest {
510 read_mask: None,
511 start_checkpoint: Some(start_checkpoint),
512 end_checkpoint: Some(end_checkpoint_exclusive),
513 filter: Some(filter.clone()),
514 options: Some(QueryOptions {
515 limit_items: Some(page_size),
516 after: cursor.clone(),
517 before: None,
518 ordering: 0, }),
520 };
521
522 let page = fetch_settlements_page(light, request).await?;
523 settlements.extend(page.entries);
524
525 match page.end_reason {
526 Some(QueryEndReason::ItemLimit | QueryEndReason::ScanLimit) => {
529 if page.end_cursor.is_none() {
530 break;
534 }
535 cursor = page.end_cursor;
536 }
537 _ => break,
541 }
542 }
543
544 Ok(settlements)
545}
546
547struct SettlementsPage {
548 entries: Vec<(u64, u64)>,
549 end_cursor: Option<prost::bytes::Bytes>,
550 end_reason: Option<QueryEndReason>,
551}
552
553async fn fetch_settlements_page(
554 light: &mut LightClient,
555 request: ListTransactionsRequest,
556) -> Result<SettlementsPage, LightClientError> {
557 let mut stream = light
558 .rpc()
559 .ledger_client_alpha()
560 .list_transactions(request)
561 .await?
562 .into_inner();
563
564 let mut entries = Vec::new();
565 let mut end_cursor: Option<prost::bytes::Bytes> = None;
566 let mut end_reason = None;
567
568 while let Some(frame) = stream.next().await {
569 let frame = frame?;
570 match frame.response {
571 Some(list_transactions_response::Response::Item(item)) => {
572 if let Some(c) = item.watermark.as_ref().and_then(|w| w.cursor.clone()) {
573 end_cursor = Some(c);
574 }
575 let checkpoint = item
576 .transaction
577 .as_ref()
578 .and_then(|tx| tx.checkpoint)
579 .ok_or(LightClientError::UnexpectedObjectShape {
580 reason: "settlement transaction missing checkpoint",
581 })?;
582 let tx_offset =
583 item.transaction_offset
584 .ok_or(LightClientError::UnexpectedObjectShape {
585 reason: "settlement transaction missing transaction_offset",
586 })?;
587 entries.push((checkpoint, tx_offset));
588 }
589 Some(list_transactions_response::Response::Watermark(w)) => {
590 if let Some(c) = w.cursor {
591 end_cursor = Some(c);
592 }
593 }
594 Some(list_transactions_response::Response::End(end)) => {
595 end_reason = QueryEndReason::try_from(end.reason).ok();
596 break;
597 }
598 None => break,
599 }
600 }
601
602 Ok(SettlementsPage {
603 entries,
604 end_cursor,
605 end_reason,
606 })
607}
608
609fn build_filter(stream_id_hex: &str) -> EventFilter {
610 EventFilter {
611 terms: vec![EventTerm {
612 literals: vec![EventLiteral {
613 polarity: Some(event_literal::Polarity::Include(EventPredicate {
614 predicate: Some(event_predicate::Predicate::EventStreamHead(
615 EventStreamHeadFilter {
616 stream_id: Some(stream_id_hex.to_string()),
617 },
618 )),
619 })),
620 }],
621 }],
622 }
623}
624
625fn build_affected_object_filter(object_id: &sui_sdk_types::Address) -> TransactionFilter {
626 TransactionFilter {
627 terms: vec![TransactionTerm {
628 literals: vec![TransactionLiteral {
629 polarity: Some(transaction_literal::Polarity::Include(
630 TransactionPredicate {
631 predicate: Some(transaction_predicate::Predicate::AffectedObject(
632 AffectedObjectFilter {
633 object_id: Some(object_id.to_string()),
634 },
635 )),
636 },
637 )),
638 }],
639 }],
640 }
641}
642
643fn page_drain_done(state: &StreamState, next_checkpoint: u64) -> bool {
646 state.events_scanned_through.saturating_add(1) >= next_checkpoint
647}
648
649fn e_is_retryable(err: &LightClientError) -> bool {
650 matches!(
651 err,
652 LightClientError::Rpc(status)
653 if matches!(
654 status.code(),
655 tonic::Code::Unavailable
656 | tonic::Code::DeadlineExceeded
657 | tonic::Code::ResourceExhausted
658 | tonic::Code::Aborted
659 )
660 )
661}
662
663async fn backoff_or_give_up(
668 tx: &mpsc::Sender<Result<AuthenticatedEvent, LightClientError>>,
669 config: &AuthenticatedEventsConfig,
670 consecutive_failures: &mut u32,
671 err: LightClientError,
672) -> bool {
673 *consecutive_failures += 1;
674 if *consecutive_failures > config.max_connect_retries {
675 let _ = tx.send(Err(err)).await;
676 return false;
677 }
678 let base = config.retry_backoff.saturating_mul(*consecutive_failures);
679 let jitter = pseudo_jitter(*consecutive_failures, config.retry_jitter);
680 tokio::time::sleep(base.saturating_add(jitter)).await;
681 true
682}
683
684fn pseudo_jitter(attempts: u32, ceiling: Duration) -> Duration {
689 if ceiling.is_zero() {
690 return Duration::ZERO;
691 }
692 let mix = (u64::from(attempts).wrapping_mul(0x9E3779B97F4A7C15)) as u128;
694 let ceiling_ms = ceiling.as_millis().max(1);
695 let offset_ms = (mix % ceiling_ms) as u64;
696 Duration::from_millis(offset_ms)
697}