add list_bases()

includes refactor of pagination logic into a dedicated module
This commit is contained in:
Brent Schroeter 2025-09-16 19:43:50 -07:00
parent 2117d039b7
commit 751e88fdc4
6 changed files with 274 additions and 93 deletions

View file

@ -29,6 +29,12 @@ async fn main() {
let settings = Settings::load().unwrap();
let client = Client::new_from_access_token(&settings.access_token).unwrap();
println!("Testing Client::list_bases()...");
let mut bases = client.list_bases().build().unwrap().stream_items();
while let Some(res) = bases.next().await {
dbg!(res.unwrap());
}
println!("Testing Client::create_records()...");
client
.create_records([TestRecord {

View file

@ -2,7 +2,10 @@ use std::fmt::Debug;
use serde::Serialize;
use crate::{create_records::CreateRecordsQueryBuilder, list_records::ListRecordsQueryBuilder};
use crate::{
create_records::CreateRecordsQueryBuilder, list_bases::ListBasesQueryBuilder,
list_records::ListRecordsQueryBuilder,
};
const DEFAULT_API_ROOT: &str = "https://api.airtable.com";
@ -82,6 +85,29 @@ impl Client {
.with_records(records.into_iter().collect())
}
/// List the bases the token can access
///
/// # Examples
///
/// ## Consuming as Stream
///
/// ```no_run
/// use futures::prelude::*;
///
/// let mut base_stream = client
/// .list_bases()
/// .build()
/// .unwrap()
/// .stream_items();
///
/// while let Some(result) = base_stream.next().await {
/// dbg!(result.unwrap());
/// }
/// ```
pub fn list_bases(&self) -> ListBasesQueryBuilder {
ListBasesQueryBuilder::default().with_client(self.clone())
}
/// List records in a table. Note that table names and table ids can be used
/// interchangeably. We recommend using table IDs so you don't need to modify
/// your API request when your table name changes.
@ -108,6 +134,17 @@ impl Client {
ListRecordsQueryBuilder::default().with_client(self.clone())
}
/// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the
/// Authorization header set to the correct bearer auth value.
pub(crate) fn get_path(&self, path: &str) -> reqwest::RequestBuilder {
let Self {
api_root, token, ..
} = self;
self.client
.get(format!("{api_root}/{path}"))
.header(reqwest::header::AUTHORIZATION, format!("Bearer {token}"))
}
/// Constructs a RequestBuilder with URL "{self.api_root}/{path}" and the
/// Authorization header set to the correct bearer auth value.
pub(crate) fn post_path(&self, path: &str) -> reqwest::RequestBuilder {

View file

@ -1,10 +1,12 @@
pub mod cell_values;
pub mod client;
pub mod errors;
mod pagination;
pub mod types;
// Each API operation is organized into a dedicated Rust module.
pub mod create_records;
pub mod list_bases;
pub mod list_records;
pub use client::Client;

View file

@ -0,0 +1,75 @@
use std::collections::VecDeque;
use std::pin::Pin;
use derive_builder::Builder;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::errors::ExecutionError;
use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated};
#[derive(Builder, Clone, Debug, Serialize)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct ListBasesQuery {
#[serde(skip)]
#[builder(vis = "pub(crate)")]
client: Client,
/// To fetch the next page of records, include offset from the previous
/// request in the next request's parameters.
#[builder(default, private)]
offset: Option<String>,
}
impl PaginatedQuery<Base, ListBasesResponse> for ListBasesQuery {
fn get_offset(&self) -> Option<String> {
self.offset.clone()
}
fn set_offset(&mut self, value: Option<String>) {
self.offset = value
}
fn get_req_builder(&self) -> reqwest::RequestBuilder {
self.client
.get_path("v0/meta/bases")
.query(&[("offset", self.offset.clone())])
}
}
impl ListBasesQuery {
pub fn stream_items(self) -> Pin<Box<impl Stream<Item = Result<Base, ExecutionError>>>> {
execute_paginated::<Base, ListBasesResponse>(self)
}
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Base {
/// Base ID, a unique identifier for a base.
pub id: String,
pub name: String,
#[serde(rename = "permissionLevel")]
pub permission_level: String,
}
#[derive(Clone, Deserialize)]
pub struct ListBasesResponse {
/// If there are more records, the response will contain an offset. Pass
/// this offset into the next request to fetch the next page of records.
offset: Option<String>,
bases: VecDeque<Base>,
}
impl PaginatedResponse<Base> for ListBasesResponse {
fn get_offset(&self) -> Option<String> {
self.offset.clone()
}
fn get_items(&self) -> VecDeque<Base> {
self.bases.clone()
}
}

View file

@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::errors::ExecutionError;
use crate::pagination::{PaginatedQuery, PaginatedResponse, execute_paginated};
use crate::types::AirtableRecord;
#[derive(Builder, Clone, Debug, Serialize)]
@ -27,21 +28,19 @@ pub struct ListRecordsQuery {
#[builder(default)]
fields: Option<Vec<String>>,
// filterByFormula is renamed so that the builder method, that is,
// `.with_filter()`, reads more cleanly.
/// A formula used to filter records. The formula will be evaluated for
/// each record, and if the result is not 0, false, "", NaN, [], or #Error!
/// the record will be included in the response.
///
/// If combined with the view parameter, only records in that view which
/// satisfy the formula will be returned.
#[serde(rename = "filterByFormula")]
#[builder(default)]
// filterByFormula is renamed so that the builder method, that is,
// `.with_filter()`, reads more cleanly.
#[serde(rename = "filterByFormula")]
filter: Option<String>,
/// To fetch the next page of records, include offset from the previous
/// request in the next request's parameters.
#[builder(default, vis = "pub(crate)")]
#[builder(default, private)]
offset: Option<String>,
#[serde(rename = "pageSize")]
@ -52,39 +51,25 @@ pub struct ListRecordsQuery {
table_id: String,
}
#[derive(Clone, Deserialize)]
struct ListRecordsResponse<T>
impl<T> PaginatedQuery<AirtableRecord<T>, ListRecordsResponse<T>> for ListRecordsQuery
where
T: Clone + Serialize,
T: Clone + DeserializeOwned,
{
/// If there are more records, the response will contain an offset. Pass
/// this offset into the next request to fetch the next page of records.
offset: Option<String>,
fn get_offset(&self) -> Option<String> {
self.offset.clone()
}
records: VecDeque<AirtableRecord<T>>,
}
fn set_offset(&mut self, value: Option<String>) {
self.offset = value;
}
// Acts similarly to a `?` operator, but for the result stream. Upon an error,
// it short-circuit returns the error as the final item in the stream.
macro_rules! handle_stream_err {
($fallible:expr, state = $state:expr) => {
match $fallible {
Ok(value) => value,
Err(err) => {
return Some((
Err(ExecutionError::from(err)),
StreamState {
buffered: VecDeque::new(),
started: true,
query: ListRecordsQuery {
offset: None,
..$state.query
},
},
));
}
}
};
fn get_req_builder(&self) -> reqwest::RequestBuilder {
let base_id = utf8_percent_encode(&self.base_id, NON_ALPHANUMERIC).to_string();
let table_id = utf8_percent_encode(&self.table_id, NON_ALPHANUMERIC).to_string();
self.client
.post_path(&format!("v0/{base_id}/{table_id}/listRecords",))
.json(&self)
}
}
impl ListRecordsQuery {
@ -92,63 +77,30 @@ impl ListRecordsQuery {
self,
) -> Pin<Box<impl Stream<Item = Result<AirtableRecord<T>, ExecutionError>>>>
where
T: Clone + DeserializeOwned + Serialize + Unpin,
T: Clone + DeserializeOwned,
{
struct StreamState<T>
where
T: Clone + Serialize + Unpin,
{
buffered: VecDeque<AirtableRecord<T>>,
query: ListRecordsQuery,
started: bool,
}
// Stream has to be pinned to the heap so that the closure inside
// doesn't need to implement Unpin (which I don't think it can).
Box::pin(futures::stream::unfold(
StreamState {
buffered: VecDeque::new(),
query: self.clone(),
started: false,
},
|mut state: StreamState<T>| async move {
if let Some(value) = state.buffered.pop_front() {
// Iterate through a pre-loaded page.
return Some((Ok(value), state));
}
if state.query.offset.is_some() || !state.started {
// Fetch the next page.
state.started = true;
let base_id =
utf8_percent_encode(&state.query.base_id, NON_ALPHANUMERIC).to_string();
let table_id =
utf8_percent_encode(&state.query.table_id, NON_ALPHANUMERIC).to_string();
let http_resp = handle_stream_err!(
handle_stream_err!(
state
.query
.client
.post_path(&format!("v0/{base_id}/{table_id}/listRecords",))
.json(&state.query)
.send()
.await,
state = state
)
.error_for_status(),
state = state
);
let deserialized_resp: ListRecordsResponse<T> =
handle_stream_err!(http_resp.json().await, state = state);
state.buffered = deserialized_resp.records;
state.query.offset = deserialized_resp.offset;
if let Some(value) = state.buffered.pop_front() {
// Yield the first item from the newly fetched page.
return Some((Ok(value), state));
}
}
// No more items buffered and no subsequent page to fetch.
None
},
))
execute_paginated::<AirtableRecord<T>, ListRecordsResponse<T>>(self)
}
}
#[derive(Clone, Deserialize)]
pub struct ListRecordsResponse<T>
where
T: Clone,
{
offset: Option<String>,
records: VecDeque<AirtableRecord<T>>,
}
impl<T> PaginatedResponse<AirtableRecord<T>> for ListRecordsResponse<T>
where
T: Clone + DeserializeOwned,
{
fn get_offset(&self) -> Option<String> {
self.offset.clone()
}
fn get_items(&self) -> VecDeque<AirtableRecord<T>> {
self.records.clone()
}
}

109
ferrtable/src/pagination.rs Normal file
View file

@ -0,0 +1,109 @@
use std::collections::VecDeque;
use std::pin::Pin;
use futures::prelude::*;
use reqwest::RequestBuilder;
use serde::de::DeserializeOwned;
use crate::errors::ExecutionError;
/// An Airtable API request type with cursor-based pagination (in Airtable API
/// parlance, "offset" pagination).
pub(crate) trait PaginatedQuery<T, R>: Clone
where
T: Clone,
R: PaginatedResponse<T>,
{
// TODO: docs
fn get_offset(&self) -> Option<String>;
fn set_offset(&mut self, value: Option<String>);
fn get_req_builder(&self) -> RequestBuilder;
}
pub(crate) trait PaginatedResponse<T>: Clone + DeserializeOwned
where
T: Clone,
{
fn get_offset(&self) -> Option<String>;
fn get_items(&self) -> VecDeque<T>;
}
struct StreamState<Q, T>
where
Q: Clone,
T: Clone,
{
buffered: VecDeque<T>,
query: Q,
started: bool,
}
/// Acts similarly to a `?` operator, but for the result stream. Upon an error,
/// it short-circuit returns the error as the final item in the stream.
macro_rules! handle_stream_err {
($fallible:expr, state = $state:expr) => {
match $fallible {
Ok(value) => value,
Err(err) => {
$state.query.set_offset(None);
return Some((
Err(ExecutionError::from(err)),
StreamState {
buffered: VecDeque::new(),
started: true,
query: $state.query,
},
));
}
}
};
}
// This could be brought into PaginatedQuery as a default implementation, but
// that forces that the traits in this module be exposed outside of the crate
// and additionally results in worse client ergonomics overall.
pub(crate) fn execute_paginated<T, R>(
query: impl PaginatedQuery<T, R>,
) -> Pin<Box<impl Stream<Item = Result<T, ExecutionError>>>>
where
T: Clone,
R: PaginatedResponse<T>,
{
// Stream has to be pinned to the heap so that the closure inside
// doesn't need to implement Unpin (which I don't think it can).
Box::pin(futures::stream::unfold(
StreamState {
buffered: VecDeque::new(),
query,
started: false,
},
|mut state| async move {
if let Some(value) = state.buffered.pop_front() {
// Iterate through a pre-loaded page.
return Some((Ok(value), state));
}
if state.query.get_offset().is_some() || !state.started {
// Fetch the next page.
state.started = true;
let http_resp = handle_stream_err!(
handle_stream_err!(state.query.get_req_builder().send().await, state = state)
.error_for_status(),
state = state
);
let deserialized_resp: R =
handle_stream_err!(http_resp.json().await, state = state);
state.buffered = deserialized_resp.get_items();
state.query.set_offset(deserialized_resp.get_offset());
if let Some(value) = state.buffered.pop_front() {
// Yield the first item from the newly fetched page.
return Some((Ok(value), state));
}
}
// No more items buffered and no subsequent page to fetch.
None
},
))
}