sui_authority_aggregation/
lib.rs1use futures::Future;
5use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
6use mysten_metrics::monitored_future;
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use sui_types::base_types::ConciseableName;
12use sui_types::committee::{CommitteeTrait, StakeUnit};
13
14use tokio::time::timeout;
15
/// A heap-allocated, pinned, `Send` future that resolves to `Result<T, E>` and may
/// borrow data for at most the lifetime `'a`.
///
/// This is the return type expected from the per-authority "map" closure used by
/// the quorum helpers in this file. It is written out in full here; it is the very
/// same type as `futures::future::BoxFuture<'a, Result<T, E>>`, so values of either
/// spelling are interchangeable.
pub type AsyncResult<'a, T, E> =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send + 'a>>;
17
/// Optional tuning for
/// [`quorum_map_then_reduce_with_timeout_and_prefs`]: which authorities the
/// caller would like to be considered first, and how long to buffer replies
/// before starting to reduce them.
#[derive(Debug, Clone)]
pub struct SigRequestPrefs<K> {
    /// Authority names handed to `CommitteeTrait::shuffle_by_stake` as the
    /// preference set when the visiting order is computed.
    pub ordering_pref: BTreeSet<K>,
    /// Length of the prefetch window: while it is open, replies are only
    /// collected; once it closes (or every authority has answered) the
    /// collected replies are reduced in the shuffled order.
    pub prefetch_timeout: Duration,
}
22
/// Verdict returned by the reduce callback after it has folded one authority's
/// reply into the running state `S`.
///
/// `R` is the final result type produced on success.
#[derive(Debug)]
pub enum ReduceOutput<R, S> {
    /// Keep folding: the carried state becomes the input of the next reduce call.
    Continue(S),
    /// Give up immediately: the fold stops and the carried state is returned
    /// to the caller as the `Err` value.
    Failed(S),
    /// Done: the fold stops and the carried result is returned to the caller
    /// in the `Ok` value, together with the still-pending request futures.
    Success(R),
}
28
29pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
38 'a,
39 C,
40 K,
41 Client: 'a,
42 S,
43 V,
44 R,
45 E,
46 FMap,
47 FReduce,
48>(
49 committee: Arc<C>,
50 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
51 authority_preferences: Option<SigRequestPrefs<K>>,
52 initial_state: S,
53 map_each_authority: FMap,
54 reduce_result: FReduce,
55 total_timeout: Duration,
56) -> Result<
57 (
58 R,
59 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
60 ),
61 S,
62>
63where
64 K: Ord + ConciseableName<'a> + Clone + 'a,
65 C: CommitteeTrait<K>,
66 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
67 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
68{
69 let (preference, prefetch_timeout) = if let Some(SigRequestPrefs {
70 ordering_pref,
71 prefetch_timeout,
72 }) = authority_preferences
73 {
74 (Some(ordering_pref), Some(prefetch_timeout))
75 } else {
76 (None, None)
77 };
78 let authorities_shuffled = committee.shuffle_by_stake(preference.as_ref(), None);
79 let mut accumulated_state = initial_state;
80 let start_time = Instant::now();
81
82 let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
84 .clone()
85 .into_iter()
86 .map(|name| {
87 let client = authority_clients[&name].clone();
88 let execute = map_each_authority.clone();
89 monitored_future!(async move { (name.clone(), execute(name, client).await,) })
90 })
91 .collect();
92 if let Some(prefetch_timeout) = prefetch_timeout {
93 let prefetch_sleep = tokio::time::sleep(prefetch_timeout);
94 let mut authority_to_result: BTreeMap<K, Result<V, E>> = BTreeMap::new();
95 tokio::pin!(prefetch_sleep);
96 loop {
98 tokio::select! {
99 resp = responses.next() => {
100 match resp {
101 Some((authority_name, result)) => {
102 authority_to_result.insert(authority_name, result);
103 }
104 None => {
105 break;
107 }
108 }
109 }
110 _ = &mut prefetch_sleep => {
111 break;
112 }
113 }
114 }
115 for authority_name in authorities_shuffled {
117 let authority_weight = committee.weight(&authority_name);
118 if let Some(result) = authority_to_result.remove(&authority_name) {
119 accumulated_state = match reduce_result(
120 accumulated_state,
121 authority_name,
122 authority_weight,
123 result,
124 )
125 .await
126 {
127 ReduceOutput::Continue(state) => state,
129 ReduceOutput::Failed(state) => {
130 return Err(state);
131 }
132 ReduceOutput::Success(result) => {
133 return Ok((result, responses));
135 }
136 };
137 }
138 }
139 }
142
143 while let Ok(Some((authority_name, result))) = timeout(
145 total_timeout.saturating_sub(start_time.elapsed()),
146 responses.next(),
147 )
148 .await
149 {
150 let authority_weight = committee.weight(&authority_name);
151 accumulated_state =
152 match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
153 ReduceOutput::Continue(state) => state,
155 ReduceOutput::Failed(state) => {
156 return Err(state);
157 }
158 ReduceOutput::Success(result) => {
159 return Ok((result, responses));
161 }
162 }
163 }
164 Err(accumulated_state)
167}
168
169pub async fn quorum_map_then_reduce_with_timeout<
184 'a,
185 C,
186 K,
187 Client: 'a,
188 S: 'a,
189 V: 'a,
190 R: 'a,
191 E,
192 FMap,
193 FReduce,
194>(
195 committee: Arc<C>,
196 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
197 initial_state: S,
199 map_each_authority: FMap,
202 reduce_result: FReduce,
205 initial_timeout: Duration,
207) -> Result<
208 (
209 R,
210 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
211 ),
212 S,
213>
214where
215 K: Ord + ConciseableName<'a> + Clone + 'a,
216 C: CommitteeTrait<K>,
217 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
218 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
219{
220 quorum_map_then_reduce_with_timeout_and_prefs(
221 committee,
222 authority_clients,
223 None,
224 initial_state,
225 map_each_authority,
226 reduce_result,
227 initial_timeout,
228 )
229 .await
230}