1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::consensus_adapter::ConsensusAdapter;
6use arc_swap::ArcSwap;
7use mysten_common::debug_fatal;
8use mysten_metrics::spawn_monitored_task;
9use prometheus::{
10 Histogram, IntCounter, IntGauge, Registry, register_histogram_with_registry,
11 register_int_counter_with_registry, register_int_gauge_with_registry,
12};
13use std::collections::{BTreeMap, HashMap, VecDeque};
14use std::net::IpAddr;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18use sui_macros::handle_fail_point_if;
19use sui_network::tonic;
20use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
21use sui_types::messages_consensus::{
22 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey,
23};
24use tokio::sync::{mpsc, oneshot};
25use tracing::debug;
26
27pub struct QueueEntry {
29 pub gas_price: u64,
30 pub transactions: Vec<ConsensusTransaction>,
31 pub position_sender: oneshot::Sender<Result<Vec<ConsensusPosition>, tonic::Status>>,
32 pub submitter_client_addr: Option<IpAddr>,
33 pub enqueue_time: Instant,
34}
35
36impl QueueEntry {
37 #[cfg(test)]
38 pub fn new_for_test(
39 gas_price: u64,
40 position_sender: oneshot::Sender<Result<Vec<ConsensusPosition>, tonic::Status>>,
41 ) -> Self {
42 Self {
43 gas_price,
44 transactions: vec![],
45 position_sender,
46 submitter_client_addr: None,
47 enqueue_time: Instant::now(),
48 }
49 }
50}
51
52pub struct AdmissionQueueMetrics {
54 pub queue_depth: IntGauge,
55 pub queue_wait_latency: Histogram,
56 pub evictions: IntCounter,
57 pub rejections: IntCounter,
58 pub duplicate_inserts: IntCounter,
59}
60
61impl AdmissionQueueMetrics {
62 pub fn new(registry: &Registry) -> Self {
63 Self {
64 queue_depth: register_int_gauge_with_registry!(
65 "admission_queue_depth",
66 "Current number of entries in the admission priority queue",
67 registry,
68 )
69 .unwrap(),
70 queue_wait_latency: register_histogram_with_registry!(
71 "admission_queue_wait_latency",
72 "Time a transaction spends waiting in the admission queue before being drained",
73 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
74 registry,
75 )
76 .unwrap(),
77 evictions: register_int_counter_with_registry!(
78 "admission_queue_evictions",
79 "Number of entries evicted from the admission queue by higher gas price transactions",
80 registry,
81 )
82 .unwrap(),
83 rejections: register_int_counter_with_registry!(
84 "admission_queue_rejections",
85 "Number of transactions rejected because the queue was full and their gas price was too low",
86 registry,
87 )
88 .unwrap(),
89 duplicate_inserts: register_int_counter_with_registry!(
90 "admission_queue_duplicate_inserts",
91 "Transactions admitted to the queue whose ConsensusTransactionKey duplicated an entry already present. Tallied as spam for DoS protection.",
92 registry,
93 )
94 .unwrap(),
95 }
96 }
97
98 pub fn new_for_tests() -> Self {
99 Self::new(&Registry::new())
100 }
101}
102
103pub struct PriorityAdmissionQueue {
107 capacity: usize,
108 map: BTreeMap<u64, VecDeque<QueueEntry>>,
109 queued_keys: HashMap<ConsensusTransactionKey, u32>,
111 total_len: usize,
112 metrics: Arc<AdmissionQueueMetrics>,
113}
114
115impl PriorityAdmissionQueue {
116 pub fn new(capacity: usize, metrics: Arc<AdmissionQueueMetrics>) -> Self {
117 Self {
118 capacity,
119 map: BTreeMap::new(),
120 queued_keys: HashMap::new(),
121 total_len: 0,
122 metrics,
123 }
124 }
125
126 pub fn len(&self) -> usize {
127 self.total_len
128 }
129
130 pub fn min_gas_price(&self) -> Option<u64> {
131 self.map.first_key_value().map(|(&k, _)| k)
132 }
133
134 pub fn insert(&mut self, entry: QueueEntry) -> SuiResult<bool> {
138 let keys: Vec<_> = entry.transactions.iter().map(|t| t.key()).collect();
139 let newly_inserted = !keys.iter().any(|k| self.queued_keys.contains_key(k));
140 if !newly_inserted {
141 self.metrics.duplicate_inserts.inc();
142 }
143
144 if self.total_len < self.capacity {
145 self.push_entry(entry, keys);
146 self.metrics.queue_depth.set(self.total_len as i64);
147 return Ok(newly_inserted);
148 }
149
150 let min_price = self.min_gas_price().unwrap();
151 if entry.gas_price > min_price {
152 let evicter_price = entry.gas_price;
153 let evicted = self.evict_lowest();
154 self.push_entry(entry, keys);
155 self.metrics.evictions.inc();
156 let _ = evicted
159 .position_sender
160 .send(Err(tonic::Status::from(SuiError::from(
161 SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion {
162 min_gas_price: evicter_price,
163 },
164 ))));
165 return Ok(newly_inserted);
166 }
167
168 self.metrics.rejections.inc();
169 Err(
170 SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion {
171 min_gas_price: min_price,
172 }
173 .into(),
174 )
175 }
176
177 pub fn pop_batch(&mut self, count: usize) -> Vec<QueueEntry> {
180 let mut remaining = count.min(self.total_len);
181 let mut entries = Vec::with_capacity(remaining);
182 while remaining > 0 {
183 let Some(mut last) = self.map.last_entry() else {
184 break;
185 };
186 let deque = last.get_mut();
187 if deque.len() <= remaining {
188 remaining -= deque.len();
190 self.total_len -= deque.len();
191 entries.extend(last.remove());
192 } else {
193 self.total_len -= remaining;
195 entries.extend(deque.drain(..remaining));
196 remaining = 0;
197 }
198 }
199 for entry in &entries {
200 self.remove_keys(entry);
201 }
202 self.metrics.queue_depth.set(self.total_len as i64);
203 entries
204 }
205
206 pub fn is_empty(&self) -> bool {
207 self.total_len == 0
208 }
209
210 fn push_entry(&mut self, entry: QueueEntry, keys: Vec<ConsensusTransactionKey>) {
211 for key in keys {
212 *self.queued_keys.entry(key).or_insert(0) += 1;
213 }
214 self.map
215 .entry(entry.gas_price)
216 .or_default()
217 .push_back(entry);
218 self.total_len += 1;
219 }
220
221 fn evict_lowest(&mut self) -> QueueEntry {
222 let evicted = {
223 let mut first = self
224 .map
225 .first_entry()
226 .expect("evict_lowest called on empty queue");
227 let deque = first.get_mut();
228 let evicted = deque.pop_front().unwrap();
229 if deque.is_empty() {
230 first.remove();
231 }
232 evicted
233 };
234 self.remove_keys(&evicted);
235 self.total_len -= 1;
236 evicted
237 }
238
239 fn remove_keys(&mut self, entry: &QueueEntry) {
240 for tx in &entry.transactions {
241 let key = tx.key();
242 let std::collections::hash_map::Entry::Occupied(mut slot) = self.queued_keys.entry(key)
243 else {
244 debug_fatal!("remove_keys on absent key");
245 continue;
246 };
247 *slot.get_mut() -= 1;
248 if *slot.get() == 0 {
249 slot.remove();
250 }
251 }
252 }
253}
254
255struct InsertCommand {
257 entry: QueueEntry,
258 response: oneshot::Sender<SuiResult<bool>>,
259}
260
261#[derive(Clone)]
264pub struct AdmissionQueueHandle {
265 sender: mpsc::Sender<InsertCommand>,
266 last_drain: Arc<Mutex<Instant>>,
268 queue_depth: Arc<AtomicUsize>,
269 failover_timeout: Duration,
270}
271
272impl AdmissionQueueHandle {
273 pub fn failover_tripped(&self) -> bool {
277 if self.queue_depth.load(Ordering::Relaxed) == 0 {
278 return false;
279 }
280 self.last_drain.lock().unwrap().elapsed() > self.failover_timeout
281 }
282
283 pub async fn try_insert(
286 &self,
287 gas_price: u64,
288 transactions: Vec<ConsensusTransaction>,
289 submitter_client_addr: Option<IpAddr>,
290 ) -> SuiResult<(
291 oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
292 bool,
293 )> {
294 let (position_tx, position_rx) = oneshot::channel();
295 let entry = QueueEntry {
296 gas_price,
297 transactions,
298 position_sender: position_tx,
299 submitter_client_addr,
300 enqueue_time: Instant::now(),
301 };
302
303 let (resp_tx, resp_rx) = oneshot::channel();
304 let cmd = InsertCommand {
305 entry,
306 response: resp_tx,
307 };
308
309 self.sender
310 .send(cmd)
311 .await
312 .map_err(|_| SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus))?;
313
314 let newly_inserted = resp_rx
315 .await
316 .map_err(|_| SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus))??;
317
318 Ok((position_rx, newly_inserted))
319 }
320}
321
322pub struct AdmissionQueueManager {
326 capacity: usize,
327 bypass_threshold: usize,
328 failover_timeout: Duration,
329 metrics: Arc<AdmissionQueueMetrics>,
330 consensus_adapter: Arc<ConsensusAdapter>,
331 slot_freed_notify: Arc<tokio::sync::Notify>,
332}
333
334impl AdmissionQueueManager {
335 pub fn new(
336 consensus_adapter: Arc<ConsensusAdapter>,
337 metrics: Arc<AdmissionQueueMetrics>,
338 capacity_fraction: f64,
339 bypass_fraction: f64,
340 failover_timeout: Duration,
341 slot_freed_notify: Arc<tokio::sync::Notify>,
342 ) -> Self {
343 let max_pending = consensus_adapter.max_pending_transactions();
344 let capacity = (max_pending as f64 * capacity_fraction) as usize;
345 assert!(
346 capacity > 0,
347 "admission_queue_capacity_fraction ({capacity_fraction}) * max_pending_transactions ({max_pending}) must be > 0"
348 );
349 Self {
350 capacity,
351 bypass_threshold: (max_pending as f64 * bypass_fraction) as usize,
352 failover_timeout,
353 metrics,
354 consensus_adapter,
355 slot_freed_notify,
356 }
357 }
358
359 pub fn new_for_tests(
360 consensus_adapter: Arc<ConsensusAdapter>,
361 slot_freed_notify: Arc<tokio::sync::Notify>,
362 ) -> Self {
363 Self {
364 capacity: 10_000,
365 bypass_threshold: usize::MAX,
366 failover_timeout: Duration::from_secs(30),
367 metrics: Arc::new(AdmissionQueueMetrics::new_for_tests()),
368 consensus_adapter,
369 slot_freed_notify,
370 }
371 }
372
373 pub fn metrics(&self) -> &Arc<AdmissionQueueMetrics> {
374 &self.metrics
375 }
376
377 pub(crate) fn bypass_threshold(&self) -> usize {
378 self.bypass_threshold
379 }
380
381 pub fn spawn(&self, epoch_store: Arc<AuthorityPerEpochStore>) -> AdmissionQueueHandle {
384 let last_drain = Arc::new(Mutex::new(Instant::now()));
385 let queue_depth = Arc::new(AtomicUsize::new(0));
386
387 let (sender, receiver) = mpsc::channel(self.capacity.max(1024));
388
389 let event_loop = AdmissionQueueEventLoop {
390 receiver,
391 queue: PriorityAdmissionQueue::new(self.capacity, self.metrics.clone()),
392 consensus_adapter: self.consensus_adapter.clone(),
393 slot_freed_notify: self.slot_freed_notify.clone(),
394 epoch_store,
395 last_drain: last_drain.clone(),
396 queue_depth: queue_depth.clone(),
397 last_published_depth: 0,
398 };
399 spawn_monitored_task!(event_loop.run());
400
401 AdmissionQueueHandle {
402 sender,
403 last_drain,
404 queue_depth,
405 failover_timeout: self.failover_timeout,
406 }
407 }
408}
409
410#[derive(Clone)]
416pub struct AdmissionQueueContext {
417 manager: Arc<AdmissionQueueManager>,
418 swap: Arc<ArcSwap<AdmissionQueueHandle>>,
419}
420
421impl AdmissionQueueContext {
422 pub fn spawn(
423 manager: Arc<AdmissionQueueManager>,
424 epoch_store: Arc<AuthorityPerEpochStore>,
425 ) -> Self {
426 let initial_handle = manager.spawn(epoch_store);
427 let swap = Arc::new(ArcSwap::new(Arc::new(initial_handle)));
428 Self { manager, swap }
429 }
430
431 pub fn rotate_for_epoch(&self, epoch_store: Arc<AuthorityPerEpochStore>) {
434 self.swap.store(Arc::new(self.manager.spawn(epoch_store)));
435 }
436
437 pub(crate) fn bypass_threshold(&self) -> usize {
438 self.manager.bypass_threshold()
439 }
440
441 pub(crate) fn load(&self) -> arc_swap::Guard<Arc<AdmissionQueueHandle>> {
442 self.swap.load()
443 }
444}
445
446struct AdmissionQueueEventLoop {
449 receiver: mpsc::Receiver<InsertCommand>,
450 queue: PriorityAdmissionQueue,
451 consensus_adapter: Arc<ConsensusAdapter>,
452 slot_freed_notify: Arc<tokio::sync::Notify>,
453 epoch_store: Arc<AuthorityPerEpochStore>,
454 last_drain: Arc<Mutex<Instant>>,
455 queue_depth: Arc<AtomicUsize>,
456 last_published_depth: usize,
457}
458
459impl AdmissionQueueEventLoop {
460 pub async fn run(mut self) {
461 loop {
462 self.process_pending_inserts();
463 self.publish_queue_depth();
464
465 if !handle_fail_point_if("admission_queue_disable_drain")
466 && !self.queue.is_empty()
467 && self.has_consensus_capacity()
468 {
469 self.drain_batch();
470 self.publish_queue_depth();
471 continue;
472 }
473
474 if self.queue.is_empty() {
475 match self.receiver.recv().await {
477 Some(cmd) => self.handle_insert(cmd),
478 None => {
479 debug!("Admission queue actor shutting down");
480 break;
481 }
482 }
483 continue;
484 }
485
486 let notify = self.slot_freed_notify.clone();
491 let slot_freed = notify.notified();
492 tokio::pin!(slot_freed);
493
494 self.process_pending_inserts();
495 if !handle_fail_point_if("admission_queue_disable_drain")
496 && !self.queue.is_empty()
497 && self.has_consensus_capacity()
498 {
499 continue;
500 }
501
502 tokio::select! {
503 biased;
504
505 result = self.receiver.recv() => {
506 match result {
507 Some(cmd) => self.handle_insert(cmd),
508 None => {
509 debug!("Admission queue actor shutting down");
510 break;
511 }
512 }
513 }
514
515 _ = &mut slot_freed => {}
516 }
517 }
518 }
519
520 fn publish_queue_depth(&mut self) {
521 let len = self.queue.len();
522 if len != self.last_published_depth {
523 self.queue_depth.store(len, Ordering::Relaxed);
524 self.last_published_depth = len;
525 }
526 }
527
528 fn process_pending_inserts(&mut self) {
529 while let Ok(cmd) = self.receiver.try_recv() {
530 self.handle_insert(cmd);
531 }
532 }
533
534 fn has_consensus_capacity(&self) -> bool {
535 self.consensus_adapter.num_inflight_transactions()
536 < u64::try_from(self.consensus_adapter.max_pending_transactions()).unwrap()
537 }
538
539 fn drain_batch(&mut self) {
540 let max_pending = u64::try_from(self.consensus_adapter.max_pending_transactions()).unwrap();
541 let available =
542 max_pending.saturating_sub(self.consensus_adapter.num_inflight_transactions());
543 let entries = self.queue.pop_batch(usize::try_from(available).unwrap());
544 if entries.is_empty() {
545 return;
546 }
547 for entry in entries {
548 self.queue
549 .metrics
550 .queue_wait_latency
551 .observe(entry.enqueue_time.elapsed().as_secs_f64());
552 let adapter = self.consensus_adapter.clone();
553 let es = self.epoch_store.clone();
554 spawn_monitored_task!(submit_queue_entry(entry, adapter, es));
555 }
556 *self.last_drain.lock().unwrap() = Instant::now();
557 }
558
559 fn handle_insert(&mut self, cmd: InsertCommand) {
560 let _ = cmd.response.send(self.queue.insert(cmd.entry));
561 }
562}
563
564async fn submit_queue_entry(
565 entry: QueueEntry,
566 consensus_adapter: Arc<ConsensusAdapter>,
567 epoch_store: Arc<AuthorityPerEpochStore>,
568) {
569 let _ = entry.position_sender.send(
570 consensus_adapter
571 .submit_and_get_positions(
572 entry.transactions,
573 &epoch_store,
574 entry.submitter_client_addr,
575 )
576 .await
577 .map_err(tonic::Status::from),
578 );
579}
580
581#[cfg(test)]
582mod tests {
583 use super::*;
584
585 fn make_test_entry(
586 gas_price: u64,
587 ) -> (
588 QueueEntry,
589 oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
590 ) {
591 let (tx, rx) = oneshot::channel();
592 (QueueEntry::new_for_test(gas_price, tx), rx)
593 }
594
595 fn build_queue(capacity: usize, gas_prices: &[u64]) -> PriorityAdmissionQueue {
596 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
597 let mut q = PriorityAdmissionQueue::new(capacity, metrics);
598 for &gp in gas_prices {
599 let (entry, _) = make_test_entry(gp);
600 q.insert(entry).unwrap();
601 }
602 q
603 }
604
605 #[test]
606 fn test_insert_within_capacity() {
607 let q = build_queue(3, &[100, 200, 50]);
608 assert_eq!(q.len(), 3);
609 }
610
611 #[test]
612 fn test_eviction_when_full() {
613 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
614 let mut q = PriorityAdmissionQueue::new(2, metrics);
615
616 let (e1, mut r1) = make_test_entry(100);
617 let (e2, _) = make_test_entry(200);
618 let (e3, _) = make_test_entry(300);
619
620 q.insert(e1).unwrap();
621 q.insert(e2).unwrap();
622 assert_eq!(q.len(), 2);
623
624 assert!(q.insert(e3).is_ok());
625 assert_eq!(q.len(), 2);
626 let r1_result = r1.try_recv().expect("evicted entry must be signalled");
628 assert!(matches!(r1_result, Err(ref status) if status.message().contains("outbid")));
629 }
630
631 #[test]
632 fn test_rejection_when_full_and_low_price() {
633 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
634 let mut q = PriorityAdmissionQueue::new(2, metrics);
635
636 let (e1, _) = make_test_entry(100);
637 let (e2, _) = make_test_entry(200);
638 let (e3, mut r3) = make_test_entry(50);
639
640 q.insert(e1).unwrap();
641 q.insert(e2).unwrap();
642
643 assert!(matches!(
644 q.insert(e3).unwrap_err().as_inner(),
645 SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion { min_gas_price: 100 }
646 ));
647 assert_eq!(q.len(), 2);
648 assert!(r3.try_recv().is_err());
649 }
650
651 #[test]
652 fn test_pop_batch() {
653 let mut q = build_queue(5, &[100, 300, 200]);
654 let batch = q.pop_batch(2);
655 assert_eq!(batch.len(), 2);
656 assert_eq!(q.len(), 1);
657 }
658
659 #[test]
660 fn test_min_gas_price() {
661 let q = build_queue(5, &[200, 100, 300]);
662 assert_eq!(q.min_gas_price(), Some(100));
663 }
664
665 #[test]
666 fn test_gasless_tx_evicted_first() {
667 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
668 let mut q = PriorityAdmissionQueue::new(2, metrics);
669
670 let (gasless, mut r_gasless) = make_test_entry(0);
671 let (normal, _) = make_test_entry(1000);
672 let (high, _) = make_test_entry(2000);
673
674 q.insert(gasless).unwrap();
675 q.insert(normal).unwrap();
676
677 assert!(q.insert(high).is_ok());
678 let gasless_result = r_gasless
679 .try_recv()
680 .expect("evicted gasless entry must be signalled");
681 assert!(matches!(gasless_result, Err(ref status) if status.message().contains("outbid")));
682 assert_eq!(q.min_gas_price(), Some(1000));
683 }
684
685 #[test]
686 fn test_pop_batch_returns_highest_gas_price_first() {
687 let mut q = build_queue(5, &[100, 500, 200, 400, 300]);
688 let batch = q.pop_batch(5);
689 let gas_prices: Vec<u64> = batch.iter().map(|e| e.gas_price).collect();
690 assert_eq!(gas_prices, vec![500, 400, 300, 200, 100]);
691 }
692
693 #[test]
694 fn test_equal_gas_price_rejected_when_full() {
695 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
696 let mut q = PriorityAdmissionQueue::new(1, metrics);
697
698 let (e1, _) = make_test_entry(100);
699 let (e2, _) = make_test_entry(100);
700
701 q.insert(e1).unwrap();
702 assert!(matches!(
703 q.insert(e2).unwrap_err().as_inner(),
704 SuiErrorKind::TransactionRejectedDueToOutbiddingDuringCongestion { min_gas_price: 100 }
705 ));
706 }
707
708 fn make_dup_entry(
709 gas_price: u64,
710 tx: ConsensusTransaction,
711 ) -> (
712 QueueEntry,
713 oneshot::Receiver<Result<Vec<ConsensusPosition>, tonic::Status>>,
714 ) {
715 let (position_tx, position_rx) = oneshot::channel();
716 let entry = QueueEntry {
717 gas_price,
718 transactions: vec![tx],
719 position_sender: position_tx,
720 submitter_client_addr: None,
721 enqueue_time: Instant::now(),
722 };
723 (entry, position_rx)
724 }
725
726 #[test]
727 fn test_duplicate_transaction_admitted_and_flagged() {
728 use sui_types::base_types::AuthorityName;
729
730 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
731 let mut q = PriorityAdmissionQueue::new(10, metrics);
732
733 let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);
734
735 let (entry1, _rx1) = make_dup_entry(100, tx.clone());
736 assert!(q.insert(entry1).unwrap());
737 assert_eq!(q.len(), 1);
738
739 let (entry2, _rx2) = make_dup_entry(100, tx.clone());
742 assert!(!q.insert(entry2).unwrap());
743 assert_eq!(q.len(), 2);
744 }
745
746 #[test]
747 fn test_duplicate_key_counter_decrements_on_pop() {
748 use sui_types::base_types::AuthorityName;
749
750 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
751 let mut q = PriorityAdmissionQueue::new(10, metrics);
752
753 let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);
754
755 let (entry1, _rx1) = make_dup_entry(100, tx.clone());
757 q.insert(entry1).unwrap();
758 let (entry2, _rx2) = make_dup_entry(100, tx.clone());
759 assert!(!q.insert(entry2).unwrap());
760
761 let batch = q.pop_batch(1);
764 assert_eq!(batch.len(), 1);
765 let (entry3, _rx3) = make_dup_entry(100, tx.clone());
766 assert!(!q.insert(entry3).unwrap());
767 assert_eq!(q.len(), 2);
768
769 let _ = q.pop_batch(q.len());
772 assert!(q.is_empty());
773 let (entry4, _rx4) = make_dup_entry(100, tx);
774 assert!(q.insert(entry4).unwrap());
775 }
776
777 #[test]
778 fn test_duplicate_key_counter_decrements_on_evict() {
779 use sui_types::base_types::AuthorityName;
780
781 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
782 let mut q = PriorityAdmissionQueue::new(2, metrics);
783
784 let tx = ConsensusTransaction::new_end_of_publish(AuthorityName::ZERO);
785
786 let (entry1, _rx1) = make_dup_entry(100, tx.clone());
788 q.insert(entry1).unwrap();
789 let (entry2, _rx2) = make_dup_entry(100, tx.clone());
790 q.insert(entry2).unwrap();
791 assert_eq!(q.len(), 2);
792
793 let (filler, _) = make_test_entry(200);
795 q.insert(filler).unwrap();
796 assert_eq!(q.len(), 2);
797
798 let (filler2, _) = make_test_entry(300);
802 q.insert(filler2).unwrap();
803
804 let (entry3, _rx3) = make_dup_entry(500, tx);
805 assert!(q.insert(entry3).unwrap());
806 }
807
808 #[tokio::test]
809 async fn test_actor_shuts_down_when_handle_dropped() {
810 use crate::authority::test_authority_builder::TestAuthorityBuilder;
811 use crate::checkpoints::CheckpointStore;
812 use crate::consensus_adapter::ConsensusAdapterMetrics;
813 use crate::mysticeti_adapter::LazyMysticetiClient;
814 use sui_types::base_types::AuthorityName;
815
816 let state = TestAuthorityBuilder::new().build().await;
817 let epoch_store = state.epoch_store_for_testing().clone();
818 let consensus_adapter = Arc::new(ConsensusAdapter::new(
819 Arc::new(LazyMysticetiClient::new()),
820 CheckpointStore::new_for_tests(),
821 AuthorityName::ZERO,
822 100_000,
823 100_000,
824 ConsensusAdapterMetrics::new_test(),
825 Arc::new(tokio::sync::Notify::new()),
826 ));
827
828 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
829 let (sender, receiver) = mpsc::channel(100);
830 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
831
832 let event_loop = AdmissionQueueEventLoop {
833 receiver,
834 queue: PriorityAdmissionQueue::new(100, metrics),
835 consensus_adapter,
836 slot_freed_notify,
837 epoch_store,
838 last_drain: Arc::new(Mutex::new(Instant::now())),
839 queue_depth: Arc::new(AtomicUsize::new(0)),
840 last_published_depth: 0,
841 };
842
843 let handle = tokio::spawn(event_loop.run());
844
845 drop(sender);
847
848 tokio::time::timeout(std::time::Duration::from_secs(5), handle)
850 .await
851 .expect("actor did not shut down within timeout")
852 .expect("actor task panicked");
853 }
854
855 async fn build_consensus_adapter(
856 max_pending_transactions: usize,
857 ) -> (
858 Arc<ConsensusAdapter>,
859 Arc<AuthorityPerEpochStore>,
860 Arc<tokio::sync::Notify>,
861 ) {
862 use crate::authority::test_authority_builder::TestAuthorityBuilder;
863 use crate::checkpoints::CheckpointStore;
864 use crate::consensus_adapter::ConsensusAdapterMetrics;
865 use crate::mysticeti_adapter::LazyMysticetiClient;
866 use sui_types::base_types::AuthorityName;
867
868 let state = TestAuthorityBuilder::new().build().await;
869 let epoch_store = state.epoch_store_for_testing().clone();
870 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
871 let adapter = Arc::new(ConsensusAdapter::new(
872 Arc::new(LazyMysticetiClient::new()),
873 CheckpointStore::new_for_tests(),
874 AuthorityName::ZERO,
875 max_pending_transactions,
876 100_000,
877 ConsensusAdapterMetrics::new_test(),
878 slot_freed_notify.clone(),
879 ));
880 (adapter, epoch_store, slot_freed_notify)
881 }
882
883 #[tokio::test]
884 async fn test_failover_tripped_when_actor_stalls() {
885 let handle = AdmissionQueueHandle {
888 sender: mpsc::channel(1).0,
889 last_drain: Arc::new(Mutex::new(Instant::now())),
890 queue_depth: Arc::new(AtomicUsize::new(1)),
891 failover_timeout: Duration::from_millis(10),
892 };
893 assert!(!handle.failover_tripped());
894 tokio::time::sleep(Duration::from_millis(30)).await;
895 assert!(handle.failover_tripped());
896
897 handle.queue_depth.store(0, Ordering::Relaxed);
899 assert!(!handle.failover_tripped());
900 }
901
902 #[tokio::test]
903 async fn test_idle_actor_does_not_trip_failover() {
904 let (adapter, epoch_store, notify) = build_consensus_adapter(100_000).await;
907 let manager = AdmissionQueueManager::new(
908 adapter,
909 Arc::new(AdmissionQueueMetrics::new_for_tests()),
910 0.5,
911 0.9,
912 Duration::from_millis(10),
913 notify,
914 );
915 let handle = manager.spawn(epoch_store);
916 tokio::time::sleep(Duration::from_millis(50)).await;
917 assert!(
918 !handle.failover_tripped(),
919 "idle actor with empty queue must not trip failover"
920 );
921 }
922
923 #[tokio::test]
929 async fn test_drain_batch_does_not_bump_last_drain_when_no_slots() {
930 let (adapter, epoch_store, notify) = build_consensus_adapter(0).await;
931 let metrics = Arc::new(AdmissionQueueMetrics::new_for_tests());
932 let (_sender, receiver) = mpsc::channel(10);
933
934 let mut queue = PriorityAdmissionQueue::new(10, metrics.clone());
935 let (entry, _rx) = make_test_entry(100);
936 assert!(queue.insert(entry).is_ok());
937 assert_eq!(queue.len(), 1);
938
939 let last_drain = Arc::new(Mutex::new(Instant::now()));
940 let before = *last_drain.lock().unwrap();
941
942 let mut event_loop = AdmissionQueueEventLoop {
943 receiver,
944 queue,
945 consensus_adapter: adapter,
946 slot_freed_notify: notify,
947 epoch_store,
948 last_drain: last_drain.clone(),
949 queue_depth: Arc::new(AtomicUsize::new(0)),
950 last_published_depth: 0,
951 };
952
953 tokio::time::sleep(Duration::from_millis(20)).await;
956
957 event_loop.drain_batch();
958
959 assert_eq!(event_loop.queue.len(), 1, "no entries should be drained");
960 assert_eq!(
961 *last_drain.lock().unwrap(),
962 before,
963 "last_drain must not advance when drain_batch drained nothing"
964 );
965 }
966}