diff --git a/ferrtable-test/src/main.rs b/ferrtable-test/src/main.rs index 5c19d8b..40774d8 100644 --- a/ferrtable-test/src/main.rs +++ b/ferrtable-test/src/main.rs @@ -29,6 +29,12 @@ async fn main() { let settings = Settings::load().unwrap(); let client = Client::new_from_access_token(&settings.access_token).unwrap(); + println!("Testing Client::list_bases()..."); + let mut bases = client.list_bases().build().unwrap().stream_items(); + while let Some(res) = bases.next().await { + dbg!(res.unwrap()); + } + println!("Testing Client::create_records()..."); client .create_records([TestRecord { diff --git a/ferrtable/src/client.rs b/ferrtable/src/client.rs index dd7ef85..310f3c9 100644 --- a/ferrtable/src/client.rs +++ b/ferrtable/src/client.rs @@ -2,7 +2,10 @@ use std::fmt::Debug; use serde::Serialize; -use crate::{create_records::CreateRecordsQueryBuilder, list_records::ListRecordsQueryBuilder}; +use crate::{ + create_records::CreateRecordsQueryBuilder, list_bases::ListBasesQueryBuilder, + list_records::ListRecordsQueryBuilder, +}; const DEFAULT_API_ROOT: &str = "https://api.airtable.com"; @@ -82,6 +85,29 @@ impl Client { .with_records(records.into_iter().collect()) } + /// List the bases the token can access + /// + /// # Examples + /// + /// ## Consuming as Stream + /// + /// ```no_run + /// use futures::prelude::*; + /// + /// let mut base_stream = client + /// .list_bases() + /// .build() + /// .unwrap() + /// .stream_items(); + /// + /// while let Some(result) = base_stream.next().await { + /// dbg!(result.unwrap()); + /// } + /// ``` + pub fn list_bases(&self) -> ListBasesQueryBuilder { + ListBasesQueryBuilder::default().with_client(self.clone()) + } + /// List records in a table. Note that table names and table ids can be used /// interchangeably. We recommend using table IDs so you don't need to modify /// your API request when your table name changes. @@ -108,6 +134,17 @@ impl Client { ListRecordsQueryBuilder::default().with_client(self.clone()) } + /// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the + /// Authorization header set to the correct bearer auth value. + pub(crate) fn get_path(&self, path: &str) -> reqwest::RequestBuilder { + let Self { + api_root, token, .. + } = self; + self.client + .get(format!("{api_root}/{path}")) + .header(reqwest::header::AUTHORIZATION, format!("Bearer {token}")) + } + /// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the /// Authorization header set to the correct bearer auth value. pub(crate) fn post_path(&self, path: &str) -> reqwest::RequestBuilder { diff --git a/ferrtable/src/lib.rs b/ferrtable/src/lib.rs index b5628bd..2298062 100644 --- a/ferrtable/src/lib.rs +++ b/ferrtable/src/lib.rs @@ -1,10 +1,12 @@ pub mod cell_values; pub mod client; pub mod errors; +mod pagination; pub mod types; // Each API operation is organized into a dedicated Rust module. pub mod create_records; +pub mod list_bases; pub mod list_records; pub use client::Client; diff --git a/ferrtable/src/list_bases.rs b/ferrtable/src/list_bases.rs new file mode 100644 index 0000000..e56cee2 --- /dev/null +++ b/ferrtable/src/list_bases.rs @@ -0,0 +1,75 @@ +use std::collections::VecDeque; +use std::pin::Pin; + +use derive_builder::Builder; +use futures::prelude::*; +use serde::{Deserialize, Serialize}; + +use crate::client::Client; +use crate::errors::ExecutionError; +use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated}; + +#[derive(Builder, Clone, Debug, Serialize)] +#[builder(pattern = "owned", setter(prefix = "with"))] +pub struct ListBasesQuery { + #[serde(skip)] + #[builder(vis = "pub(crate)")] + client: Client, + + /// To fetch the next page of records, include offset from the previous + /// request in the next request's parameters. + #[builder(default, private)] + offset: Option, +} + +impl PaginatedQuery for ListBasesQuery { + fn get_offset(&self) -> Option { + self.offset.clone() + } + + fn set_offset(&mut self, value: Option) { + self.offset = value + } + + fn get_req_builder(&self) -> reqwest::RequestBuilder { + self.client + .get_path("v0/meta/bases") + .query(&[("offset", self.offset.clone())]) + } +} + +impl ListBasesQuery { + pub fn stream_items(self) -> Pin>>> { + execute_paginated::(self) + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct Base { + /// Base ID, a unique identifier for a base. + pub id: String, + + pub name: String, + + #[serde(rename = "permissionLevel")] + pub permission_level: String, +} + +#[derive(Clone, Deserialize)] +pub struct ListBasesResponse { + /// If there are more records, the response will contain an offset. Pass + /// this offset into the next request to fetch the next page of records. + offset: Option, + + bases: VecDeque, +} + +impl PaginatedResponse for ListBasesResponse { + fn get_offset(&self) -> Option { + self.offset.clone() + } + + fn get_items(&self) -> VecDeque { + self.bases.clone() + } +} diff --git a/ferrtable/src/list_records.rs b/ferrtable/src/list_records.rs index 6e70779..65b0657 100644 --- a/ferrtable/src/list_records.rs +++ b/ferrtable/src/list_records.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::client::Client; use crate::errors::ExecutionError; +use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated}; use crate::types::AirtableRecord; #[derive(Builder, Clone, Debug, Serialize)] @@ -27,21 +28,19 @@ pub struct ListRecordsQuery { #[builder(default)] fields: Option>, - // filterByFormula is renamed so that the builder method, that is, - // `.with_filter()`, reads more cleanly. /// A formula used to filter records. The formula will be evaluated for /// each record, and if the result is not 0, false, "", NaN, [], or #Error! /// the record will be included in the response. /// /// If combined with the view parameter, only records in that view which /// satisfy the formula will be returned. - #[serde(rename = "filterByFormula")] #[builder(default)] + // filterByFormula is renamed so that the builder method, that is, + // `.with_filter()`, reads more cleanly. + #[serde(rename = "filterByFormula")] filter: Option, - /// To fetch the next page of records, include offset from the previous - /// request in the next request's parameters. - #[builder(default, vis = "pub(crate)")] + #[builder(default, private)] offset: Option, #[serde(rename = "pageSize")] @@ -52,39 +51,25 @@ pub struct ListRecordsQuery { table_id: String, } -#[derive(Clone, Deserialize)] -struct ListRecordsResponse +impl PaginatedQuery, ListRecordsResponse> for ListRecordsQuery where - T: Clone + Serialize, + T: Clone + DeserializeOwned, { - /// If there are more records, the response will contain an offset. Pass - /// this offset into the next request to fetch the next page of records. - offset: Option, + fn get_offset(&self) -> Option { + self.offset.clone() + } - records: VecDeque>, -} + fn set_offset(&mut self, value: Option) { + self.offset = value; + } -// Acts similarly to a `?` operator, but for the result stream. Upon an error, -// it short-circuit returns the error as the final item in the stream. -macro_rules! handle_stream_err { - ($fallible:expr, state = $state:expr) => { - match $fallible { - Ok(value) => value, - Err(err) => { - return Some(( - Err(ExecutionError::from(err)), - StreamState { - buffered: VecDeque::new(), - started: true, - query: ListRecordsQuery { - offset: None, - ..$state.query - }, - }, - )); - } - } - }; + fn get_req_builder(&self) -> reqwest::RequestBuilder { + let base_id = utf8_percent_encode(&self.base_id, NON_ALPHANUMERIC).to_string(); + let table_id = utf8_percent_encode(&self.table_id, NON_ALPHANUMERIC).to_string(); + self.client + .post_path(&format!("v0/{base_id}/{table_id}/listRecords",)) + .json(&self) + } } impl ListRecordsQuery { @@ -92,63 +77,30 @@ impl ListRecordsQuery { self, ) -> Pin, ExecutionError>>>> where - T: Clone + DeserializeOwned + Serialize + Unpin, + T: Clone + DeserializeOwned, { - struct StreamState - where - T: Clone + Serialize + Unpin, - { - buffered: VecDeque>, - query: ListRecordsQuery, - started: bool, - } - - // Stream has to be pinned to the heap so that the closure inside - // doesn't need to implement Unpin (which I don't think it can). - Box::pin(futures::stream::unfold( - StreamState { - buffered: VecDeque::new(), - query: self.clone(), - started: false, - }, - |mut state: StreamState| async move { - if let Some(value) = state.buffered.pop_front() { - // Iterate through a pre-loaded page. - return Some((Ok(value), state)); - } - if state.query.offset.is_some() || !state.started { - // Fetch the next page. - state.started = true; - let base_id = - utf8_percent_encode(&state.query.base_id, NON_ALPHANUMERIC).to_string(); - let table_id = - utf8_percent_encode(&state.query.table_id, NON_ALPHANUMERIC).to_string(); - let http_resp = handle_stream_err!( - handle_stream_err!( - state - .query - .client - .post_path(&format!("v0/{base_id}/{table_id}/listRecords",)) - .json(&state.query) - .send() - .await, - state = state - ) - .error_for_status(), - state = state - ); - let deserialized_resp: ListRecordsResponse = - handle_stream_err!(http_resp.json().await, state = state); - state.buffered = deserialized_resp.records; - state.query.offset = deserialized_resp.offset; - if let Some(value) = state.buffered.pop_front() { - // Yield the first item from the newly fetched page. - return Some((Ok(value), state)); - } - } - // No more items buffered and no subsequent page to fetch. - None - }, - )) + execute_paginated::, ListRecordsResponse>(self) + } +} + +#[derive(Clone, Deserialize)] +pub struct ListRecordsResponse +where + T: Clone, +{ + offset: Option, + records: VecDeque>, +} + +impl PaginatedResponse> for ListRecordsResponse +where + T: Clone + DeserializeOwned, +{ + fn get_offset(&self) -> Option { + self.offset.clone() + } + + fn get_items(&self) -> VecDeque> { + self.records.clone() } } diff --git a/ferrtable/src/pagination.rs b/ferrtable/src/pagination.rs new file mode 100644 index 0000000..6055da0 --- /dev/null +++ b/ferrtable/src/pagination.rs @@ -0,0 +1,109 @@ +use std::collections::VecDeque; +use std::pin::Pin; + +use futures::prelude::*; +use reqwest::RequestBuilder; +use serde::de::DeserializeOwned; + +use crate::errors::ExecutionError; + +/// An Airtable API request type with cursor-based pagination (in Airtable API +/// parlance, "offset" pagination). +pub(crate) trait PaginatedQuery: Clone +where + T: Clone, + R: PaginatedResponse, +{ + // TODO: docs + fn get_offset(&self) -> Option; + + fn set_offset(&mut self, value: Option); + + fn get_req_builder(&self) -> RequestBuilder; +} + +pub(crate) trait PaginatedResponse: Clone + DeserializeOwned +where + T: Clone, +{ + fn get_offset(&self) -> Option; + + fn get_items(&self) -> VecDeque; +} + +struct StreamState +where + Q: Clone, + T: Clone, +{ + buffered: VecDeque, + query: Q, + started: bool, +} + +/// Acts similarly to a `?` operator, but for the result stream. Upon an error, +/// it short-circuit returns the error as the final item in the stream. +macro_rules! handle_stream_err { + ($fallible:expr, state = $state:expr) => { + match $fallible { + Ok(value) => value, + Err(err) => { + $state.query.set_offset(None); + return Some(( + Err(ExecutionError::from(err)), + StreamState { + buffered: VecDeque::new(), + started: true, + query: $state.query, + }, + )); + } + } + }; +} + +// This could be brought into PaginatedQuery as a default implementation, but +// that forces that the traits in this module be exposed outside of the crate +// and additionally results in worse client ergonomics overall. +pub(crate) fn execute_paginated( + query: impl PaginatedQuery, +) -> Pin>>> +where + T: Clone, + R: PaginatedResponse, +{ + // Stream has to be pinned to the heap so that the closure inside + // doesn't need to implement Unpin (which I don't think it can). + Box::pin(futures::stream::unfold( + StreamState { + buffered: VecDeque::new(), + query, + started: false, + }, + |mut state| async move { + if let Some(value) = state.buffered.pop_front() { + // Iterate through a pre-loaded page. + return Some((Ok(value), state)); + } + if state.query.get_offset().is_some() || !state.started { + // Fetch the next page. + state.started = true; + let http_resp = handle_stream_err!( + handle_stream_err!(state.query.get_req_builder().send().await, state = state) + .error_for_status(), + state = state + ); + let deserialized_resp: R = + handle_stream_err!(http_resp.json().await, state = state); + state.buffered = deserialized_resp.get_items(); + state.query.set_offset(deserialized_resp.get_offset()); + if let Some(value) = state.buffered.pop_front() { + // Yield the first item from the newly fetched page. + return Some((Ok(value), state)); + } + } + // No more items buffered and no subsequent page to fetch. + None + }, + )) +}