sui_rpc/client/
lists.rs

1use super::Client;
2use super::Result;
3use crate::proto::sui::rpc::v2::Balance;
4use crate::proto::sui::rpc::v2::DynamicField;
5use crate::proto::sui::rpc::v2::ListBalancesRequest;
6use crate::proto::sui::rpc::v2::ListDynamicFieldsRequest;
7use crate::proto::sui::rpc::v2::ListOwnedObjectsRequest;
8use crate::proto::sui::rpc::v2::ListPackageVersionsRequest;
9use crate::proto::sui::rpc::v2::Object;
10use crate::proto::sui::rpc::v2::PackageVersion;
11use futures::stream;
12use futures::stream::Stream;
13
14impl Client {
15    /// Creates a stream of objects based on the provided request.
16    ///
17    /// The stream handles pagination automatically by using the page_token from responses
18    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
19    ///
20    /// # Arguments
21    /// * `request` - The initial `ListOwnedObjectsRequest` with search criteria
22    ///
23    /// # Returns
24    /// A stream that yields `Result<Object>` instances. If any RPC call fails, the
25    /// tonic::Status from that request is returned.
26    pub fn list_owned_objects(
27        &self,
28        request: impl tonic::IntoRequest<ListOwnedObjectsRequest>,
29    ) -> impl Stream<Item = Result<Object>> + 'static {
30        let client = self.clone();
31        let request = request.into_request();
32
33        stream::unfold(
34            (
35                Vec::new().into_iter(), // current batch of objects
36                true,                   // has_next_page
37                request,                // request (page_token will be updated as we paginate)
38                client,                 // client for making requests
39            ),
40            move |(mut iter, has_next_page, mut request, mut client)| async move {
41                if let Some(item) = iter.next() {
42                    return Some((Ok(item), (iter, has_next_page, request, client)));
43                }
44
45                if has_next_page {
46                    let new_request = tonic::Request::from_parts(
47                        request.metadata().clone(),
48                        request.extensions().clone(),
49                        request.get_ref().clone(),
50                    );
51
52                    match client.state_client().list_owned_objects(new_request).await {
53                        Ok(response) => {
54                            let response = response.into_inner();
55                            let mut iter = response.objects.into_iter();
56
57                            let has_next_page = response.next_page_token.is_some();
58                            request.get_mut().page_token = response.next_page_token;
59
60                            iter.next()
61                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
62                        }
63                        Err(e) => {
64                            // Return error and terminate stream
65                            request.get_mut().page_token = None;
66                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
67                        }
68                    }
69                } else {
70                    None
71                }
72            },
73        )
74    }
75
76    /// Creates a stream of `DynamicField`s based on the provided request.
77    ///
78    /// The stream handles pagination automatically by using the page_token from responses
79    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
80    ///
81    /// # Arguments
82    /// * `request` - The initial `ListDynamicFieldsRequest` with search criteria
83    ///
84    /// # Returns
85    /// A stream that yields `Result<DynamicField>` instances. If any RPC call fails, the
86    /// tonic::Status from that request is returned.
87    pub fn list_dynamic_fields(
88        &self,
89        request: impl tonic::IntoRequest<ListDynamicFieldsRequest>,
90    ) -> impl Stream<Item = Result<DynamicField>> + 'static {
91        let client = self.clone();
92        let request = request.into_request();
93
94        stream::unfold(
95            (
96                Vec::new().into_iter(), // current batch of objects
97                true,                   // has_next_page
98                request,                // request (page_token will be updated as we paginate)
99                client,                 // client for making requests
100            ),
101            move |(mut iter, has_next_page, mut request, mut client)| async move {
102                if let Some(item) = iter.next() {
103                    return Some((Ok(item), (iter, has_next_page, request, client)));
104                }
105
106                if has_next_page {
107                    let new_request = tonic::Request::from_parts(
108                        request.metadata().clone(),
109                        request.extensions().clone(),
110                        request.get_ref().clone(),
111                    );
112
113                    match client.state_client().list_dynamic_fields(new_request).await {
114                        Ok(response) => {
115                            let response = response.into_inner();
116                            let mut iter = response.dynamic_fields.into_iter();
117
118                            let has_next_page = response.next_page_token.is_some();
119                            request.get_mut().page_token = response.next_page_token;
120
121                            iter.next()
122                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
123                        }
124                        Err(e) => {
125                            // Return error and terminate stream
126                            request.get_mut().page_token = None;
127                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
128                        }
129                    }
130                } else {
131                    None
132                }
133            },
134        )
135    }
136
137    /// Creates a stream of `Balance`s based on the provided request.
138    ///
139    /// The stream handles pagination automatically by using the page_token from responses
140    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
141    ///
142    /// # Arguments
143    /// * `request` - The initial `ListBalancesRequest` with search criteria
144    ///
145    /// # Returns
146    /// A stream that yields `Result<Balance>` instances. If any RPC call fails, the
147    /// tonic::Status from that request is returned.
148    pub fn list_balances(
149        &self,
150        request: impl tonic::IntoRequest<ListBalancesRequest>,
151    ) -> impl Stream<Item = Result<Balance>> + 'static {
152        let client = self.clone();
153        let request = request.into_request();
154
155        stream::unfold(
156            (
157                Vec::new().into_iter(), // current batch of objects
158                true,                   // has_next_page
159                request,                // request (page_token will be updated as we paginate)
160                client,                 // client for making requests
161            ),
162            move |(mut iter, has_next_page, mut request, mut client)| async move {
163                if let Some(item) = iter.next() {
164                    return Some((Ok(item), (iter, has_next_page, request, client)));
165                }
166
167                if has_next_page {
168                    let new_request = tonic::Request::from_parts(
169                        request.metadata().clone(),
170                        request.extensions().clone(),
171                        request.get_ref().clone(),
172                    );
173
174                    match client.state_client().list_balances(new_request).await {
175                        Ok(response) => {
176                            let response = response.into_inner();
177                            let mut iter = response.balances.into_iter();
178
179                            let has_next_page = response.next_page_token.is_some();
180                            request.get_mut().page_token = response.next_page_token;
181
182                            iter.next()
183                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
184                        }
185                        Err(e) => {
186                            // Return error and terminate stream
187                            request.get_mut().page_token = None;
188                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
189                        }
190                    }
191                } else {
192                    None
193                }
194            },
195        )
196    }
197
198    /// Creates a stream of `PackageVersion`s based on the provided request.
199    ///
200    /// The stream handles pagination automatically by using the page_token from responses
201    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
202    ///
203    /// # Arguments
204    /// * `request` - The initial `ListPackageVersionsRequest` with search criteria
205    ///
206    /// # Returns
207    /// A stream that yields `Result<PackageVersion>` instances. If any RPC call fails, the
208    /// tonic::Status from that request is returned.
209    pub fn list_package_versions(
210        &self,
211        request: impl tonic::IntoRequest<ListPackageVersionsRequest>,
212    ) -> impl Stream<Item = Result<PackageVersion>> + 'static {
213        let client = self.clone();
214        let request = request.into_request();
215
216        stream::unfold(
217            (
218                Vec::new().into_iter(), // current batch of objects
219                true,                   // has_next_page
220                request,                // request (page_token will be updated as we paginate)
221                client,                 // client for making requests
222            ),
223            move |(mut iter, has_next_page, mut request, mut client)| async move {
224                if let Some(item) = iter.next() {
225                    return Some((Ok(item), (iter, has_next_page, request, client)));
226                }
227
228                if has_next_page {
229                    let new_request = tonic::Request::from_parts(
230                        request.metadata().clone(),
231                        request.extensions().clone(),
232                        request.get_ref().clone(),
233                    );
234
235                    match client
236                        .package_client()
237                        .list_package_versions(new_request)
238                        .await
239                    {
240                        Ok(response) => {
241                            let response = response.into_inner();
242                            let mut iter = response.versions.into_iter();
243
244                            let has_next_page = response.next_page_token.is_some();
245                            request.get_mut().page_token = response.next_page_token;
246
247                            iter.next()
248                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
249                        }
250                        Err(e) => {
251                            // Return error and terminate stream
252                            request.get_mut().page_token = None;
253                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
254                        }
255                    }
256                } else {
257                    None
258                }
259            },
260        )
261    }
262}