sui_rpc/client/lists.rs
1use super::Client;
2use super::Result;
3use crate::proto::sui::rpc::v2::Balance;
4use crate::proto::sui::rpc::v2::DynamicField;
5use crate::proto::sui::rpc::v2::ListBalancesRequest;
6use crate::proto::sui::rpc::v2::ListDynamicFieldsRequest;
7use crate::proto::sui::rpc::v2::ListOwnedObjectsRequest;
8use crate::proto::sui::rpc::v2::ListPackageVersionsRequest;
9use crate::proto::sui::rpc::v2::Object;
10use crate::proto::sui::rpc::v2::PackageVersion;
11use futures::stream;
12use futures::stream::Stream;
13
14impl Client {
15 /// Creates a stream of objects based on the provided request.
16 ///
17 /// The stream handles pagination automatically by using the page_token from responses
18 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
19 ///
20 /// # Arguments
21 /// * `request` - The initial `ListOwnedObjectsRequest` with search criteria
22 ///
23 /// # Returns
24 /// A stream that yields `Result<Object>` instances. If any RPC call fails, the
25 /// tonic::Status from that request is returned.
26 pub fn list_owned_objects(
27 &self,
28 request: impl tonic::IntoRequest<ListOwnedObjectsRequest>,
29 ) -> impl Stream<Item = Result<Object>> + 'static {
30 let client = self.clone();
31 let request = request.into_request();
32
33 stream::unfold(
34 (
35 Vec::new().into_iter(), // current batch of objects
36 true, // has_next_page
37 request, // request (page_token will be updated as we paginate)
38 client, // client for making requests
39 ),
40 move |(mut iter, has_next_page, mut request, mut client)| async move {
41 if let Some(item) = iter.next() {
42 return Some((Ok(item), (iter, has_next_page, request, client)));
43 }
44
45 if has_next_page {
46 let new_request = tonic::Request::from_parts(
47 request.metadata().clone(),
48 request.extensions().clone(),
49 request.get_ref().clone(),
50 );
51
52 match client.state_client().list_owned_objects(new_request).await {
53 Ok(response) => {
54 let response = response.into_inner();
55 let mut iter = response.objects.into_iter();
56
57 let has_next_page = response.next_page_token.is_some();
58 request.get_mut().page_token = response.next_page_token;
59
60 iter.next()
61 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
62 }
63 Err(e) => {
64 // Return error and terminate stream
65 request.get_mut().page_token = None;
66 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
67 }
68 }
69 } else {
70 None
71 }
72 },
73 )
74 }
75
76 /// Creates a stream of `DynamicField`s based on the provided request.
77 ///
78 /// The stream handles pagination automatically by using the page_token from responses
79 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
80 ///
81 /// # Arguments
82 /// * `request` - The initial `ListDynamicFieldsRequest` with search criteria
83 ///
84 /// # Returns
85 /// A stream that yields `Result<DynamicField>` instances. If any RPC call fails, the
86 /// tonic::Status from that request is returned.
87 pub fn list_dynamic_fields(
88 &self,
89 request: impl tonic::IntoRequest<ListDynamicFieldsRequest>,
90 ) -> impl Stream<Item = Result<DynamicField>> + 'static {
91 let client = self.clone();
92 let request = request.into_request();
93
94 stream::unfold(
95 (
96 Vec::new().into_iter(), // current batch of objects
97 true, // has_next_page
98 request, // request (page_token will be updated as we paginate)
99 client, // client for making requests
100 ),
101 move |(mut iter, has_next_page, mut request, mut client)| async move {
102 if let Some(item) = iter.next() {
103 return Some((Ok(item), (iter, has_next_page, request, client)));
104 }
105
106 if has_next_page {
107 let new_request = tonic::Request::from_parts(
108 request.metadata().clone(),
109 request.extensions().clone(),
110 request.get_ref().clone(),
111 );
112
113 match client.state_client().list_dynamic_fields(new_request).await {
114 Ok(response) => {
115 let response = response.into_inner();
116 let mut iter = response.dynamic_fields.into_iter();
117
118 let has_next_page = response.next_page_token.is_some();
119 request.get_mut().page_token = response.next_page_token;
120
121 iter.next()
122 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
123 }
124 Err(e) => {
125 // Return error and terminate stream
126 request.get_mut().page_token = None;
127 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
128 }
129 }
130 } else {
131 None
132 }
133 },
134 )
135 }
136
137 /// Creates a stream of `Balance`s based on the provided request.
138 ///
139 /// The stream handles pagination automatically by using the page_token from responses
140 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
141 ///
142 /// # Arguments
143 /// * `request` - The initial `ListBalancesRequest` with search criteria
144 ///
145 /// # Returns
146 /// A stream that yields `Result<Balance>` instances. If any RPC call fails, the
147 /// tonic::Status from that request is returned.
148 pub fn list_balances(
149 &self,
150 request: impl tonic::IntoRequest<ListBalancesRequest>,
151 ) -> impl Stream<Item = Result<Balance>> + 'static {
152 let client = self.clone();
153 let request = request.into_request();
154
155 stream::unfold(
156 (
157 Vec::new().into_iter(), // current batch of objects
158 true, // has_next_page
159 request, // request (page_token will be updated as we paginate)
160 client, // client for making requests
161 ),
162 move |(mut iter, has_next_page, mut request, mut client)| async move {
163 if let Some(item) = iter.next() {
164 return Some((Ok(item), (iter, has_next_page, request, client)));
165 }
166
167 if has_next_page {
168 let new_request = tonic::Request::from_parts(
169 request.metadata().clone(),
170 request.extensions().clone(),
171 request.get_ref().clone(),
172 );
173
174 match client.state_client().list_balances(new_request).await {
175 Ok(response) => {
176 let response = response.into_inner();
177 let mut iter = response.balances.into_iter();
178
179 let has_next_page = response.next_page_token.is_some();
180 request.get_mut().page_token = response.next_page_token;
181
182 iter.next()
183 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
184 }
185 Err(e) => {
186 // Return error and terminate stream
187 request.get_mut().page_token = None;
188 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
189 }
190 }
191 } else {
192 None
193 }
194 },
195 )
196 }
197
198 /// Creates a stream of `PackageVersion`s based on the provided request.
199 ///
200 /// The stream handles pagination automatically by using the page_token from responses
201 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
202 ///
203 /// # Arguments
204 /// * `request` - The initial `ListPackageVersionsRequest` with search criteria
205 ///
206 /// # Returns
207 /// A stream that yields `Result<PackageVersion>` instances. If any RPC call fails, the
208 /// tonic::Status from that request is returned.
209 pub fn list_package_versions(
210 &self,
211 request: impl tonic::IntoRequest<ListPackageVersionsRequest>,
212 ) -> impl Stream<Item = Result<PackageVersion>> + 'static {
213 let client = self.clone();
214 let request = request.into_request();
215
216 stream::unfold(
217 (
218 Vec::new().into_iter(), // current batch of objects
219 true, // has_next_page
220 request, // request (page_token will be updated as we paginate)
221 client, // client for making requests
222 ),
223 move |(mut iter, has_next_page, mut request, mut client)| async move {
224 if let Some(item) = iter.next() {
225 return Some((Ok(item), (iter, has_next_page, request, client)));
226 }
227
228 if has_next_page {
229 let new_request = tonic::Request::from_parts(
230 request.metadata().clone(),
231 request.extensions().clone(),
232 request.get_ref().clone(),
233 );
234
235 match client
236 .package_client()
237 .list_package_versions(new_request)
238 .await
239 {
240 Ok(response) => {
241 let response = response.into_inner();
242 let mut iter = response.versions.into_iter();
243
244 let has_next_page = response.next_page_token.is_some();
245 request.get_mut().page_token = response.next_page_token;
246
247 iter.next()
248 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
249 }
250 Err(e) => {
251 // Return error and terminate stream
252 request.get_mut().page_token = None;
253 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
254 }
255 }
256 } else {
257 None
258 }
259 },
260 )
261 }
262}