sui_authority_aggregation/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::Future;
5use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
6use mysten_metrics::monitored_future;
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use sui_types::base_types::ConciseableName;
12use sui_types::committee::{CommitteeTrait, StakeUnit};
13
14use tokio::time::timeout;
15
16pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
17
/// Optional preferences controlling the order in which authority responses are folded.
///
/// Passed to `quorum_map_then_reduce_with_timeout_and_prefs`; when absent, responses are folded
/// purely in arrival order.
#[derive(Debug, Clone)]
pub struct SigRequestPrefs<K> {
    /// Authorities handed to the committee's `shuffle_by_stake` as the ordering preference.
    pub ordering_pref: BTreeSet<K>,
    /// How long to buffer responses before falling back to arrival order (capped by the
    /// caller's total timeout).
    pub prefetch_timeout: Duration,
}
22
/// Outcome of folding one authority's response into the accumulated state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReduceOutput<R, S> {
    /// Keep folding further responses into the carried state.
    Continue(S),
    /// Stop immediately; the quorum helper returns `Err` with the carried state.
    Failed(S),
    /// Stop immediately; the quorum helper returns `Ok` with the carried result.
    Success(R),
}
28
29/// This function takes an initial state, than executes an asynchronous function (FMap) for each
30/// authority, and folds the results as they become available into the state using an async function (FReduce).
31///
32/// prefetch_timeout: the minimum amount of time to spend trying to gather results from all authorities
33/// before falling back to arrival order.
34///
35/// total_timeout: the maximum amount of total time to wait for results from all authorities, including
36/// time spent prefetching.
37pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
38    'a,
39    C,
40    K,
41    Client: 'a,
42    S,
43    V,
44    R,
45    E,
46    FMap,
47    FReduce,
48>(
49    committee: Arc<C>,
50    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
51    authority_preferences: Option<SigRequestPrefs<K>>,
52    initial_state: S,
53    map_each_authority: FMap,
54    reduce_result: FReduce,
55    total_timeout: Duration,
56) -> Result<
57    (
58        R,
59        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
60    ),
61    S,
62>
63where
64    K: Ord + ConciseableName<'a> + Clone + 'a,
65    C: CommitteeTrait<K>,
66    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
67    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
68{
69    let (preference, prefetch_timeout) = if let Some(SigRequestPrefs {
70        ordering_pref,
71        prefetch_timeout,
72    }) = authority_preferences
73    {
74        (Some(ordering_pref), Some(prefetch_timeout))
75    } else {
76        (None, None)
77    };
78    let authorities_shuffled = committee.shuffle_by_stake(preference.as_ref(), None);
79    let mut accumulated_state = initial_state;
80    let start_time = Instant::now();
81
82    // First, execute in parallel for each authority FMap.
83    let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
84        .clone()
85        .into_iter()
86        .map(|name| {
87            let client = authority_clients[&name].clone();
88            let execute = map_each_authority.clone();
89            monitored_future!(async move { (name.clone(), execute(name, client).await,) })
90        })
91        .collect();
92    if let Some(prefetch_timeout) = prefetch_timeout {
93        let prefetch_sleep = tokio::time::sleep(prefetch_timeout);
94        let mut authority_to_result: BTreeMap<K, Result<V, E>> = BTreeMap::new();
95        tokio::pin!(prefetch_sleep);
96        // get all the sigs we can within prefetch_timeout
97        loop {
98            tokio::select! {
99                resp = responses.next() => {
100                    match resp {
101                        Some((authority_name, result)) => {
102                            authority_to_result.insert(authority_name, result);
103                        }
104                        None => {
105                            // we have processed responses from the full committee so can stop early
106                            break;
107                        }
108                    }
109                }
110                _ = &mut prefetch_sleep => {
111                    break;
112                }
113            }
114        }
115        // process what we have up to this point
116        for authority_name in authorities_shuffled {
117            let authority_weight = committee.weight(&authority_name);
118            if let Some(result) = authority_to_result.remove(&authority_name) {
119                accumulated_state = match reduce_result(
120                    accumulated_state,
121                    authority_name,
122                    authority_weight,
123                    result,
124                )
125                .await
126                {
127                    // In the first two cases we are told to continue the iteration.
128                    ReduceOutput::Continue(state) => state,
129                    ReduceOutput::Failed(state) => {
130                        return Err(state);
131                    }
132                    ReduceOutput::Success(result) => {
133                        // The reducer tells us that we have the result needed. Just return it.
134                        return Ok((result, responses));
135                    }
136                };
137            }
138        }
139        // if we got here, fallback through the if statement to continue in arrival order on
140        // the remaining validators
141    }
142
143    // As results become available fold them into the state using FReduce.
144    while let Ok(Some((authority_name, result))) = timeout(
145        total_timeout.saturating_sub(start_time.elapsed()),
146        responses.next(),
147    )
148    .await
149    {
150        let authority_weight = committee.weight(&authority_name);
151        accumulated_state =
152            match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
153                // In the first two cases we are told to continue the iteration.
154                ReduceOutput::Continue(state) => state,
155                ReduceOutput::Failed(state) => {
156                    return Err(state);
157                }
158                ReduceOutput::Success(result) => {
159                    // The reducer tells us that we have the result needed. Just return it.
160                    return Ok((result, responses));
161                }
162            }
163    }
164    // If we have exhausted all authorities and still have not returned a result, return
165    // error with the accumulated state.
166    Err(accumulated_state)
167}
168
169/// This function takes an initial state, than executes an asynchronous function (FMap) for each
170/// authority, and folds the results as they become available into the state using an async function (FReduce).
171///
172/// FMap can do io, and returns a result V. An error there may not be fatal, and could be consumed by the
173/// MReduce function to overall recover from it. This is necessary to ensure byzantine authorities cannot
174/// interrupt the logic of this function.
175///
176/// FReduce returns a result to a ReduceOutput. If the result is Err the function
177/// shortcuts and the Err is returned. An Ok ReduceOutput result can be used to shortcut and return
178/// the resulting state (ReduceOutput::End), continue the folding as new states arrive (ReduceOutput::Continue).
179///
180/// This function provides a flexible way to communicate with a quorum of authorities, processing and
181/// processing their results into a safe overall result, and also safely allowing operations to continue
182/// past the quorum to ensure all authorities are up to date (up to a timeout).
183pub async fn quorum_map_then_reduce_with_timeout<
184    'a,
185    C,
186    K,
187    Client: 'a,
188    S: 'a,
189    V: 'a,
190    R: 'a,
191    E,
192    FMap,
193    FReduce,
194>(
195    committee: Arc<C>,
196    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
197    // The initial state that will be used to fold in values from authorities.
198    initial_state: S,
199    // The async function used to apply to each authority. It takes an authority name,
200    // and authority client parameter and returns a Result<V>.
201    map_each_authority: FMap,
202    // The async function that takes an accumulated state, and a new result for V from an
203    // authority and returns a result to a ReduceOutput state.
204    reduce_result: FReduce,
205    // The initial timeout applied to all
206    initial_timeout: Duration,
207) -> Result<
208    (
209        R,
210        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
211    ),
212    S,
213>
214where
215    K: Ord + ConciseableName<'a> + Clone + 'a,
216    C: CommitteeTrait<K>,
217    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
218    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
219{
220    quorum_map_then_reduce_with_timeout_and_prefs(
221        committee,
222        authority_clients,
223        None,
224        initial_state,
225        map_each_authority,
226        reduce_result,
227        initial_timeout,
228    )
229    .await
230}