Compare commits
3 commits
5ed334a454
...
1d7d5c8a59
Author | SHA1 | Date | |
---|---|---|---|
![]() |
1d7d5c8a59 | ||
![]() |
751e88fdc4 | ||
![]() |
2117d039b7 |
8 changed files with 338 additions and 156 deletions
|
@ -76,7 +76,7 @@ async fn main() {
|
|||
.with_filter("{status} = 'Todo' || {status} = 'In Progress'".to_owned())
|
||||
.build()
|
||||
.unwrap()
|
||||
.execute::<MyRecord>()
|
||||
.stream_items::<MyRecord>();
|
||||
|
||||
while let Some(result) = rec_stream.next().await {
|
||||
let rec = result.unwrap();
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use ferrtable::Client;
|
||||
use futures::prelude::*;
|
||||
use futures::StreamExt as _;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::settings::Settings;
|
||||
|
@ -29,6 +29,12 @@ async fn main() {
|
|||
let settings = Settings::load().unwrap();
|
||||
let client = Client::new_from_access_token(&settings.access_token).unwrap();
|
||||
|
||||
println!("Testing Client::list_bases()...");
|
||||
let mut bases = client.list_bases().build().unwrap().stream_items();
|
||||
while let Some(res) = bases.next().await {
|
||||
dbg!(res.unwrap());
|
||||
}
|
||||
|
||||
println!("Testing Client::create_records()...");
|
||||
client
|
||||
.create_records([TestRecord {
|
||||
|
@ -45,15 +51,20 @@ async fn main() {
|
|||
.unwrap();
|
||||
|
||||
println!("Testing Client::list_records()...");
|
||||
let mut records = client
|
||||
let records = client
|
||||
.list_records()
|
||||
.with_base_id(settings.base_id.clone())
|
||||
.with_table_id(settings.table_id.clone())
|
||||
.build()
|
||||
.unwrap()
|
||||
.execute::<TestRecord>();
|
||||
while let Some(res) = records.next().await {
|
||||
dbg!(res.unwrap().fields);
|
||||
.stream_items::<TestRecord>()
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
for rec in records {
|
||||
dbg!(rec.fields);
|
||||
}
|
||||
|
||||
println!("All tests succeeded.");
|
||||
|
|
|
@ -2,7 +2,10 @@ use std::fmt::Debug;
|
|||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{create_records::CreateRecordsQueryBuilder, list_records::ListRecordsQueryBuilder};
|
||||
use crate::{
|
||||
create_records::CreateRecordsQueryBuilder, list_bases::ListBasesQueryBuilder,
|
||||
list_records::ListRecordsQueryBuilder,
|
||||
};
|
||||
|
||||
const DEFAULT_API_ROOT: &str = "https://api.airtable.com";
|
||||
|
||||
|
@ -15,40 +18,46 @@ pub struct Client {
|
|||
token: String,
|
||||
}
|
||||
|
||||
// Implement the Default trait so that `derive_builder` will allow fields
|
||||
// containing the Client type to be defined with `#[builder(setter(skip))]`.
|
||||
//
|
||||
// In order to avoid repeating ourselves, this may also be used in
|
||||
// `new_from_access_token()`.
|
||||
impl Default for Client {
|
||||
/// WARNING: Default is implemented only to satisfy trait bound checks
|
||||
/// internally. You may use it externally as a placeholder value, but be
|
||||
/// aware that it will not be able to make any authenticated API requests.
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
impl Client {
|
||||
pub fn new_from_access_token(token: &str) -> Result<Self, reqwest::Error> {
|
||||
Ok(Self {
|
||||
api_root: DEFAULT_API_ROOT.to_owned(),
|
||||
client: reqwest::ClientBuilder::default()
|
||||
.https_only(true)
|
||||
.build()
|
||||
.expect("reqwest client is always built with the same configuration here"),
|
||||
token: "".to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new_from_access_token(token: &str) -> Result<Self, reqwest::Error> {
|
||||
Ok(Self {
|
||||
token: token.to_owned(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
/// Constructs a builder for inserting up to 10 records at a time into a
|
||||
/// table.
|
||||
/// Creates multiple records. Note that table names and table ids can be
|
||||
/// used interchangeably. We recommend using table IDs so you don't need
|
||||
/// to modify your API request when your table name changes.
|
||||
///
|
||||
/// Specify the base and table IDs with its `.with_base_id()` and
|
||||
/// `.with_table_id()` methods.
|
||||
/// Your request body should include an array of up to 10 record objects.
|
||||
///
|
||||
/// Returns a unique array of the newly created record ids if the call
|
||||
/// succeeds.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// client
|
||||
/// .create_records([
|
||||
/// HashMap<&str, &str>::from([
|
||||
/// ("name", "Steal Improbability Drive"),
|
||||
/// ("notes", "Just for fun, no other reason."),
|
||||
/// ("status", "In progress"),
|
||||
/// ]),
|
||||
/// ])
|
||||
/// .with_base_id("***".to_owned())
|
||||
/// .with_table_id("***".to_owned())
|
||||
/// .build()
|
||||
/// .unwrap()
|
||||
/// .execute()
|
||||
/// .await
|
||||
/// .unwrap();
|
||||
/// ```
|
||||
pub fn create_records<I, T>(&self, records: I) -> CreateRecordsQueryBuilder<T>
|
||||
where
|
||||
T: Serialize,
|
||||
|
@ -59,14 +68,66 @@ impl Client {
|
|||
.with_records(records.into_iter().collect())
|
||||
}
|
||||
|
||||
/// Constructs a builder for listing records in a table or view.
|
||||
/// List the bases the token can access
|
||||
///
|
||||
/// Specify the base and table IDs with its `.with_base_id()` and
|
||||
/// `.with_table_id()` methods.
|
||||
/// # Examples
|
||||
///
|
||||
/// ## Consuming as Stream
|
||||
///
|
||||
/// ```no_run
|
||||
/// use futures::prelude::*;
|
||||
///
|
||||
/// let mut base_stream = client
|
||||
/// .list_bases()
|
||||
/// .build()
|
||||
/// .unwrap()
|
||||
/// .stream_items();
|
||||
///
|
||||
/// while let Some(result) = base_stream.next().await {
|
||||
/// dbg!(result.unwrap());
|
||||
/// }
|
||||
/// ```
|
||||
pub fn list_bases(&self) -> ListBasesQueryBuilder {
|
||||
ListBasesQueryBuilder::default().with_client(self.clone())
|
||||
}
|
||||
|
||||
/// List records in a table. Note that table names and table ids can be used
|
||||
/// interchangeably. We recommend using table IDs so you don't need to modify
|
||||
/// your API request when your table name changes.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ## Consuming as Stream
|
||||
///
|
||||
/// ```no_run
|
||||
/// let mut rec_stream = client
|
||||
/// .list_records()
|
||||
/// .with_base_id("***")
|
||||
/// .with_table_id("***")
|
||||
/// .build()
|
||||
/// .unwrap()
|
||||
/// .stream_items::<HashMap<String, serde_json::Value>>();
|
||||
///
|
||||
/// while let Some(result) = rec_stream.next().await {
|
||||
/// let rec = result.unwrap();
|
||||
/// dbg!(rec.fields);
|
||||
/// }
|
||||
/// ```
|
||||
pub fn list_records(&self) -> ListRecordsQueryBuilder {
|
||||
ListRecordsQueryBuilder::default().with_client(self.clone())
|
||||
}
|
||||
|
||||
/// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the
|
||||
/// Authorization header set to the correct bearer auth value.
|
||||
pub(crate) fn get_path(&self, path: &str) -> reqwest::RequestBuilder {
|
||||
let Self {
|
||||
api_root, token, ..
|
||||
} = self;
|
||||
self.client
|
||||
.get(format!("{api_root}/{path}"))
|
||||
.header(reqwest::header::AUTHORIZATION, format!("Bearer {token}"))
|
||||
}
|
||||
|
||||
/// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the
|
||||
/// Authorization header set to the correct bearer auth value.
|
||||
pub(crate) fn post_path(&self, path: &str) -> reqwest::RequestBuilder {
|
||||
|
|
|
@ -45,7 +45,7 @@ pub struct CreateRecordsDetails {
|
|||
pub reasons: Vec<String>,
|
||||
}
|
||||
|
||||
impl<'a, T> CreateRecordsQuery<T>
|
||||
impl<T> CreateRecordsQuery<T>
|
||||
where
|
||||
T: Clone + DeserializeOwned + Serialize,
|
||||
{
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
pub mod cell_values;
|
||||
pub mod client;
|
||||
pub mod errors;
|
||||
mod pagination;
|
||||
pub mod types;
|
||||
|
||||
// Each API operation is organized into a dedicated Rust module.
|
||||
pub mod create_records;
|
||||
pub mod list_bases;
|
||||
pub mod list_records;
|
||||
|
||||
pub use client::Client;
|
||||
|
|
75
ferrtable/src/list_bases.rs
Normal file
75
ferrtable/src/list_bases.rs
Normal file
|
@ -0,0 +1,75 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
|
||||
use derive_builder::Builder;
|
||||
use futures::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::client::Client;
|
||||
use crate::errors::ExecutionError;
|
||||
use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated};
|
||||
|
||||
#[derive(Builder, Clone, Debug, Serialize)]
|
||||
#[builder(pattern = "owned", setter(prefix = "with"))]
|
||||
pub struct ListBasesQuery {
|
||||
#[serde(skip)]
|
||||
#[builder(vis = "pub(crate)")]
|
||||
client: Client,
|
||||
|
||||
/// To fetch the next page of records, include offset from the previous
|
||||
/// request in the next request's parameters.
|
||||
#[builder(default, private)]
|
||||
offset: Option<String>,
|
||||
}
|
||||
|
||||
impl PaginatedQuery<Base, ListBasesResponse> for ListBasesQuery {
|
||||
fn get_offset(&self) -> Option<String> {
|
||||
self.offset.clone()
|
||||
}
|
||||
|
||||
fn set_offset(&mut self, value: Option<String>) {
|
||||
self.offset = value
|
||||
}
|
||||
|
||||
fn get_req_builder(&self) -> reqwest::RequestBuilder {
|
||||
self.client
|
||||
.get_path("v0/meta/bases")
|
||||
.query(&[("offset", self.offset.clone())])
|
||||
}
|
||||
}
|
||||
|
||||
impl ListBasesQuery {
|
||||
pub fn stream_items(self) -> Pin<Box<impl Stream<Item = Result<Base, ExecutionError>>>> {
|
||||
execute_paginated::<Base, ListBasesResponse>(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
|
||||
pub struct Base {
|
||||
/// Base ID, a unique identifier for a base.
|
||||
pub id: String,
|
||||
|
||||
pub name: String,
|
||||
|
||||
#[serde(rename = "permissionLevel")]
|
||||
pub permission_level: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize)]
|
||||
pub struct ListBasesResponse {
|
||||
/// If there are more records, the response will contain an offset. Pass
|
||||
/// this offset into the next request to fetch the next page of records.
|
||||
offset: Option<String>,
|
||||
|
||||
bases: VecDeque<Base>,
|
||||
}
|
||||
|
||||
impl PaginatedResponse<Base> for ListBasesResponse {
|
||||
fn get_offset(&self) -> Option<String> {
|
||||
self.offset.clone()
|
||||
}
|
||||
|
||||
fn get_items(&self) -> VecDeque<Base> {
|
||||
self.bases.clone()
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use crate::client::Client;
|
||||
use crate::errors::ExecutionError;
|
||||
use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated};
|
||||
use crate::types::AirtableRecord;
|
||||
|
||||
#[derive(Builder, Clone, Debug, Serialize)]
|
||||
|
@ -27,21 +28,19 @@ pub struct ListRecordsQuery {
|
|||
#[builder(default)]
|
||||
fields: Option<Vec<String>>,
|
||||
|
||||
// filterByFormula is renamed so that the builder method, that is,
|
||||
// `.with_filter()`, reads more cleanly.
|
||||
/// A formula used to filter records. The formula will be evaluated for
|
||||
/// each record, and if the result is not 0, false, "", NaN, [], or #Error!
|
||||
/// the record will be included in the response.
|
||||
///
|
||||
/// If combined with the view parameter, only records in that view which
|
||||
/// satisfy the formula will be returned.
|
||||
#[serde(rename = "filterByFormula")]
|
||||
#[builder(default)]
|
||||
// filterByFormula is renamed so that the builder method, that is,
|
||||
// `.with_filter()`, reads more cleanly.
|
||||
#[serde(rename = "filterByFormula")]
|
||||
filter: Option<String>,
|
||||
|
||||
/// To fetch the next page of records, include offset from the previous
|
||||
/// request in the next request's parameters.
|
||||
#[builder(default, vis = "pub(crate)")]
|
||||
#[builder(default, private)]
|
||||
offset: Option<String>,
|
||||
|
||||
#[serde(rename = "pageSize")]
|
||||
|
@ -52,131 +51,56 @@ pub struct ListRecordsQuery {
|
|||
table_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize)]
|
||||
struct ListRecordsResponse<T>
|
||||
impl<T> PaginatedQuery<AirtableRecord<T>, ListRecordsResponse<T>> for ListRecordsQuery
|
||||
where
|
||||
T: Clone + Serialize,
|
||||
T: Clone + DeserializeOwned,
|
||||
{
|
||||
/// If there are more records, the response will contain an offset. Pass
|
||||
/// this offset into the next request to fetch the next page of records.
|
||||
offset: Option<String>,
|
||||
|
||||
records: VecDeque<AirtableRecord<T>>,
|
||||
fn get_offset(&self) -> Option<String> {
|
||||
self.offset.clone()
|
||||
}
|
||||
|
||||
// Acts similarly to a `?` operator, but for the result stream. Upon an error,
|
||||
// it short-circuit returns the error as the final item in the stream.
|
||||
macro_rules! handle_stream_err {
|
||||
($fallible:expr, state = $state:expr) => {
|
||||
match $fallible {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
return Some((
|
||||
Err(ExecutionError::from(err)),
|
||||
StreamState {
|
||||
buffered: VecDeque::new(),
|
||||
started: true,
|
||||
query: ListRecordsQuery {
|
||||
offset: None,
|
||||
..$state.query
|
||||
},
|
||||
},
|
||||
));
|
||||
fn set_offset(&mut self, value: Option<String>) {
|
||||
self.offset = value;
|
||||
}
|
||||
|
||||
fn get_req_builder(&self) -> reqwest::RequestBuilder {
|
||||
let base_id = utf8_percent_encode(&self.base_id, NON_ALPHANUMERIC).to_string();
|
||||
let table_id = utf8_percent_encode(&self.table_id, NON_ALPHANUMERIC).to_string();
|
||||
self.client
|
||||
.post_path(&format!("v0/{base_id}/{table_id}/listRecords",))
|
||||
.json(&self)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl ListRecordsQuery {
|
||||
/// Execute the API request.
|
||||
///
|
||||
/// Currently, failures return a one-size-fits-all error wrapping the
|
||||
/// underlying `reqwest::Error`. This may be improved in future releases
|
||||
/// to better differentiate between network, serialization, deserialization,
|
||||
/// and API errors.
|
||||
///
|
||||
/// Pagination is handled automatically, and items are returned as a
|
||||
/// seemingly continuous stream of Results. If an error is encountered while
|
||||
/// fetching a page, the Err will be yielded immediately and no further
|
||||
/// items will be returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// let mut rec_stream = client
|
||||
/// .list_records()
|
||||
/// .with_base_id("***")
|
||||
/// .with_table_id("***")
|
||||
/// .build()
|
||||
/// .unwrap()
|
||||
/// .execute::<HashMap<String, String>>()
|
||||
///
|
||||
/// while let Some(result) = rec_stream.next().await {
|
||||
/// let rec = result.unwrap();
|
||||
/// dbg!(rec.fields);
|
||||
/// }
|
||||
/// ```
|
||||
pub fn execute<T>(
|
||||
pub fn stream_items<T>(
|
||||
self,
|
||||
) -> Pin<Box<impl Stream<Item = Result<AirtableRecord<T>, ExecutionError>>>>
|
||||
where
|
||||
T: Clone + DeserializeOwned + Serialize + Unpin,
|
||||
T: Clone + DeserializeOwned,
|
||||
{
|
||||
struct StreamState<T>
|
||||
where
|
||||
T: Clone + Serialize + Unpin,
|
||||
{
|
||||
buffered: VecDeque<AirtableRecord<T>>,
|
||||
query: ListRecordsQuery,
|
||||
started: bool,
|
||||
execute_paginated::<AirtableRecord<T>, ListRecordsResponse<T>>(self)
|
||||
}
|
||||
}
|
||||
|
||||
// Stream has to be pinned to the heap so that the closure inside
|
||||
// doesn't need to implement Unpin (which I don't think it can).
|
||||
Box::pin(futures::stream::unfold(
|
||||
StreamState {
|
||||
buffered: VecDeque::new(),
|
||||
query: self.clone(),
|
||||
started: false,
|
||||
},
|
||||
|mut state: StreamState<T>| async move {
|
||||
if let Some(value) = state.buffered.pop_front() {
|
||||
// Iterate through a pre-loaded page.
|
||||
return Some((Ok(value), state));
|
||||
#[derive(Clone, Deserialize)]
|
||||
pub struct ListRecordsResponse<T>
|
||||
where
|
||||
T: Clone,
|
||||
{
|
||||
offset: Option<String>,
|
||||
records: VecDeque<AirtableRecord<T>>,
|
||||
}
|
||||
if state.query.offset.is_some() || !state.started {
|
||||
// Fetch the next page.
|
||||
state.started = true;
|
||||
let base_id =
|
||||
utf8_percent_encode(&state.query.base_id, NON_ALPHANUMERIC).to_string();
|
||||
let table_id =
|
||||
utf8_percent_encode(&state.query.table_id, NON_ALPHANUMERIC).to_string();
|
||||
let http_resp = handle_stream_err!(
|
||||
handle_stream_err!(
|
||||
state
|
||||
.query
|
||||
.client
|
||||
.post_path(&format!("v0/{base_id}/{table_id}/listRecords",))
|
||||
.json(&state.query)
|
||||
.send()
|
||||
.await,
|
||||
state = state
|
||||
)
|
||||
.error_for_status(),
|
||||
state = state
|
||||
);
|
||||
let deserialized_resp: ListRecordsResponse<T> =
|
||||
handle_stream_err!(http_resp.json().await, state = state);
|
||||
state.buffered = deserialized_resp.records;
|
||||
state.query.offset = deserialized_resp.offset;
|
||||
if let Some(value) = state.buffered.pop_front() {
|
||||
// Yield the first item from the newly fetched page.
|
||||
return Some((Ok(value), state));
|
||||
}
|
||||
}
|
||||
// No more items buffered and no subsequent page to fetch.
|
||||
None
|
||||
},
|
||||
))
|
||||
|
||||
impl<T> PaginatedResponse<AirtableRecord<T>> for ListRecordsResponse<T>
|
||||
where
|
||||
T: Clone + DeserializeOwned,
|
||||
{
|
||||
fn get_offset(&self) -> Option<String> {
|
||||
self.offset.clone()
|
||||
}
|
||||
|
||||
fn get_items(&self) -> VecDeque<AirtableRecord<T>> {
|
||||
self.records.clone()
|
||||
}
|
||||
}
|
||||
|
|
109
ferrtable/src/pagination.rs
Normal file
109
ferrtable/src/pagination.rs
Normal file
|
@ -0,0 +1,109 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::prelude::*;
|
||||
use reqwest::RequestBuilder;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use crate::errors::ExecutionError;
|
||||
|
||||
/// An Airtable API request type with cursor-based pagination (in Airtable API
|
||||
/// parlance, "offset" pagination).
|
||||
pub(crate) trait PaginatedQuery<T, R>: Clone
|
||||
where
|
||||
T: Clone,
|
||||
R: PaginatedResponse<T>,
|
||||
{
|
||||
// TODO: docs
|
||||
fn get_offset(&self) -> Option<String>;
|
||||
|
||||
fn set_offset(&mut self, value: Option<String>);
|
||||
|
||||
fn get_req_builder(&self) -> RequestBuilder;
|
||||
}
|
||||
|
||||
pub(crate) trait PaginatedResponse<T>: Clone + DeserializeOwned
|
||||
where
|
||||
T: Clone,
|
||||
{
|
||||
fn get_offset(&self) -> Option<String>;
|
||||
|
||||
fn get_items(&self) -> VecDeque<T>;
|
||||
}
|
||||
|
||||
struct StreamState<Q, T>
|
||||
where
|
||||
Q: Clone,
|
||||
T: Clone,
|
||||
{
|
||||
buffered: VecDeque<T>,
|
||||
query: Q,
|
||||
started: bool,
|
||||
}
|
||||
|
||||
/// Acts similarly to a `?` operator, but for the result stream. Upon an error,
|
||||
/// it short-circuit returns the error as the final item in the stream.
|
||||
macro_rules! handle_stream_err {
|
||||
($fallible:expr, state = $state:expr) => {
|
||||
match $fallible {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
$state.query.set_offset(None);
|
||||
return Some((
|
||||
Err(ExecutionError::from(err)),
|
||||
StreamState {
|
||||
buffered: VecDeque::new(),
|
||||
started: true,
|
||||
query: $state.query,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// This could be brought into PaginatedQuery as a default implementation, but
|
||||
// that forces that the traits in this module be exposed outside of the crate
|
||||
// and additionally results in worse client ergonomics overall.
|
||||
pub(crate) fn execute_paginated<T, R>(
|
||||
query: impl PaginatedQuery<T, R>,
|
||||
) -> Pin<Box<impl Stream<Item = Result<T, ExecutionError>>>>
|
||||
where
|
||||
T: Clone,
|
||||
R: PaginatedResponse<T>,
|
||||
{
|
||||
// Stream has to be pinned to the heap so that the closure inside
|
||||
// doesn't need to implement Unpin (which I don't think it can).
|
||||
Box::pin(futures::stream::unfold(
|
||||
StreamState {
|
||||
buffered: VecDeque::new(),
|
||||
query,
|
||||
started: false,
|
||||
},
|
||||
|mut state| async move {
|
||||
if let Some(value) = state.buffered.pop_front() {
|
||||
// Iterate through a pre-loaded page.
|
||||
return Some((Ok(value), state));
|
||||
}
|
||||
if state.query.get_offset().is_some() || !state.started {
|
||||
// Fetch the next page.
|
||||
state.started = true;
|
||||
let http_resp = handle_stream_err!(
|
||||
handle_stream_err!(state.query.get_req_builder().send().await, state = state)
|
||||
.error_for_status(),
|
||||
state = state
|
||||
);
|
||||
let deserialized_resp: R =
|
||||
handle_stream_err!(http_resp.json().await, state = state);
|
||||
state.buffered = deserialized_resp.get_items();
|
||||
state.query.set_offset(deserialized_resp.get_offset());
|
||||
if let Some(value) = state.buffered.pop_front() {
|
||||
// Yield the first item from the newly fetched page.
|
||||
return Some((Ok(value), state));
|
||||
}
|
||||
}
|
||||
// No more items buffered and no subsequent page to fetch.
|
||||
None
|
||||
},
|
||||
))
|
||||
}
|
Loading…
Add table
Reference in a new issue