sui_core/
admission_queue.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::consensus_adapter::ConsensusAdapter;
6use arc_swap::ArcSwap;
7use mysten_common::debug_fatal;
8use mysten_metrics::spawn_monitored_task;
9use prometheus::{
10    Histogram, IntCounter, IntGauge, Registry, register_histogram_with_registry,
11    register_int_counter_with_registry, register_int_gauge_with_registry,
12};
13use std::collections::{BTreeMap, HashMap, VecDeque};
14use std::net::IpAddr;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18use sui_macros::handle_fail_point_if;
19use sui_network::tonic;
20use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
21use sui_types::messages_consensus::{
22    ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey,
23};
24use tokio::sync::{mpsc, oneshot};
25use tracing::debug;
26
/// A transaction (or soft bundle) waiting in the admission queue for consensus submission.
pub struct QueueEntry {
    /// Priority key: higher gas prices are drained to consensus first.
    pub gas_price: u64,
    /// The transaction(s) submitted together as one unit (a soft bundle when > 1).
    pub transactions: Vec<ConsensusTransaction>,
    /// Reports the resulting consensus positions (or an error) back to the
    /// RPC caller that enqueued this entry.
    pub position_sender: oneshot::Sender<Result<Vec<ConsensusPosition>, tonic::Status>>,
    /// Submitter's client address, if known; forwarded to the consensus adapter.
    pub submitter_client_addr: Option<IpAddr>,
    /// When the entry was enqueued; used for the queue-wait-latency metric.
    pub enqueue_time: Instant,
}
35
36impl QueueEntry {
37    #[cfg(test)]
38    pub fn new_for_test(
39        gas_price: u64,
40        position_sender: oneshot::Sender<Result<Vec<ConsensusPosition>, tonic::Status>>,
41    ) -> Self {
42        Self {
43            gas_price,
44            transactions: vec![],
45            position_sender,
46            submitter_client_addr: None,
47            enqueue_time: Instant::now(),
48        }
49    }
50}
51
/// Prometheus metrics for the admission queue.
pub struct AdmissionQueueMetrics {
    /// Current number of queued entries (updated on insert and drain).
    pub queue_depth: IntGauge,
    /// Per-entry time from enqueue until drained toward consensus.
    pub queue_wait_latency: Histogram,
    /// Entries evicted by higher-gas-price arrivals while the queue was full.
    pub evictions: IntCounter,
    /// Inserts rejected outright: queue full and gas price too low to evict.
    pub rejections: IntCounter,
    /// Admitted inserts whose transaction key was already queued (spam signal).
    pub duplicate_inserts: IntCounter,
}
60
61impl AdmissionQueueMetrics {
62    pub fn new(registry: &Registry) -> Self {
63        Self {
64            queue_depth: register_int_gauge_with_registry!(
65                "admission_queue_depth",
66                "Current number of entries in the admission priority queue",
67                registry,
68            )
69            .unwrap(),
70            queue_wait_latency: register_histogram_with_registry!(
71                "admission_queue_wait_latency",
72                "Time a transaction spends waiting in the admission queue before being drained",
73                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
74                registry,
75            )
76            .unwrap(),
77            evictions: register_int_counter_with_registry!(
78                "admission_queue_evictions",
79                "Number of entries evicted from the admission queue by higher gas price transactions",
80                registry,
81            )
82            .unwrap(),
83            rejections: register_int_counter_with_registry!(
84                "admission_queue_rejections",
85                "Number of transactions rejected because the queue was full and their gas price was too low",
86                registry,
87            )
88            .unwrap(),
89            duplicate_inserts: register_int_counter_with_registry!(
90                "admission_queue_duplicate_inserts",
91                "Transactions admitted to the queue whose ConsensusTransactionKey duplicated an entry already present. Tallied as spam for DoS protection.",
92                registry,
93            )
94            .unwrap(),
95        }
96    }
97
98    pub fn new_for_tests() -> Self {
99        Self::new(&Registry::new())
100    }
101}
102
/// Bounded priority queue that orders transactions by gas price. Uses a BTreeMap
/// for efficient access at both ends: lowest gas price (for eviction) and highest
/// gas price (for draining to consensus). Entries at the same gas price are FIFO.
pub struct PriorityAdmissionQueue {
    /// Maximum number of entries; beyond this, inserts evict or are rejected.
    capacity: usize,
    /// Gas price -> FIFO deque of entries at that price level.
    map: BTreeMap<u64, VecDeque<QueueEntry>>,
    /// Number of queue entries per transaction key, for duplicate detection.
    queued_keys: HashMap<ConsensusTransactionKey, u32>,
    /// Total entries across all price levels (sum of deque lengths).
    total_len: usize,
    metrics: Arc<AdmissionQueueMetrics>,
}
114
impl PriorityAdmissionQueue {
    /// Creates an empty queue holding at most `capacity` entries.
    ///
    /// NOTE(review): `insert` assumes `capacity > 0` — its full-queue path
    /// unwraps `min_gas_price()`. The manager asserts this when deriving
    /// capacity from config; direct callers must uphold it too.
    pub fn new(capacity: usize, metrics: Arc<AdmissionQueueMetrics>) -> Self {
        Self {
            capacity,
            map: BTreeMap::new(),
            queued_keys: HashMap::new(),
            total_len: 0,
            metrics,
        }
    }

    /// Total number of queued entries across all gas price levels.
    pub fn len(&self) -> usize {
        self.total_len
    }

    /// Lowest gas price currently queued (the next eviction victim's price
    /// level), or `None` when empty.
    pub fn min_gas_price(&self) -> Option<u64> {
        self.map.first_key_value().map(|(&k, _)| k)
    }

    /// On success, returns `Ok(true)` or `Ok(false)` to indicate whether the
    /// value was newly inserted. Returns `Err` if the queue was full and the
    /// tx's gas price was not high enough to evict an existing entry.
    pub fn insert(&mut self, entry: QueueEntry) -> SuiResult<bool> {
        // Duplicate detection: "fresh" only if none of the entry's transaction
        // keys are already queued. Duplicates are still admitted, but counted
        // so the RPC layer can tally them as spam.
        let keys: Vec<_> = entry.transactions.iter().map(|t| t.key()).collect();
        let newly_inserted = !keys.iter().any(|k| self.queued_keys.contains_key(k));
        if !newly_inserted {
            self.metrics.duplicate_inserts.inc();
        }

        if self.total_len < self.capacity {
            self.push_entry(entry, keys);
            self.metrics.queue_depth.set(self.total_len as i64);
            return Ok(newly_inserted);
        }

        // Queue is full: a strictly higher gas price (note `>`, not `>=`)
        // evicts the cheapest entry; otherwise the insert is rejected.
        let min_price = self.min_gas_price().unwrap();
        if entry.gas_price > min_price {
            let evicter_price = entry.gas_price;
            let evicted = self.evict_lowest();
            self.push_entry(entry, keys);
            self.metrics.evictions.inc();
            // Signal the evicted entry's caller so `position_rx.await` returns
            // a distinct outbid error rather than a generic RecvError.
            let _ = evicted
                .position_sender
                .send(Err(tonic::Status::from(SuiError::from(
                    SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion {
                        min_gas_price: evicter_price,
                    },
                ))));
            return Ok(newly_inserted);
        }

        self.metrics.rejections.inc();
        Err(
            SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion {
                min_gas_price: min_price,
            }
            .into(),
        )
    }

    /// Pop up to `count` entries, highest gas price first.
    /// Within the same gas price, entries are returned in FIFO order.
    pub fn pop_batch(&mut self, count: usize) -> Vec<QueueEntry> {
        let mut remaining = count.min(self.total_len);
        let mut entries = Vec::with_capacity(remaining);
        while remaining > 0 {
            let Some(mut last) = self.map.last_entry() else {
                break;
            };
            let deque = last.get_mut();
            if deque.len() <= remaining {
                // Drain the entire price level at once.
                remaining -= deque.len();
                self.total_len -= deque.len();
                entries.extend(last.remove());
            } else {
                // Partial drain from this price level.
                self.total_len -= remaining;
                entries.extend(deque.drain(..remaining));
                remaining = 0;
            }
        }
        // Decrement per-key duplicate counters for everything removed.
        for entry in &entries {
            self.remove_keys(entry);
        }
        self.metrics.queue_depth.set(self.total_len as i64);
        entries
    }

    /// True when no entries are queued.
    pub fn is_empty(&self) -> bool {
        self.total_len == 0
    }

    /// Appends `entry` at the back of its gas price level (preserving FIFO
    /// within the level) and bumps the duplicate counter for each of its keys.
    fn push_entry(&mut self, entry: QueueEntry, keys: Vec<ConsensusTransactionKey>) {
        for key in keys {
            *self.queued_keys.entry(key).or_insert(0) += 1;
        }
        self.map
            .entry(entry.gas_price)
            .or_default()
            .push_back(entry);
        self.total_len += 1;
    }

    /// Removes and returns the oldest entry at the lowest gas price level.
    /// Panics if the queue is empty.
    fn evict_lowest(&mut self) -> QueueEntry {
        let evicted = {
            let mut first = self
                .map
                .first_entry()
                .expect("evict_lowest called on empty queue");
            let deque = first.get_mut();
            let evicted = deque.pop_front().unwrap();
            if deque.is_empty() {
                // Drop the now-empty price level so min_gas_price stays correct.
                first.remove();
            }
            evicted
        };
        self.remove_keys(&evicted);
        self.total_len -= 1;
        evicted
    }

    /// Decrements the duplicate counter for each of `entry`'s transaction
    /// keys, dropping any counter that reaches zero.
    fn remove_keys(&mut self, entry: &QueueEntry) {
        for tx in &entry.transactions {
            let key = tx.key();
            let std::collections::hash_map::Entry::Occupied(mut slot) = self.queued_keys.entry(key)
            else {
                debug_fatal!("remove_keys on absent key");
                continue;
            };
            *slot.get_mut() -= 1;
            if *slot.get() == 0 {
                slot.remove();
            }
        }
    }
}
254
/// Command sent from RPC handlers to the admission queue actor via mpsc channel.
struct InsertCommand {
    /// The entry to admit into the priority queue.
    entry: QueueEntry,
    /// Carries the insert outcome (admitted + freshness flag, or outbid
    /// rejection) back to the RPC handler.
    response: oneshot::Sender<SuiResult<bool>>,
}
260
/// Cloneable handle for submitting transactions to the admission queue actor.
/// Held by RPC handlers; the actor runs in a separate spawned task.
#[derive(Clone)]
pub struct AdmissionQueueHandle {
    /// Command channel into the per-epoch actor.
    sender: mpsc::Sender<InsertCommand>,
    /// The moment the queue last submitted an entry to consensus.
    last_drain: Arc<Mutex<Instant>>,
    /// Queue length as last published by the actor.
    queue_depth: Arc<AtomicUsize>,
    /// How long a non-empty queue may go without a drain before failover trips.
    failover_timeout: Duration,
}
271
272impl AdmissionQueueHandle {
273    /// Returns true if the queue has been non-empty for longer than
274    /// `failover_timeout` without any drain to consensus. Callers should
275    /// bypass the queue entirely when this is true.
276    pub fn failover_tripped(&self) -> bool {
277        if self.queue_depth.load(Ordering::Relaxed) == 0 {
278            return false;
279        }
280        self.last_drain.lock().unwrap().elapsed() > self.failover_timeout
281    }
282
283    /// Returns `(position_receiver, newly_inserted)` on admission. Returns `Err` on outbid
284    /// rejection.
285    pub async fn try_insert(
286        &self,
287        gas_price: u64,
288        transactions: Vec<ConsensusTransaction>,
289        submitter_client_addr: Option<IpAddr>,
290    ) -> SuiResult<(
291        oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
292        bool,
293    )> {
294        let (position_tx, position_rx) = oneshot::channel();
295        let entry = QueueEntry {
296            gas_price,
297            transactions,
298            position_sender: position_tx,
299            submitter_client_addr,
300            enqueue_time: Instant::now(),
301        };
302
303        let (resp_tx, resp_rx) = oneshot::channel();
304        let cmd = InsertCommand {
305            entry,
306            response: resp_tx,
307        };
308
309        self.sender
310            .send(cmd)
311            .await
312            .map_err(|_| SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus))?;
313
314        let newly_inserted = resp_rx
315            .await
316            .map_err(|_| SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus))??;
317
318        Ok((position_rx, newly_inserted))
319    }
320}
321
/// Manages the lifecycle of per-epoch admission queue actors.
/// Holds immutable config and shared metrics; call `spawn()` each epoch
/// with the new epoch store to create a fresh actor and handle.
pub struct AdmissionQueueManager {
    /// Maximum entries held by each per-epoch priority queue.
    capacity: usize,
    /// Config-derived threshold exposed via `bypass_threshold()`.
    bypass_threshold: usize,
    /// Passed to each spawned handle for stall detection.
    failover_timeout: Duration,
    metrics: Arc<AdmissionQueueMetrics>,
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared with the consensus adapter; signaled when an inflight slot frees.
    slot_freed_notify: Arc<tokio::sync::Notify>,
}
333
334impl AdmissionQueueManager {
335    pub fn new(
336        consensus_adapter: Arc<ConsensusAdapter>,
337        metrics: Arc<AdmissionQueueMetrics>,
338        capacity_fraction: f64,
339        bypass_fraction: f64,
340        failover_timeout: Duration,
341        slot_freed_notify: Arc<tokio::sync::Notify>,
342    ) -> Self {
343        let max_pending = consensus_adapter.max_pending_transactions();
344        let capacity = (max_pending as f64 * capacity_fraction) as usize;
345        assert!(
346            capacity > 0,
347            "admission_queue_capacity_fraction ({capacity_fraction}) * max_pending_transactions ({max_pending}) must be > 0"
348        );
349        Self {
350            capacity,
351            bypass_threshold: (max_pending as f64 * bypass_fraction) as usize,
352            failover_timeout,
353            metrics,
354            consensus_adapter,
355            slot_freed_notify,
356        }
357    }
358
359    pub fn new_for_tests(
360        consensus_adapter: Arc<ConsensusAdapter>,
361        slot_freed_notify: Arc<tokio::sync::Notify>,
362    ) -> Self {
363        Self {
364            capacity: 10_000,
365            bypass_threshold: usize::MAX,
366            failover_timeout: Duration::from_secs(30),
367            metrics: Arc::new(AdmissionQueueMetrics::new_for_tests()),
368            consensus_adapter,
369            slot_freed_notify,
370        }
371    }
372
373    pub fn metrics(&self) -> &Arc<AdmissionQueueMetrics> {
374        &self.metrics
375    }
376
377    pub(crate) fn bypass_threshold(&self) -> usize {
378        self.bypass_threshold
379    }
380
381    /// Spawns a new per-epoch admission queue actor and returns a handle.
382    /// The previous actor shuts down when its handle is dropped.
383    pub fn spawn(&self, epoch_store: Arc<AuthorityPerEpochStore>) -> AdmissionQueueHandle {
384        let last_drain = Arc::new(Mutex::new(Instant::now()));
385        let queue_depth = Arc::new(AtomicUsize::new(0));
386
387        let (sender, receiver) = mpsc::channel(self.capacity.max(1024));
388
389        let event_loop = AdmissionQueueEventLoop {
390            receiver,
391            queue: PriorityAdmissionQueue::new(self.capacity, self.metrics.clone()),
392            consensus_adapter: self.consensus_adapter.clone(),
393            slot_freed_notify: self.slot_freed_notify.clone(),
394            epoch_store,
395            last_drain: last_drain.clone(),
396            queue_depth: queue_depth.clone(),
397            last_published_depth: 0,
398        };
399        spawn_monitored_task!(event_loop.run());
400
401        AdmissionQueueHandle {
402            sender,
403            last_drain,
404            queue_depth,
405            failover_timeout: self.failover_timeout,
406        }
407    }
408}
409
/// Shared handle to a live admission queue. Holds the manager (for spawning a
/// fresh per-epoch actor on reconfig), the per-epoch `ArcSwap` handle, and the
/// cached (config-derived) bypass threshold. Cloned cheaply by `Arc`; passed
/// both to `ValidatorService` (for hot-path routing) and through
/// `ValidatorComponents` (for epoch rotation).
#[derive(Clone)]
pub struct AdmissionQueueContext {
    /// Spawns replacement actors on epoch rotation.
    manager: Arc<AdmissionQueueManager>,
    /// Atomically-swappable handle to the current epoch's actor.
    swap: Arc<ArcSwap<AdmissionQueueHandle>>,
}
420
421impl AdmissionQueueContext {
422    pub fn spawn(
423        manager: Arc<AdmissionQueueManager>,
424        epoch_store: Arc<AuthorityPerEpochStore>,
425    ) -> Self {
426        let initial_handle = manager.spawn(epoch_store);
427        let swap = Arc::new(ArcSwap::new(Arc::new(initial_handle)));
428        Self { manager, swap }
429    }
430
431    /// Spawns a new per-epoch actor and atomically replaces the current handle.
432    /// The old actor shuts down when its handle is dropped.
433    pub fn rotate_for_epoch(&self, epoch_store: Arc<AuthorityPerEpochStore>) {
434        self.swap.store(Arc::new(self.manager.spawn(epoch_store)));
435    }
436
437    pub(crate) fn bypass_threshold(&self) -> usize {
438        self.manager.bypass_threshold()
439    }
440
441    pub(crate) fn load(&self) -> arc_swap::Guard<Arc<AdmissionQueueHandle>> {
442        self.swap.load()
443    }
444}
445
/// Per-epoch event loop that owns the priority queue and drains entries
/// to consensus as capacity becomes available.
struct AdmissionQueueEventLoop {
    /// Incoming insert commands from RPC handlers.
    receiver: mpsc::Receiver<InsertCommand>,
    queue: PriorityAdmissionQueue,
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Signaled when an inflight consensus slot frees up.
    slot_freed_notify: Arc<tokio::sync::Notify>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Shared with the handle; updated on every drain for failover detection.
    last_drain: Arc<Mutex<Instant>>,
    /// Shared with the handle; mirrors the queue length.
    queue_depth: Arc<AtomicUsize>,
    /// Last value stored into `queue_depth`, to skip redundant atomic stores.
    last_published_depth: usize,
}
458
impl AdmissionQueueEventLoop {
    /// Actor main loop. Each iteration absorbs all buffered inserts, drains to
    /// consensus when there is capacity, and otherwise parks until either a
    /// new insert arrives or an inflight slot frees. Exits when every sender
    /// of the command channel (the per-epoch handle) has been dropped.
    pub async fn run(mut self) {
        loop {
            self.process_pending_inserts();
            self.publish_queue_depth();

            // The fail point lets tests freeze draining to observe queueing.
            if !handle_fail_point_if("admission_queue_disable_drain")
                && !self.queue.is_empty()
                && self.has_consensus_capacity()
            {
                self.drain_batch();
                self.publish_queue_depth();
                continue;
            }

            if self.queue.is_empty() {
                // Nothing to drain — just wait for a new insert.
                match self.receiver.recv().await {
                    Some(cmd) => self.handle_insert(cmd),
                    None => {
                        debug!("Admission queue actor shutting down");
                        break;
                    }
                }
                continue;
            }

            // Queue has entries but consensus is at capacity. Wait for either
            // a new insert or a freed inflight slot.
            // Register the notified future BEFORE re-checking capacity to avoid
            // missing notifications.
            let notify = self.slot_freed_notify.clone();
            let slot_freed = notify.notified();
            tokio::pin!(slot_freed);

            // Re-check after registration: a slot may have freed between the
            // capacity check above and registering the future.
            self.process_pending_inserts();
            if !handle_fail_point_if("admission_queue_disable_drain")
                && !self.queue.is_empty()
                && self.has_consensus_capacity()
            {
                continue;
            }

            tokio::select! {
                biased;

                result = self.receiver.recv() => {
                    match result {
                        Some(cmd) => self.handle_insert(cmd),
                        None => {
                            debug!("Admission queue actor shutting down");
                            break;
                        }
                    }
                }

                _ = &mut slot_freed => {}
            }
        }
    }

    /// Mirrors the queue length into the shared atomic (read by
    /// `AdmissionQueueHandle::failover_tripped`), skipping redundant stores.
    fn publish_queue_depth(&mut self) {
        let len = self.queue.len();
        if len != self.last_published_depth {
            self.queue_depth.store(len, Ordering::Relaxed);
            self.last_published_depth = len;
        }
    }

    /// Drains every insert command currently buffered, without awaiting.
    fn process_pending_inserts(&mut self) {
        while let Ok(cmd) = self.receiver.try_recv() {
            self.handle_insert(cmd);
        }
    }

    /// True when the consensus adapter can accept more inflight transactions.
    fn has_consensus_capacity(&self) -> bool {
        self.consensus_adapter.num_inflight_transactions()
            < u64::try_from(self.consensus_adapter.max_pending_transactions()).unwrap()
    }

    /// Pops as many entries as consensus can currently accept (highest gas
    /// price first), submits each on its own task, and records the drain time
    /// used for failover detection.
    fn drain_batch(&mut self) {
        let max_pending = u64::try_from(self.consensus_adapter.max_pending_transactions()).unwrap();
        let available =
            max_pending.saturating_sub(self.consensus_adapter.num_inflight_transactions());
        let entries = self.queue.pop_batch(usize::try_from(available).unwrap());
        if entries.is_empty() {
            return;
        }
        for entry in entries {
            self.queue
                .metrics
                .queue_wait_latency
                .observe(entry.enqueue_time.elapsed().as_secs_f64());
            let adapter = self.consensus_adapter.clone();
            let es = self.epoch_store.clone();
            spawn_monitored_task!(submit_queue_entry(entry, adapter, es));
        }
        *self.last_drain.lock().unwrap() = Instant::now();
    }

    /// Inserts into the priority queue and reports the outcome to the RPC
    /// caller; the send is best-effort since the caller may have gone away.
    fn handle_insert(&mut self, cmd: InsertCommand) {
        let _ = cmd.response.send(self.queue.insert(cmd.entry));
    }
}
563
564async fn submit_queue_entry(
565    entry: QueueEntry,
566    consensus_adapter: Arc<ConsensusAdapter>,
567    epoch_store: Arc<AuthorityPerEpochStore>,
568) {
569    let _ = entry.position_sender.send(
570        consensus_adapter
571            .submit_and_get_positions(
572                entry.transactions,
573                &epoch_store,
574                entry.submitter_client_addr,
575            )
576            .await
577            .map_err(tonic::Status::from),
578    );
579}
580
581#[cfg(test)]
582mod tests {
583    use super::*;
584
585    fn make_test_entry(
586        gas_price: u64,
587    ) -> (
588        QueueEntry,
589        oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
590    ) {
591        let (tx, rx) = oneshot::channel();
592        (QueueEntry::new_for_test(gas_price, tx), rx)
593    }
594
595    fn build_queue(capacity: usize, gas_prices: &[u64]) -> PriorityAdmissionQueue {
596        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
597        let mut q = PriorityAdmissionQueue::new(capacity, metrics);
598        for &gp in gas_prices {
599            let (entry, _) = make_test_entry(gp);
600            q.insert(entry).unwrap();
601        }
602        q
603    }
604
605    #[test]
606    fn test_insert_within_capacity() {
607        let q = build_queue(3, &[100, 200, 50]);
608        assert_eq!(q.len(), 3);
609    }
610
    // A higher-priced insert into a full queue evicts the cheapest entry and
    // signals the evicted caller with an outbid error.
    #[test]
    fn test_eviction_when_full() {
        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(2, metrics);

        let (e1, mut r1) = make_test_entry(100);
        let (e2, _) = make_test_entry(200);
        let (e3, _) = make_test_entry(300);

        q.insert(e1).unwrap();
        q.insert(e2).unwrap();
        assert_eq!(q.len(), 2);

        // Queue is full; e3 (300) outbids the minimum (100) and takes its slot.
        assert!(q.insert(e3).is_ok());
        assert_eq!(q.len(), 2);
        // Evicted entry's caller receives an explicit outbid error.
        let r1_result = r1.try_recv().expect("evicted entry must be signalled");
        assert!(matches!(r1_result, Err(ref status) if status.message().contains("outbid")));
    }
630
    // An insert below the queue's minimum gas price is rejected outright when
    // full, and the error reports the price to beat.
    #[test]
    fn test_rejection_when_full_and_low_price() {
        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(2, metrics);

        let (e1, _) = make_test_entry(100);
        let (e2, _) = make_test_entry(200);
        let (e3, mut r3) = make_test_entry(50);

        q.insert(e1).unwrap();
        q.insert(e2).unwrap();

        assert!(matches!(
            q.insert(e3).unwrap_err().as_inner(),
            SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion { min_gas_price: 100 }
        ));
        assert_eq!(q.len(), 2);
        // Rejected entries are not signalled on their position channel.
        assert!(r3.try_recv().is_err());
    }
650
651    #[test]
652    fn test_pop_batch() {
653        let mut q = build_queue(5, &[100, 300, 200]);
654        let batch = q.pop_batch(2);
655        assert_eq!(batch.len(), 2);
656        assert_eq!(q.len(), 1);
657    }
658
659    #[test]
660    fn test_min_gas_price() {
661        let q = build_queue(5, &[200, 100, 300]);
662        assert_eq!(q.min_gas_price(), Some(100));
663    }
664
    // A zero gas price sorts lowest, so it is the first eviction victim.
    #[test]
    fn test_gasless_tx_evicted_first() {
        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(2, metrics);

        let (gasless, mut r_gasless) = make_test_entry(0);
        let (normal, _) = make_test_entry(1000);
        let (high, _) = make_test_entry(2000);

        q.insert(gasless).unwrap();
        q.insert(normal).unwrap();

        assert!(q.insert(high).is_ok());
        let gasless_result = r_gasless
            .try_recv()
            .expect("evicted gasless entry must be signalled");
        assert!(matches!(gasless_result, Err(ref status) if status.message().contains("outbid")));
        // With the gasless entry gone, the minimum is now the normal entry.
        assert_eq!(q.min_gas_price(), Some(1000));
    }
684
685    #[test]
686    fn test_pop_batch_returns_highest_gas_price_first() {
687        let mut q = build_queue(5, &[100, 500, 200, 400, 300]);
688        let batch = q.pop_batch(5);
689        let gas_prices: Vec<u64> = batch.iter().map(|e| e.gas_price).collect();
690        assert_eq!(gas_prices, vec![500, 400, 300, 200, 100]);
691    }
692
    // Eviction requires a strictly higher gas price: an equal price on a full
    // queue is rejected rather than displacing an existing entry.
    #[test]
    fn test_equal_gas_price_rejected_when_full() {
        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(1, metrics);

        let (e1, _) = make_test_entry(100);
        let (e2, _) = make_test_entry(100);

        q.insert(e1).unwrap();
        assert!(matches!(
            q.insert(e2).unwrap_err().as_inner(),
            SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion { min_gas_price: 100 }
        ));
    }
707
    /// Builds an entry wrapping a specific transaction (rather than an empty
    /// list), so duplicate-key detection can be exercised.
    fn make_dup_entry(
        gas_price: u64,
        tx: ConsensusTransaction,
    ) -> (
        QueueEntry,
        oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
    ) {
        let (position_tx, position_rx) = oneshot::channel();
        let entry = QueueEntry {
            gas_price,
            transactions: vec![tx],
            position_sender: position_tx,
            submitter_client_addr: None,
            enqueue_time: Instant::now(),
        };
        (entry, position_rx)
    }
725
    // Duplicates are admitted (not rejected) but insert returns false so the
    // caller can tally them as spam.
    #[test]
    fn test_duplicate_transaction_admitted_and_flagged() {
        use sui_types::base_types::AuthorityName;

        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(10, metrics);

        let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);

        let (entry1, _rx1) = make_dup_entry(100, tx.clone());
        assert!(q.insert(entry1).unwrap());
        assert_eq!(q.len(), 1);

        // Same transaction again — admitted, but flagged as not-fresh so the
        // RPC layer can tally it as spam for DoS protection.
        let (entry2, _rx2) = make_dup_entry(100, tx.clone());
        assert!(!q.insert(entry2).unwrap());
        assert_eq!(q.len(), 2);
    }
745
    // The per-key refcount in `queued_keys` must decrement on pop_batch and
    // only clear duplicate status once every copy has left the queue.
    #[test]
    fn test_duplicate_key_counter_decrements_on_pop() {
        use sui_types::base_types::AuthorityName;

        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(10, metrics);

        let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);

        // Insert two copies of the same tx.
        let (entry1, _rx1) = make_dup_entry(100, tx.clone());
        q.insert(entry1).unwrap();
        let (entry2, _rx2) = make_dup_entry(100, tx.clone());
        assert!(!q.insert(entry2).unwrap());

        // Pop one copy. The key's counter should drop to 1 — a fresh insert
        // should still be flagged as not-fresh against the remaining copy.
        let batch = q.pop_batch(1);
        assert_eq!(batch.len(), 1);
        let (entry3, _rx3) = make_dup_entry(100, tx.clone());
        assert!(!q.insert(entry3).unwrap());
        assert_eq!(q.len(), 2);

        // Drain both remaining entries. The counter should hit 0 and the key
        // should be removed — a subsequent insert is fresh again.
        let _ = q.pop_batch(q.len());
        assert!(q.is_empty());
        let (entry4, _rx4) = make_dup_entry(100, tx);
        assert!(q.insert(entry4).unwrap());
    }
776
    // The per-key refcount must also decrement when copies leave the queue via
    // eviction, not just via pop_batch.
    #[test]
    fn test_duplicate_key_counter_decrements_on_evict() {
        use sui_types::base_types::AuthorityName;

        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let mut q = PriorityAdmissionQueue::new(2, metrics);

        let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);

        // Fill queue with two copies of `tx` at price 100.
        let (entry1, _rx1) = make_dup_entry(100, tx.clone());
        q.insert(entry1).unwrap();
        let (entry2, _rx2) = make_dup_entry(100, tx.clone());
        q.insert(entry2).unwrap();
        assert_eq!(q.len(), 2);

        // Evict one dup with a higher-priced non-dup.
        let (filler, _) = make_test_entry(200);
        q.insert(filler).unwrap();
        assert_eq!(q.len(), 2);

        // Evict the remaining dup with another non-dup. After both dups are
        // evicted, the counter should hit 0 and re-inserting `tx` is not a
        // duplicate.
        let (filler2, _) = make_test_entry(300);
        q.insert(filler2).unwrap();

        let (entry3, _rx3) = make_dup_entry(500, tx);
        assert!(q.insert(entry3).unwrap());
    }
807
    // The event loop's recv() returns None when all senders are dropped, so
    // dropping the handle's sender must terminate the actor task.
    #[tokio::test]
    async fn test_actor_shuts_down_when_handle_dropped() {
        use crate::authority::test_authority_builder::TestAuthorityBuilder;
        use crate::checkpoints::CheckpointStore;
        use crate::consensus_adapter::ConsensusAdapterMetrics;
        use crate::mysticeti_adapter::LazyMysticetiClient;
        use sui_types::base_types::AuthorityName;

        let state = TestAuthorityBuilder::new().build().await;
        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_adapter = Arc::new(ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            AuthorityName::ZERO,
            100_000,
            100_000,
            ConsensusAdapterMetrics::new_test(),
            Arc::new(tokio::sync::Notify::new()),
        ));

        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
        let (sender, receiver) = mpsc::channel(100);
        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());

        let event_loop = AdmissionQueueEventLoop {
            receiver,
            queue: PriorityAdmissionQueue::new(100, metrics),
            consensus_adapter,
            slot_freed_notify,
            epoch_store,
            last_drain: Arc::new(Mutex::new(Instant::now())),
            queue_depth: Arc::new(AtomicUsize::new(0)),
            last_published_depth: 0,
        };

        let handle = tokio::spawn(event_loop.run());

        // Drop the sender — this closes the channel.
        drop(sender);

        // The actor should exit promptly.
        tokio::time::timeout(std::time::Duration::from_secs(5), handle)
            .await
            .expect("actor did not shut down within timeout")
            .expect("actor task panicked");
    }
854
855    async fn build_consensus_adapter(
856        max_pending_transactions: usize,
857    ) -> (
858        Arc<ConsensusAdapter>,
859        Arc<AuthorityPerEpochStore>,
860        Arc<tokio::sync::Notify>,
861    ) {
862        use crate::authority::test_authority_builder::TestAuthorityBuilder;
863        use crate::checkpoints::CheckpointStore;
864        use crate::consensus_adapter::ConsensusAdapterMetrics;
865        use crate::mysticeti_adapter::LazyMysticetiClient;
866        use sui_types::base_types::AuthorityName;
867
868        let state = TestAuthorityBuilder::new().build().await;
869        let epoch_store = state.epoch_store_for_testing().clone();
870        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
871        let adapter = Arc::new(ConsensusAdapter::new(
872            Arc::new(LazyMysticetiClient::new()),
873            CheckpointStore::new_for_tests(),
874            AuthorityName::ZERO,
875            max_pending_transactions,
876            100_000,
877            ConsensusAdapterMetrics::new_test(),
878            slot_freed_notify.clone(),
879        ));
880        (adapter, epoch_store, slot_freed_notify)
881    }
882
883    #[tokio::test]
884    async fn test_failover_tripped_when_actor_stalls() {
885        // Construct a handle with a tiny failover window and no running actor.
886        // Failover requires queue_depth > 0, so simulate a non-empty queue.
887        let handle = AdmissionQueueHandle {
888            sender: mpsc::channel(1).0,
889            last_drain: Arc::new(Mutex::new(Instant::now())),
890            queue_depth: Arc::new(AtomicUsize::new(1)),
891            failover_timeout: Duration::from_millis(10),
892        };
893        assert!(!handle.failover_tripped());
894        tokio::time::sleep(Duration::from_millis(30)).await;
895        assert!(handle.failover_tripped());
896
897        // An empty queue is never a failover, even if last_drain is stale.
898        handle.queue_depth.store(0, Ordering::Relaxed);
899        assert!(!handle.failover_tripped());
900    }
901
902    #[tokio::test]
903    async fn test_idle_actor_does_not_trip_failover() {
904        // A healthy actor with an empty queue must never trip failover, even
905        // after long idle periods while blocked on `receiver.recv()`.
906        let (adapter, epoch_store, notify) = build_consensus_adapter(100_000).await;
907        let manager = AdmissionQueueManager::new(
908            adapter,
909            Arc::new(AdmissionQueueMetrics::new_for_tests()),
910            0.5,
911            0.9,
912            Duration::from_millis(10),
913            notify,
914        );
915        let handle = manager.spawn(epoch_store);
916        tokio::time::sleep(Duration::from_millis(50)).await;
917        assert!(
918            !handle.failover_tripped(),
919            "idle actor with empty queue must not trip failover"
920        );
921    }
922
923    /// If `drain_batch` is entered but consensus has zero slots available
924    /// (the inflight count raced past `max_pending_transactions` between the
925    /// `has_consensus_capacity` check and the read inside `drain_batch`), no
926    /// entries are popped and `last_drain` must NOT advance — otherwise a
927    /// truly stuck drainer would be hidden from the failover check.
928    #[tokio::test]
929    async fn test_drain_batch_does_not_bump_last_drain_when_no_slots() {
930        let (adapter, epoch_store, notify) = build_consensus_adapter(0).await;
931        let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
932        let (_sender, receiver) = mpsc::channel(10);
933
934        let mut queue = PriorityAdmissionQueue::new(10, metrics.clone());
935        let (entry, _rx) = make_test_entry(100);
936        assert!(queue.insert(entry).is_ok());
937        assert_eq!(queue.len(), 1);
938
939        let last_drain = Arc::new(Mutex::new(Instant::now()));
940        let before = *last_drain.lock().unwrap();
941
942        let mut event_loop = AdmissionQueueEventLoop {
943            receiver,
944            queue,
945            consensus_adapter: adapter,
946            slot_freed_notify: notify,
947            epoch_store,
948            last_drain: last_drain.clone(),
949            queue_depth: Arc::new(AtomicUsize::new(0)),
950            last_published_depth: 0,
951        };
952
953        // Sleep so that if drain_batch erroneously stamps Instant::now() the
954        // stored value would differ from `before`.
955        tokio::time::sleep(Duration::from_millis(20)).await;
956
957        event_loop.drain_batch();
958
959        assert_eq!(event_loop.queue.len(), 1, "no entries should be drained");
960        assert_eq!(
961            *last_drain.lock().unwrap(),
962            before,
963            "last_drain must not advance when drain_batch drained nothing"
964        );
965    }
966}