Skip to main content

sui_rpc/light_client/events/
client.rs

1//! `AuthenticatedEventsClient` — the public surface for streaming
2//! authenticated events.
3
4use std::time::Duration;
5
6use futures::Stream;
7use futures::StreamExt;
8use sui_sdk_types::framework::EventStreamHead;
9use sui_sdk_types::framework::derive_event_stream_head_object_id;
10use tokio::sync::mpsc;
11
12use super::config::AuthenticatedEventsConfig;
13use super::envelope::AuthenticatedEvent;
14use super::state::StreamState;
15use super::state::buffer_response_batch;
16use super::state::extract_event_stream_head;
17use super::state::fold_and_reconcile;
18use crate::light_client::CheckpointObjectProof;
19use crate::light_client::LightClient;
20use crate::light_client::error::LightClientError;
21use crate::proto::sui::rpc::v2alpha::AffectedObjectFilter;
22use crate::proto::sui::rpc::v2alpha::EventFilter;
23use crate::proto::sui::rpc::v2alpha::EventLiteral;
24use crate::proto::sui::rpc::v2alpha::EventPredicate;
25use crate::proto::sui::rpc::v2alpha::EventStreamHeadFilter;
26use crate::proto::sui::rpc::v2alpha::EventTerm;
27use crate::proto::sui::rpc::v2alpha::ListEventsRequest;
28use crate::proto::sui::rpc::v2alpha::ListTransactionsRequest;
29use crate::proto::sui::rpc::v2alpha::QueryEndReason;
30use crate::proto::sui::rpc::v2alpha::QueryOptions;
31use crate::proto::sui::rpc::v2alpha::TransactionFilter;
32use crate::proto::sui::rpc::v2alpha::TransactionLiteral;
33use crate::proto::sui::rpc::v2alpha::TransactionPredicate;
34use crate::proto::sui::rpc::v2alpha::TransactionTerm;
35use crate::proto::sui::rpc::v2alpha::event_literal;
36use crate::proto::sui::rpc::v2alpha::event_predicate;
37use crate::proto::sui::rpc::v2alpha::list_events_response;
38use crate::proto::sui::rpc::v2alpha::list_transactions_response;
39use crate::proto::sui::rpc::v2alpha::transaction_literal;
40use crate::proto::sui::rpc::v2alpha::transaction_predicate;
41
42/// A streaming verifier for a single authenticated event stream.
43///
44/// Construct with [`AuthenticatedEventsClient::new`], passing a
45/// configured [`LightClient`] and an [`AuthenticatedEventsConfig`].
46/// Call [`Self::stream`] to spawn the background verifier task and
47/// receive an async stream of cryptographically-authenticated events.
48///
49/// Once [`Self::stream`] is called the client is consumed. The
50/// returned stream pulls events from the spawned task via a bounded
51/// channel; events are yielded only after the periodic reconciliation
52/// confirms the local MMR matches the on-chain
53/// [`EventStreamHead`](sui_sdk_types::framework::EventStreamHead).
54pub struct AuthenticatedEventsClient {
55    light: LightClient,
56    config: AuthenticatedEventsConfig,
57}
58
59impl AuthenticatedEventsClient {
60    /// Construct a new streaming client.
61    pub fn new(light: LightClient, config: AuthenticatedEventsConfig) -> Self {
62        Self { light, config }
63    }
64
65    /// Spawn the verifier task and return a stream of authenticated
66    /// events.
67    ///
68    /// The stream terminates when the spawned task exits — either
69    /// because the consumer dropped the receiver, or because the task
70    /// hit an unrecoverable error (the error is yielded as the last
71    /// item before termination).
72    pub fn stream(
73        self,
74    ) -> impl Stream<Item = Result<AuthenticatedEvent, LightClientError>> + Send + 'static {
75        let (tx, rx) = mpsc::channel(self.config.channel_capacity);
76        tokio::spawn(run_stream_task(self.light, self.config, tx));
77        futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) })
78    }
79}
80
81/// Driver loop for a single stream subscription.
82///
83/// On startup: fetch the latest checkpoint as the race-free floor,
84/// then fetch the on-chain stream head. If the head exists, resume
85/// from `head.checkpoint + 1`; otherwise start from
86/// `latest_checkpoint + 1`. Either way the floor ensures no event can
87/// land in the interval without being picked up by the head check.
88///
89/// Steady state: page through `ListEvents` and buffer the items, then at
90/// each [`AuthenticatedEventsConfig::head_check_interval`] tick fetch
91/// the settlement boundaries for the unconfirmed range via
92/// `ListTransactions(affected_object = event_stream_head)`, fold the
93/// buffered events into the local MMR partitioned by settlement, and
94/// reconcile the resulting head against the on-chain head proven at the
95/// last settled checkpoint. On match, drain the confirmed events
96/// through the channel. On mismatch, send the error and exit.
97async fn run_stream_task(
98    mut light: LightClient,
99    config: AuthenticatedEventsConfig,
100    tx: mpsc::Sender<Result<AuthenticatedEvent, LightClientError>>,
101) {
102    // The two startup helpers can fail; route the error through the
103    // channel and exit cleanly so the consumer sees one final item.
104    let start = match initial_state(&mut light, &config).await {
105        Ok(start) => start,
106        Err(e) => {
107            let _ = tx.send(Err(e)).await;
108            return;
109        }
110    };
111
112    let mut state = StreamState::new(start.initial_head, start.start_checkpoint);
113    let mut next_checkpoint = start
114        .start_checkpoint
115        .checked_add(1)
116        .unwrap_or(start.start_checkpoint);
117    let mut next_cursor: Option<prost::bytes::Bytes> = None;
118    let mut consecutive_failures = 0u32;
119    let mut last_head_check = std::time::Instant::now();
120
121    let stream_head_object_id = derive_event_stream_head_object_id(config.stream_id);
122    let filter = build_filter(&config.stream_id.to_string());
123
124    loop {
125        // Decide whether to fetch the next page or reconcile.
126        //
127        // The "idle" condition (non-empty buffer, no cursor, drained
128        // through the next checkpoint) lets us reconcile as soon as
129        // we've drawn level with the indexed tip rather than waiting
130        // out the full interval.
131        let should_reconcile = last_head_check.elapsed() >= config.head_check_interval
132            || !state.buffer.is_empty()
133                && next_cursor.is_none()
134                && page_drain_done(&state, next_checkpoint);
135
136        if should_reconcile {
137            match reconcile_once(&mut light, &mut state, &stream_head_object_id, &config).await {
138                Ok(released) => {
139                    consecutive_failures = 0;
140                    last_head_check = std::time::Instant::now();
141                    for ev in released {
142                        if tx.send(Ok(ev)).await.is_err() {
143                            return;
144                        }
145                    }
146                }
147                Err(e) if e_is_retryable(&e) => {
148                    if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, e).await {
149                        return;
150                    }
151                }
152                Err(e) => {
153                    let _ = tx.send(Err(e)).await;
154                    return;
155                }
156            }
157            continue;
158        }
159
160        // Fetch the next page of events.
161        let request = ListEventsRequest {
162            read_mask: None,
163            start_checkpoint: Some(next_checkpoint),
164            end_checkpoint: None,
165            filter: Some(filter.clone()),
166            options: Some(QueryOptions {
167                limit_items: Some(config.page_size),
168                after: next_cursor.clone(),
169                before: None,
170                ordering: 0, // ascending (default)
171            }),
172        };
173
174        match fetch_one_page(&mut light, request).await {
175            Ok(page) => {
176                let PageResult {
177                    events,
178                    end_cursor,
179                    end_reason,
180                    watermark_hi,
181                    partial_error,
182                } = page;
183
184                buffer_response_batch(&mut state, events, watermark_hi);
185
186                // Mid-stream transport error: commit whatever items we
187                // got into the buffer (done above), advance
188                // `next_cursor` to the latest watermark the server
189                // sent, then backoff-or-give-up the same way a
190                // pre-stream error would. This is the fix for the
191                // resumption-on-timeout case: without it, repeated
192                // server-side timeouts would each lose the cursor
193                // accumulated during the failed page and the loop
194                // would replay the same stale position forever.
195                if let Some(err) = partial_error {
196                    if e_is_retryable(&err) {
197                        next_cursor = end_cursor;
198                        if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, err).await {
199                            return;
200                        }
201                    } else {
202                        let _ = tx.send(Err(err)).await;
203                        return;
204                    }
205                    continue;
206                }
207
208                consecutive_failures = 0;
209                match end_reason {
210                    // The server stopped mid-range with unscanned work
211                    // remaining — resume from the latest in-stream
212                    // cursor (item or watermark) so we don't skip the
213                    // unscanned tail. Same shape as ItemLimit: keep
214                    // advancing the cursor, no checkpoint bump, no
215                    // backoff sleep.
216                    Some(QueryEndReason::ItemLimit | QueryEndReason::ScanLimit) => {
217                        next_cursor = end_cursor;
218                    }
219                    Some(_) => {
220                        // Server reached the indexed tip, a requested
221                        // checkpoint range bound, or a cursor bound —
222                        // no more events available in this scan. Reset
223                        // cursor and bump start checkpoint to one past
224                        // the watermark / last buffered event so the
225                        // next page picks up new events as they
226                        // arrive. We can't use `local_head.checkpoint_seq`
227                        // here because folding is deferred to
228                        // reconciliation — it can lag the scan by an
229                        // entire interval.
230                        next_cursor = None;
231                        next_checkpoint = state
232                            .events_scanned_through
233                            .checked_add(1)
234                            .unwrap_or(next_checkpoint)
235                            .max(next_checkpoint);
236                        // Sleep briefly so we don't spin when at the tip.
237                        tokio::time::sleep(config.retry_backoff).await;
238                    }
239                    None => {
240                        // Stream ended cleanly (no transport error) but
241                        // without an `End` frame. This shouldn't happen
242                        // in normal operation; preserve the latest
243                        // cursor and back off briefly rather than
244                        // assume the scan finished and skip ahead.
245                        next_cursor = end_cursor;
246                        tokio::time::sleep(config.retry_backoff).await;
247                    }
248                }
249            }
250            Err(e) if e_is_retryable(&e) => {
251                if !backoff_or_give_up(&tx, &config, &mut consecutive_failures, e).await {
252                    return;
253                }
254            }
255            Err(e) => {
256                let _ = tx.send(Err(e)).await;
257                return;
258            }
259        }
260    }
261}
262
263struct InitialState {
264    initial_head: EventStreamHead,
265    start_checkpoint: u64,
266}
267
268/// Establish the starting position. Fetch the latest checkpoint
269/// *before* the head so that if an event lands in between, the head
270/// fetch observes it and we resume from `head.checkpoint + 1` rather
271/// than the now-stale tip.
272///
273/// Known limitation: an `EventStreamHead` object that exists on chain
274/// but was *not modified* in the checkpoint we query produces an
275/// authenticated `NonInclusion` result, which this function treats the
276/// same as "no head yet" — it falls back to the tip. The OCS-per-
277/// checkpoint commitment scheme attests only to objects modified at a
278/// given checkpoint, not to the contents of unmodified objects, so we
279/// cannot distinguish the two cases through this API alone. In
280/// practice this is rarely an issue because new events on a stream
281/// modify its head, so a head that hasn't been touched at the tip
282/// means the stream has been quiet — and starting from the tip is the
283/// correct resume point in both cases.
284async fn initial_state(
285    light: &mut LightClient,
286    config: &AuthenticatedEventsConfig,
287) -> Result<InitialState, LightClientError> {
288    let latest_tip = light.latest_checkpoint_seq().await?;
289    let stream_head_object_id = derive_event_stream_head_object_id(config.stream_id);
290
291    let proof = light
292        .prove_object_at_checkpoint(&stream_head_object_id, latest_tip)
293        .await?;
294
295    let (initial_head, start_checkpoint) = match proof {
296        CheckpointObjectProof::Inclusion {
297            object: Some(object),
298            ..
299        } => {
300            let head = extract_event_stream_head(&object)?;
301            let cp = head.checkpoint_seq;
302            (head, cp)
303        }
304        CheckpointObjectProof::Inclusion { object: None, .. } => {
305            // The head object was deleted or wrapped at the tip.
306            // For an authenticated event stream this is unrecoverable
307            // — the on-chain anchor is gone, so no future reconciliation
308            // can succeed.
309            return Err(LightClientError::UnexpectedObjectShape {
310                reason: "event stream head was deleted or wrapped at the initial tip",
311            });
312        }
313        CheckpointObjectProof::NonInclusion => (
314            EventStreamHead::default(),
315            config.start_checkpoint.unwrap_or(latest_tip),
316        ),
317    };
318
319    let start_checkpoint = config.start_checkpoint.unwrap_or(start_checkpoint);
320    Ok(InitialState {
321        initial_head,
322        start_checkpoint,
323    })
324}
325
326struct PageResult {
327    events: Vec<AuthenticatedEvent>,
328    /// Latest opaque cursor delivered during the page — most recent
329    /// `Watermark.cursor` or per-item cursor, whichever arrived later in
330    /// stream order. This is the safe resume point on any termination:
331    /// the server no longer carries a cursor on `QueryEnd`.
332    end_cursor: Option<prost::bytes::Bytes>,
333    end_reason: Option<QueryEndReason>,
334    /// Most recent `Watermark.checkpoint_hi` observed in this page,
335    /// across both standalone watermarks and per-item watermarks. The
336    /// streaming state uses this as the "events scanned through" floor
337    /// when picking the settlement-fetch range.
338    watermark_hi: Option<u64>,
339    /// Mid-stream transport error that interrupted page accumulation.
340    /// When present, `events` and `end_cursor` reflect what was
341    /// received before the error. The caller advances `next_cursor` to
342    /// `end_cursor` before dispatching retry vs. terminal — without
343    /// this, propagating the error via `?` would discard the cursor
344    /// progress and the retry loop would replay the same stale
345    /// position on every server-side timeout.
346    partial_error: Option<LightClientError>,
347}
348
349async fn fetch_one_page(
350    light: &mut LightClient,
351    request: ListEventsRequest,
352) -> Result<PageResult, LightClientError> {
353    let mut stream = light
354        .rpc()
355        .ledger_client_alpha()
356        .list_events(request)
357        .await?
358        .into_inner();
359
360    let mut events = Vec::new();
361    let mut end_cursor: Option<prost::bytes::Bytes> = None;
362    let mut end_reason = None;
363    let mut watermark_hi: Option<u64> = None;
364    let mut partial_error: Option<LightClientError> = None;
365
366    while let Some(frame) = stream.next().await {
367        let frame = match frame {
368            Ok(f) => f,
369            Err(status) => {
370                partial_error = Some(status.into());
371                break;
372            }
373        };
374        match frame.response {
375            Some(list_events_response::Response::Item(item)) => {
376                if let Some(w) = item.watermark.as_ref() {
377                    if let Some(c) = w.cursor.clone() {
378                        end_cursor = Some(c);
379                    }
380                    if let Some(hi) = w.checkpoint_hi {
381                        watermark_hi = Some(watermark_hi.map_or(hi, |prev| prev.max(hi)));
382                    }
383                }
384                let ev = AuthenticatedEvent::try_from(&item)?;
385                events.push(ev);
386            }
387            Some(list_events_response::Response::Watermark(w)) => {
388                if let Some(c) = w.cursor {
389                    end_cursor = Some(c);
390                }
391                if let Some(hi) = w.checkpoint_hi {
392                    watermark_hi = Some(watermark_hi.map_or(hi, |prev| prev.max(hi)));
393                }
394            }
395            Some(list_events_response::Response::End(end)) => {
396                end_reason = QueryEndReason::try_from(end.reason).ok();
397                break;
398            }
399            None => break,
400        }
401    }
402
403    Ok(PageResult {
404        events,
405        end_cursor,
406        end_reason,
407        watermark_hi,
408        partial_error,
409    })
410}
411
412/// Drive one reconciliation tick: fetch settlements for the unconfirmed
413/// range, fold the buffered events through the latest settled
414/// checkpoint into the local MMR, prove the on-chain head at that
415/// checkpoint, and release the folded events on a match.
416///
417/// Returns the released events on success. Returns `Ok(Vec::new())`
418/// when there's nothing to reconcile yet (the head wasn't modified
419/// anywhere in the unconfirmed range, so there's no chain anchor to
420/// compare against).
421async fn reconcile_once(
422    light: &mut LightClient,
423    state: &mut StreamState,
424    stream_head_object_id: &sui_sdk_types::Address,
425    config: &AuthenticatedEventsConfig,
426) -> Result<Vec<AuthenticatedEvent>, LightClientError> {
427    // Settlements only happen up through events we've fully scanned —
428    // querying past `events_scanned_through` risks partitioning into a
429    // settlement bucket whose events haven't all been buffered yet.
430    let settlement_upper_inclusive = state.events_scanned_through;
431    if settlement_upper_inclusive <= state.confirmed_through {
432        return Ok(Vec::new());
433    }
434
435    let settlements = fetch_settlements_for_range(
436        light,
437        stream_head_object_id,
438        state.confirmed_through.saturating_add(1),
439        settlement_upper_inclusive.saturating_add(1),
440        config.page_size,
441    )
442    .await?;
443
444    let Some((reconcile_cp, _)) = settlements.last().copied() else {
445        // No settlement in the unconfirmed range — the head wasn't
446        // modified, so there's no anchor to reconcile against. Pending
447        // events stay buffered; the next reconciliation tick will
448        // retry once a settlement lands.
449        return Ok(Vec::new());
450    };
451
452    let proof = light
453        .prove_object_at_checkpoint(stream_head_object_id, reconcile_cp)
454        .await?;
455    let chain_head = match proof {
456        CheckpointObjectProof::Inclusion {
457            object: Some(object),
458            ..
459        } => extract_event_stream_head(&object)?,
460        CheckpointObjectProof::Inclusion { object: None, .. } => {
461            // The on-chain head was deleted or wrapped at the
462            // settlement checkpoint. The local replay cannot be
463            // reconciled against a missing head; this is terminal for
464            // the stream.
465            return Err(LightClientError::UnexpectedObjectShape {
466                reason: "event stream head was deleted or wrapped at the reconciliation tip",
467            });
468        }
469        CheckpointObjectProof::NonInclusion => {
470            // `ListTransactions(affected_object)` placed a settlement
471            // at this checkpoint, but the OCS proof says the head
472            // wasn't modified there — the two indexes are
473            // inconsistent.
474            return Err(LightClientError::UnexpectedObjectShape {
475                reason: "settlement transaction listed at checkpoint but OCS proof reports \
476                         the event stream head was not modified",
477            });
478        }
479    };
480
481    fold_and_reconcile(state, &settlements, chain_head, reconcile_cp)
482}
483
484/// Page through `ListTransactions` filtered on `affected_object =
485/// stream_head_object_id` and return the ascending `(checkpoint,
486/// transaction_offset)` settlement boundaries for `[start, end)`.
487///
488/// Each `settle_events` transaction mutates the stream's head object,
489/// so this filter returns exactly the per-stream settlement boundaries.
490/// `start` is inclusive and `end` is exclusive — matching the proto
491/// `start_checkpoint` / `end_checkpoint` semantics — so the caller
492/// passes `confirmed_through + 1` and `events_scanned_through + 1`.
493async fn fetch_settlements_for_range(
494    light: &mut LightClient,
495    stream_head_object_id: &sui_sdk_types::Address,
496    start_checkpoint: u64,
497    end_checkpoint_exclusive: u64,
498    page_size: u32,
499) -> Result<Vec<(u64, u64)>, LightClientError> {
500    if end_checkpoint_exclusive <= start_checkpoint {
501        return Ok(Vec::new());
502    }
503
504    let filter = build_affected_object_filter(stream_head_object_id);
505    let mut settlements: Vec<(u64, u64)> = Vec::new();
506    let mut cursor: Option<prost::bytes::Bytes> = None;
507
508    loop {
509        let request = ListTransactionsRequest {
510            read_mask: None,
511            start_checkpoint: Some(start_checkpoint),
512            end_checkpoint: Some(end_checkpoint_exclusive),
513            filter: Some(filter.clone()),
514            options: Some(QueryOptions {
515                limit_items: Some(page_size),
516                after: cursor.clone(),
517                before: None,
518                ordering: 0, // ascending
519            }),
520        };
521
522        let page = fetch_settlements_page(light, request).await?;
523        settlements.extend(page.entries);
524
525        match page.end_reason {
526            // Server hit its per-request bound but the range may have
527            // more — keep paging from the latest cursor it gave us.
528            Some(QueryEndReason::ItemLimit | QueryEndReason::ScanLimit) => {
529                if page.end_cursor.is_none() {
530                    // Defensive: no cursor advance means we'd loop
531                    // forever. Treat as done; the next reconciliation
532                    // tick will retry with a fresh window.
533                    break;
534                }
535                cursor = page.end_cursor;
536            }
537            // Range fully scanned (CheckpointBound / CursorBound /
538            // LedgerTip / Unspecified) or end frame missing: nothing
539            // more to fetch in this window.
540            _ => break,
541        }
542    }
543
544    Ok(settlements)
545}
546
547struct SettlementsPage {
548    entries: Vec<(u64, u64)>,
549    end_cursor: Option<prost::bytes::Bytes>,
550    end_reason: Option<QueryEndReason>,
551}
552
553async fn fetch_settlements_page(
554    light: &mut LightClient,
555    request: ListTransactionsRequest,
556) -> Result<SettlementsPage, LightClientError> {
557    let mut stream = light
558        .rpc()
559        .ledger_client_alpha()
560        .list_transactions(request)
561        .await?
562        .into_inner();
563
564    let mut entries = Vec::new();
565    let mut end_cursor: Option<prost::bytes::Bytes> = None;
566    let mut end_reason = None;
567
568    while let Some(frame) = stream.next().await {
569        let frame = frame?;
570        match frame.response {
571            Some(list_transactions_response::Response::Item(item)) => {
572                if let Some(c) = item.watermark.as_ref().and_then(|w| w.cursor.clone()) {
573                    end_cursor = Some(c);
574                }
575                let checkpoint = item
576                    .transaction
577                    .as_ref()
578                    .and_then(|tx| tx.checkpoint)
579                    .ok_or(LightClientError::UnexpectedObjectShape {
580                        reason: "settlement transaction missing checkpoint",
581                    })?;
582                let tx_offset =
583                    item.transaction_offset
584                        .ok_or(LightClientError::UnexpectedObjectShape {
585                            reason: "settlement transaction missing transaction_offset",
586                        })?;
587                entries.push((checkpoint, tx_offset));
588            }
589            Some(list_transactions_response::Response::Watermark(w)) => {
590                if let Some(c) = w.cursor {
591                    end_cursor = Some(c);
592                }
593            }
594            Some(list_transactions_response::Response::End(end)) => {
595                end_reason = QueryEndReason::try_from(end.reason).ok();
596                break;
597            }
598            None => break,
599        }
600    }
601
602    Ok(SettlementsPage {
603        entries,
604        end_cursor,
605        end_reason,
606    })
607}
608
609fn build_filter(stream_id_hex: &str) -> EventFilter {
610    EventFilter {
611        terms: vec![EventTerm {
612            literals: vec![EventLiteral {
613                polarity: Some(event_literal::Polarity::Include(EventPredicate {
614                    predicate: Some(event_predicate::Predicate::EventStreamHead(
615                        EventStreamHeadFilter {
616                            stream_id: Some(stream_id_hex.to_string()),
617                        },
618                    )),
619                })),
620            }],
621        }],
622    }
623}
624
625fn build_affected_object_filter(object_id: &sui_sdk_types::Address) -> TransactionFilter {
626    TransactionFilter {
627        terms: vec![TransactionTerm {
628            literals: vec![TransactionLiteral {
629                polarity: Some(transaction_literal::Polarity::Include(
630                    TransactionPredicate {
631                        predicate: Some(transaction_predicate::Predicate::AffectedObject(
632                            AffectedObjectFilter {
633                                object_id: Some(object_id.to_string()),
634                            },
635                        )),
636                    },
637                )),
638            }],
639        }],
640    }
641}
642
643/// True if buffered events were already scanned up through `next_checkpoint`,
644/// so the page-fetch loop is idle and may as well reconcile sooner.
645fn page_drain_done(state: &StreamState, next_checkpoint: u64) -> bool {
646    state.events_scanned_through.saturating_add(1) >= next_checkpoint
647}
648
649fn e_is_retryable(err: &LightClientError) -> bool {
650    matches!(
651        err,
652        LightClientError::Rpc(status)
653            if matches!(
654                status.code(),
655                tonic::Code::Unavailable
656                    | tonic::Code::DeadlineExceeded
657                    | tonic::Code::ResourceExhausted
658                    | tonic::Code::Aborted
659            )
660    )
661}
662
663/// Backoff for `retry_backoff * attempts + jitter`. Returns `true` if
664/// the task should continue, `false` if `max_connect_retries` has been
665/// exhausted (in which case the error has been forwarded to the
666/// channel).
667async fn backoff_or_give_up(
668    tx: &mpsc::Sender<Result<AuthenticatedEvent, LightClientError>>,
669    config: &AuthenticatedEventsConfig,
670    consecutive_failures: &mut u32,
671    err: LightClientError,
672) -> bool {
673    *consecutive_failures += 1;
674    if *consecutive_failures > config.max_connect_retries {
675        let _ = tx.send(Err(err)).await;
676        return false;
677    }
678    let base = config.retry_backoff.saturating_mul(*consecutive_failures);
679    let jitter = pseudo_jitter(*consecutive_failures, config.retry_jitter);
680    tokio::time::sleep(base.saturating_add(jitter)).await;
681    true
682}
683
/// Cheap retry jitter in whole milliseconds, in `[0, ceiling_ms)`
/// where `ceiling_ms` is `ceiling` truncated to milliseconds.
///
/// Returns `Duration::ZERO` when `ceiling` is zero, and also when it is
/// shorter than one millisecond (the only value below a 1 ms ceiling
/// is 0).
///
/// The offset is seeded from a fresh std
/// [`RandomState`](std::collections::hash_map::RandomState) on every
/// call and mixed with `attempts`. A value derived from the attempt
/// count alone would be identical for every stream at the same
/// attempt, so concurrent streams would keep retrying in lockstep —
/// exactly what jitter is meant to prevent. `RandomState` gives
/// per-call variation without pulling in `rand`; it is not meant to be
/// cryptographic randomness.
fn pseudo_jitter(attempts: u32, ceiling: Duration) -> Duration {
    if ceiling.is_zero() {
        return Duration::ZERO;
    }

    // Fully-qualified trait calls keep this block free of new `use`
    // lines at the top of the file.
    let seed = std::collections::hash_map::RandomState::new();
    let mut hasher = std::hash::BuildHasher::build_hasher(&seed);
    std::hash::Hasher::write_u32(&mut hasher, attempts);
    let mix = u128::from(std::hash::Hasher::finish(&hasher));

    let ceiling_ms = ceiling.as_millis().max(1);
    // `mix` came from a `u64`, so the remainder always fits in a `u64`;
    // the fallback is unreachable and only avoids a lossy `as` cast.
    let offset_ms = u64::try_from(mix % ceiling_ms).unwrap_or(u64::MAX);
    Duration::from_millis(offset_ms)
}