sui_graphql_client/
streams.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error;
5use crate::query_types::PageInfo;
6use crate::Direction;
7use crate::Page;
8use crate::PaginationFilter;
9
10use futures::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16/// A stream that yields items from a paginated query with support for bidirectional pagination.
17pub struct PageStream<T, F, Fut> {
18    query_fn: F,
19    direction: Direction,
20    current_page: Option<(PageInfo, std::vec::IntoIter<T>)>,
21    current_future: Option<Pin<Box<Fut>>>,
22    finished: bool,
23    is_first_page: bool,
24}
25
26impl<T, F, Fut> PageStream<T, F, Fut> {
27    pub fn new(query_fn: F, direction: Direction) -> Self {
28        Self {
29            query_fn,
30            direction,
31            current_page: None,
32            current_future: None,
33            finished: false,
34            is_first_page: true,
35        }
36    }
37}
38
39impl<T, F, Fut> Stream for PageStream<T, F, Fut>
40where
41    T: Clone + Unpin,
42    F: Fn(PaginationFilter) -> Fut,
43    F: Unpin,
44    Fut: Future<Output = Result<Page<T>, error::Error>>,
45{
46    type Item = Result<T, error::Error>;
47
48    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49        if self.finished {
50            return Poll::Ready(None);
51        }
52
53        loop {
54            let direction = self.direction.clone();
55            // If we have a current page, return the next item
56            if let Some((page_info, iter)) = &mut self.current_page {
57                if let Some(item) = iter.next() {
58                    return Poll::Ready(Some(Ok(item)));
59                }
60
61                // For backward pagination, we check for previous page
62                // For the first page in backward pagination, we don't need to check has_previous_page
63                let should_continue = match direction {
64                    Direction::Forward => page_info.has_next_page,
65                    Direction::Backward => page_info.has_previous_page,
66                };
67                if !should_continue {
68                    self.finished = true;
69                    return Poll::Ready(None);
70                }
71            }
72
73            // Get cursor from current page
74            let current_cursor = self
75                .current_page
76                .as_ref()
77                .and_then(|(page_info, _iter)| {
78                    match self.direction {
79                        Direction::Forward => page_info
80                            .has_next_page
81                            .then(|| page_info.end_cursor.clone()),
82                        Direction::Backward => {
83                            // For the first page in backward pagination, we don't use a cursor
84                            // This ensures we start from the last page
85                            if self.is_first_page {
86                                None
87                            } else {
88                                page_info
89                                    .has_previous_page
90                                    .then(|| page_info.start_cursor.clone())
91                            }
92                        }
93                    }
94                })
95                .flatten();
96
97            // If there's no future yet, create one
98            if self.current_future.is_none() {
99                if self.is_first_page && current_cursor.is_some() {
100                    self.is_first_page = false;
101                }
102                let filter = PaginationFilter {
103                    direction: self.direction.clone(),
104                    cursor: current_cursor,
105                    limit: None,
106                };
107                let future = (self.query_fn)(filter);
108                self.current_future = Some(Box::pin(future));
109            }
110
111            // Poll the future
112            match self.current_future.as_mut().unwrap().as_mut().poll(cx) {
113                Poll::Ready(Ok(page)) => {
114                    self.current_future = None;
115
116                    if page.is_empty() {
117                        self.finished = true;
118                        return Poll::Ready(None);
119                    }
120
121                    let (page_info, data) = page.into_parts();
122                    // For backward pagination, we need to reverse the items
123                    let iter = match self.direction {
124                        Direction::Forward => data.into_iter(),
125                        Direction::Backward => {
126                            let mut vec = data;
127                            vec.reverse();
128                            vec.into_iter()
129                        }
130                    };
131                    self.current_page = Some((page_info, iter));
132
133                    if self.is_first_page {
134                        self.is_first_page = false;
135                    }
136                }
137                Poll::Ready(Err(e)) => {
138                    if self.is_first_page {
139                        self.is_first_page = false;
140                    }
141                    self.finished = true;
142                    self.current_future = None;
143                    return Poll::Ready(Some(Err(e)));
144                }
145                Poll::Pending => return Poll::Pending,
146            }
147        }
148    }
149}
150
151/// Creates a new `PageStream` for a paginated query.
152///
153/// Examples
154///
155/// ```rust,ignore
156/// use futures::StreamExt;
157/// use sui_graphql_client::streams::stream_paginated_query;
158/// use sui_graphql_client::Client;
159/// use sui_graphql_client::PaginationFilter;
160/// use sui_graphql_client::Direction;
161///
162/// let client = Client::new_testnet();
163/// let stream = stream_paginated_query(|pagination_filter, Direction::Forward| {
164///    client.coins(owner, coin_type, pagination_filter)
165/// });
166///
167/// while let Some(result) = stream.next().await {
168///    match result {
169///        Ok(coin) => println!("Got coin: {:?}", coin),
170///        Err(e) => eprintln!("Error: {}", e),
171///    }
172/// }
173/// ```
174pub fn stream_paginated_query<T, F, Fut>(query_fn: F, direction: Direction) -> PageStream<T, F, Fut>
175where
176    F: Fn(PaginationFilter) -> Fut,
177    Fut: Future<Output = Result<Page<T>, error::Error>>,
178{
179    PageStream::new(query_fn, direction)
180}