// sui_authority_aggregation/lib.rs

use futures::Future;
use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
use mysten_metrics::monitored_future;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_types::base_types::ConciseableName;
use sui_types::committee::{CommitteeTrait, StakeUnit};

use tokio::time::timeout;

16pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
17
/// Optional hints for [`quorum_map_then_reduce_with_timeout_and_prefs`] describing which
/// authorities the caller prefers and how long to collect responses before reducing them.
#[derive(Debug, Clone)]
pub struct SigRequestPrefs<K> {
    /// Authorities passed to `CommitteeTrait::shuffle_by_stake` as the preference set when
    /// the request order is computed.
    pub ordering_pref: BTreeSet<K>,
    /// Length of the initial window during which responses are only buffered; once it
    /// elapses (or every authority has answered) the buffered responses are reduced in the
    /// shuffled authority order.
    pub prefetch_timeout: Duration,
}

/// Decision returned by the reduce callback after it has folded one authority's response
/// into the accumulated state.
#[derive(Debug)]
pub enum ReduceOutput<R, S> {
    /// Keep going: carry the updated state into the next reduction step.
    Continue(S),
    /// Stop immediately; the aggregation returns `Err` with this state.
    Failed(S),
    /// Stop immediately; the aggregation returns `Ok` with this result.
    Success(R),
}

29pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
38 'a,
39 C,
40 K,
41 Client: 'a,
42 S,
43 V,
44 R,
45 E,
46 FMap,
47 FReduce,
48>(
49 committee: Arc<C>,
50 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
51 authority_preferences: Option<SigRequestPrefs<K>>,
52 initial_state: S,
53 map_each_authority: FMap,
54 reduce_result: FReduce,
55 total_timeout: Duration,
56) -> Result<
57 (
58 R,
59 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
60 ),
61 S,
62>
63where
64 K: Ord + ConciseableName<'a> + Clone + 'a,
65 C: CommitteeTrait<K>,
66 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
67 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
68{
69 let (preference, prefetch_timeout) = if let Some(SigRequestPrefs {
70 ordering_pref,
71 prefetch_timeout,
72 }) = authority_preferences
73 {
74 (Some(ordering_pref), Some(prefetch_timeout))
75 } else {
76 (None, None)
77 };
78 let authorities_shuffled = committee.shuffle_by_stake(preference.as_ref(), None);
79 let mut accumulated_state = initial_state;
80 let mut total_timeout = total_timeout;
81
82 let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
84 .clone()
85 .into_iter()
86 .map(|name| {
87 let client = authority_clients[&name].clone();
88 let execute = map_each_authority.clone();
89 monitored_future!(async move { (name.clone(), execute(name, client).await,) })
90 })
91 .collect();
92 if let Some(prefetch_timeout) = prefetch_timeout {
93 let elapsed = Instant::now();
94 let prefetch_sleep = tokio::time::sleep(prefetch_timeout);
95 let mut authority_to_result: BTreeMap<K, Result<V, E>> = BTreeMap::new();
96 tokio::pin!(prefetch_sleep);
97 loop {
99 tokio::select! {
100 resp = responses.next() => {
101 match resp {
102 Some((authority_name, result)) => {
103 authority_to_result.insert(authority_name, result);
104 }
105 None => {
106 break;
108 }
109 }
110 }
111 _ = &mut prefetch_sleep => {
112 break;
113 }
114 }
115 }
116 for authority_name in authorities_shuffled {
118 let authority_weight = committee.weight(&authority_name);
119 if let Some(result) = authority_to_result.remove(&authority_name) {
120 accumulated_state = match reduce_result(
121 accumulated_state,
122 authority_name,
123 authority_weight,
124 result,
125 )
126 .await
127 {
128 ReduceOutput::Continue(state) => state,
130 ReduceOutput::Failed(state) => {
131 return Err(state);
132 }
133 ReduceOutput::Success(result) => {
134 return Ok((result, responses));
136 }
137 };
138 }
139 }
140 total_timeout = total_timeout.saturating_sub(elapsed.elapsed());
143 }
144
145 while let Ok(Some((authority_name, result))) = timeout(total_timeout, responses.next()).await {
147 let authority_weight = committee.weight(&authority_name);
148 accumulated_state =
149 match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
150 ReduceOutput::Continue(state) => state,
152 ReduceOutput::Failed(state) => {
153 return Err(state);
154 }
155 ReduceOutput::Success(result) => {
156 return Ok((result, responses));
158 }
159 }
160 }
161 Err(accumulated_state)
164}
165
166pub async fn quorum_map_then_reduce_with_timeout<
181 'a,
182 C,
183 K,
184 Client: 'a,
185 S: 'a,
186 V: 'a,
187 R: 'a,
188 E,
189 FMap,
190 FReduce,
191>(
192 committee: Arc<C>,
193 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
194 initial_state: S,
196 map_each_authority: FMap,
199 reduce_result: FReduce,
202 initial_timeout: Duration,
204) -> Result<
205 (
206 R,
207 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
208 ),
209 S,
210>
211where
212 K: Ord + ConciseableName<'a> + Clone + 'a,
213 C: CommitteeTrait<K>,
214 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
215 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
216{
217 quorum_map_then_reduce_with_timeout_and_prefs(
218 committee,
219 authority_clients,
220 None,
221 initial_state,
222 map_each_authority,
223 reduce_result,
224 initial_timeout,
225 )
226 .await
227}