diff --git a/src/index.ts b/src/index.ts index 90b8bd4..6199d10 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,137 +1,2 @@ -/** - * Simple queue that executes an asynchronous handler on data in order. - */ -export class AsyncTaskQueue { - private _queue: T[] = []; - private readonly _handler: (items: T[]) => Promise; - private _errorHandler: (err: unknown) => unknown = console.error; - private _batchSize = 1; - private _throttleIntervalMs = 0; - private _processing: boolean = false; - - constructor(handler: (items: T[]) => Promise) { - this._handler = handler; - } - - /** - * Replaces the error handler for this queue. The error handler is called - * whenever the task handler throws. - * - * This method updates the queue in place and returns it, so method calls may - * be chained. - */ - onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue { - this._errorHandler = errorHandler; - return this; - } - - /** - * Sets batch size for this queue. Setting this value greater than 1 allows - * the queue to pass up to n items to the task handler each time it is called. - * - * This method updates the queue in place and returns it, so method calls may - * be chained. - */ - batchSize(batchSize: number): AsyncTaskQueue { - if (batchSize < 1) { - throw new Error("Batch size must be at least 1."); - } - this._batchSize = batchSize; - return this; - } - - /** - * Sets throttle interval for this queue. Setting this longer than 0 - * milliseconds will cause the queue to call the task handler at most once - * every n milliseconds. - * - * This method updates the queue in place and returns it, so method calls may - * be chained. - */ - throttleMs(intervalMs: number): AsyncTaskQueue { - if (intervalMs < 0) { - throw new Error("Interval must be zero or more milliseconds."); - } - this._throttleIntervalMs = intervalMs; - return this; - } - - /** - * Adds an item to the back of the queue. - */ - push(item: T): void { - this._queue.push(item); - if (!this._processing) { - this._processAll().catch(console.error); - } - } - - /** - * Returns all unprocessed items and removes them from the queue. - */ - drain(): T[] { - const items = this._queue; - this._queue = []; - return items; - } - - /** - * Returns the number of unprocessed items in the queue. - */ - size(): number { - return this._queue.length; - } - - private async _processAll(): Promise { - // Control loop is very simple: a `while` loop in a single async "thread", - // which runs until the queue is empty and then waits for - // `this._processAll()` to be called again. Because the execution - // environment is single-threaded, the `this._processing` property is a - // sufficient stand-in for a mutex. - if (this._processing) { - throw new Error( - "Assertion error: attq should never process queue concurrently.", - ); - } - this._processing = true; - - while (this._queue.length > 0) { - const throttleIntervalMs = this._throttleIntervalMs; - const throttleTimeout = throttleIntervalMs - ? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs)) - : Promise.resolve(); - - const items = this._queue.splice(0, this._batchSize); - await this._handler(items).catch(this._errorHandler); - - await throttleTimeout; - } - - this._processing = false; - } -} - -/** - * Wraps an arbitrary async function with retry logic for exceptions. - */ -export function withRetry( - fn: (...args: T) => Promise, - { attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: { - attempts: number; - backoffMs(attempt: number): number; - }, -): (...args: T) => Promise { - return async (...args: T) => { - for (let attempt = 0; attempt < attempts - 1; attempt += 1) { - try { - return await fn(...args); - } catch { - const delay = backoffMs(attempt); - await new Promise((resolve) => setTimeout(resolve, delay)); - } - } - // Preferably run the final attempt without a try/catch block so that thrown - // exceptions get a clean stack trace. - return await fn(...args); - }; -} +export { AsyncTaskQueue } from "./task-queue"; +export { withRetry } from "./retries"; diff --git a/src/retries.ts b/src/retries.ts new file mode 100644 index 0000000..07c9a6c --- /dev/null +++ b/src/retries.ts @@ -0,0 +1,24 @@ +/** + * Wraps an arbitrary async function with retry logic for exceptions. + */ +export function withRetry( + fn: (...args: T) => Promise, + { attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: { + attempts: number; + backoffMs(attempt: number): number; + }, +): (...args: T) => Promise { + return async (...args: T) => { + for (let attempt = 0; attempt < attempts - 1; attempt += 1) { + try { + return await fn(...args); + } catch { + const delay = backoffMs(attempt); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + // Preferably run the final attempt without a try/catch block so that thrown + // exceptions get a clean stack trace. + return await fn(...args); + }; +} diff --git a/src/index.test.ts b/src/task-queue.test.ts similarity index 100% rename from src/index.test.ts rename to src/task-queue.test.ts diff --git a/src/task-queue.ts b/src/task-queue.ts new file mode 100644 index 0000000..deaf76f --- /dev/null +++ b/src/task-queue.ts @@ -0,0 +1,112 @@ +/** + * Simple queue that executes an asynchronous handler on data in order. + */ +export class AsyncTaskQueue { + private _queue: T[] = []; + private readonly _handler: (items: T[]) => Promise; + private _errorHandler: (err: unknown) => unknown = console.error; + private _batchSize = 1; + private _throttleIntervalMs = 0; + private _processing: boolean = false; + + constructor(handler: (items: T[]) => Promise) { + this._handler = handler; + } + + /** + * Replaces the error handler for this queue. The error handler is called + * whenever the task handler throws. + * + * This method updates the queue in place and returns it, so method calls may + * be chained. + */ + onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue { + this._errorHandler = errorHandler; + return this; + } + + /** + * Sets batch size for this queue. Setting this value greater than 1 allows + * the queue to pass up to n items to the task handler each time it is called. + * + * This method updates the queue in place and returns it, so method calls may + * be chained. + */ + batchSize(batchSize: number): AsyncTaskQueue { + if (batchSize < 1) { + throw new Error("Batch size must be at least 1."); + } + this._batchSize = batchSize; + return this; + } + + /** + * Sets throttle interval for this queue. Setting this longer than 0 + * milliseconds will cause the queue to call the task handler at most once + * every n milliseconds. + * + * This method updates the queue in place and returns it, so method calls may + * be chained. + */ + throttleMs(intervalMs: number): AsyncTaskQueue { + if (intervalMs < 0) { + throw new Error("Interval must be zero or more milliseconds."); + } + this._throttleIntervalMs = intervalMs; + return this; + } + + /** + * Adds an item to the back of the queue. + */ + push(item: T): void { + this._queue.push(item); + if (!this._processing) { + this._processAll().catch(console.error); + } + } + + /** + * Returns all unprocessed items and removes them from the queue. + */ + drain(): T[] { + const items = this._queue; + this._queue = []; + return items; + } + + /** + * Returns the number of unprocessed items in the queue. + */ + size(): number { + return this._queue.length; + } + + private async _processAll(): Promise { + // Control loop is very simple: a `while` loop in a single async "thread", + // which runs until the queue is empty and then waits for + // `this._processAll()` to be called again. Because the execution + // environment is single-threaded, the `this._processing` property is a + // sufficient stand-in for a mutex. + if (this._processing) { + throw new Error( + "Assertion error: attq should never process queue concurrently.", + ); + } + this._processing = true; + + while (this._queue.length > 0) { + const throttleIntervalMs = this._throttleIntervalMs; + const throttleTimeout = throttleIntervalMs + ? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs)) + : Promise.resolve(); + + const items = this._queue.splice(0, this._batchSize); + await this._handler(items).catch(this._errorHandler); + + await throttleTimeout; + } + + this._processing = false; + } +}