Compare commits

..

No commits in common. "d715fabeb570641f85d1af653808555b2d3eaee0" and "3c2ef378bcf42100ca1c4d5a0dfef5f6f1f1843f" have entirely different histories.

11 changed files with 147 additions and 306 deletions

View file

@@ -10,12 +10,3 @@ comment-token = "//"
block-comment-tokens = { start = "/*", end = "*/" } block-comment-tokens = { start = "/*", end = "*/" }
language-servers = [ "typescript-language-server", "vscode-eslint-language-server" ] language-servers = [ "typescript-language-server", "vscode-eslint-language-server" ]
indent = { tab-width = 2, unit = " " } indent = { tab-width = 2, unit = " " }
[[language]]
name = "json"
scope = "source.json"
injection-regex = "json"
file-types = [ "json" ]
language-servers = [ "vscode-json-language-server" ]
auto-format = false # Prevent formatter from removing newline at end of file
indent = { tab-width = 2, unit = " " }

View file

@@ -2,24 +2,11 @@
An async tiny task queue (and related utilities), with zero dependencies. Attq An async tiny task queue (and related utilities), with zero dependencies. Attq
provides a data structure that executes an asynchronous callback sequentially on provides a data structure that executes an asynchronous callback sequentially on
a flexible list. It is generally designed to facilitate replaying ordered events a flexible list. It is designed to facilitate in-order client-server message
from a client to a server and comes with bells and whistles including batching, passing with bells and whistles including batching, throttling, and configurable
throttling, and configurable retries. retries. **Work in progress.**
## Installation ## Example
```sh
npm install --save attq
```
## Examples
### Basic Task Queue
The `AsyncTaskQueue` constructor takes a task handler callback, and items are
added to the queue object with the `.push()` method. Additional options may be
configured with methods such as `.onError()`, `.batchSize()`, and
`.throttleMs()`.
```typescript ```typescript
import { AsyncTaskQueue } from "attq"; import { AsyncTaskQueue } from "attq";
@@ -46,30 +33,3 @@ q.onError(() => {
q.drain(); q.drain();
}); });
``` ```
### Retries
Rather than build retry logic into the queue itself, Attq provides a
`withRetry()` higher-order function which can be wrapped around the task
handler. (If desired, `withRetry()` may be used independently of the task queue
as well!)
```typescript
import { AsyncTaskQueue, withRetry } from "attq";
let q = new AsyncTaskQueue<number>(
// Defaults to 6 attempts with binary exponential backoff.
withRetry((nums) => fetch(`/refine?macrodata=${nums.join(",")}`)),
);
// To specify, for example, up to 3 attempts with a linear backoff:
q = new AsyncTaskQueue<number>(
withRetry(
(nums) => fetch(`/refine?macrodata=${nums.join(",")}`),
{
attempts: 3,
backoffMs: (attempt) => 1000 * attempt,
},
),
);
```

View file

@@ -5,13 +5,8 @@ node = "24"
run = "npx tsc --noEmit && npx eslint ./src" run = "npx tsc --noEmit && npx eslint ./src"
[tasks.build] [tasks.build]
run = """ run = "npx rollup -c"
rm -r dist || true
npx rollup -c
"""
[tasks.test] [tasks.test]
depends = ["lint"]
run = "npx jest" run = "npx jest"
[tasks.prepublish]
depends = ["lint", "test", "build"]

4
package-lock.json generated
View file

@@ -1,12 +1,12 @@
{ {
"name": "attq", "name": "attq",
"version": "0.1.0", "version": "0.1.0-beta.0",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "attq", "name": "attq",
"version": "0.1.0", "version": "0.1.0-beta.0",
"license": "MIT", "license": "MIT",
"devDependencies": { "devDependencies": {
"@eslint/js": "^9.39.2", "@eslint/js": "^9.39.2",

View file

@@ -1,6 +1,6 @@
{ {
"name": "attq", "name": "attq",
"version": "0.1.0", "version": "0.1.0-beta.0",
"description": "Async Tiny Task Queue", "description": "Async Tiny Task Queue",
"repository": { "repository": {
"type": "git", "type": "git",
@@ -8,14 +8,12 @@
}, },
"license": "MIT", "license": "MIT",
"author": "Brent Schroeter (https://brentsch.com)", "author": "Brent Schroeter (https://brentsch.com)",
"files": ["dist", "LICENSE", "README.md"],
"browser": "dist/index.min.js", "browser": "dist/index.min.js",
"main": "dist/index.js", "main": "dist/index.js",
"module": "dist/index.mjs", "module": "dist/index.mjs",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"build": "mise run build", "build": "mise run build",
"prepublishOnly": "mise run prepublish",
"test": "mise run test" "test": "mise run test"
}, },
"devDependencies": { "devDependencies": {

View file

@@ -15,8 +15,5 @@ export default {
{ file: pkg.main, format: "cjs" }, { file: pkg.main, format: "cjs" },
{ file: pkg.module, format: "es" }, { file: pkg.module, format: "es" },
], ],
plugins: [typescript({ plugins: [typescript()],
include: ["src/**/*.ts"],
exclude: ["**/*.test.ts"],
})],
}; };

View file

@@ -1,2 +1,137 @@
export { AsyncTaskQueue } from "./task-queue"; /**
export { withRetry } from "./retries"; * Simple queue that executes an asynchronous handler on data in order.
*/
/**
 * Simple queue that executes an asynchronous handler on data in order.
 *
 * Items added via `push()` are handed to the handler sequentially, in batches
 * of up to `batchSize()` items, with at most one handler call in flight at a
 * time and an optional minimum interval between calls (`throttleMs()`).
 */
export class AsyncTaskQueue<T> {
  // Items waiting to be processed.
  private _queue: T[] = [];
  // Task callback invoked with each batch of items.
  private readonly _handler: (items: T[]) => Promise<unknown>;
  // Called whenever the task handler throws or rejects.
  private _errorHandler: (err: unknown) => unknown = console.error;
  // Maximum number of items passed to the handler per call.
  private _batchSize = 1;
  // Minimum interval between handler invocations, in milliseconds.
  private _throttleIntervalMs = 0;
  // True while the processing loop runs; acts as a re-entrancy guard.
  private _processing: boolean = false;

  constructor(handler: (items: T[]) => Promise<unknown>) {
    this._handler = handler;
  }

  /**
   * Replaces the error handler for this queue. The error handler is called
   * whenever the task handler throws.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   */
  onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
    this._errorHandler = errorHandler;
    return this;
  }

  /**
   * Sets batch size for this queue. Setting this value greater than 1 allows
   * the queue to pass up to n items to the task handler each time it is called.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   *
   * @throws Error if `batchSize` is less than 1.
   */
  batchSize(batchSize: number): AsyncTaskQueue<T> {
    if (batchSize < 1) {
      throw new Error("Batch size must be at least 1.");
    }
    this._batchSize = batchSize;
    return this;
  }

  /**
   * Sets throttle interval for this queue. Setting this longer than 0
   * milliseconds will cause the queue to call the task handler at most once
   * every n milliseconds.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   *
   * @throws Error if `intervalMs` is negative.
   */
  throttleMs(intervalMs: number): AsyncTaskQueue<T> {
    if (intervalMs < 0) {
      throw new Error("Interval must be zero or more milliseconds.");
    }
    this._throttleIntervalMs = intervalMs;
    return this;
  }

  /**
   * Adds an item to the back of the queue and kicks off processing if it is
   * not already running.
   */
  push(item: T): void {
    this._queue.push(item);
    if (!this._processing) {
      // Fire and forget: _processAll() routes task errors through the
      // configured error handler, so a rejection here is unexpected.
      this._processAll().catch(console.error);
    }
  }

  /**
   * Returns all unprocessed items and removes them from the queue.
   */
  drain(): T[] {
    const items = this._queue;
    this._queue = [];
    return items;
  }

  /**
   * Returns the number of unprocessed items in the queue.
   */
  size(): number {
    return this._queue.length;
  }

  private async _processAll(): Promise<void> {
    // Control loop is very simple: a `while` loop in a single async "thread",
    // which runs until the queue is empty and then waits for
    // `this._processAll()` to be called again. Because the execution
    // environment is single-threaded, the `this._processing` property is a
    // sufficient stand-in for a mutex.
    if (this._processing) {
      throw new Error(
        "Assertion error: attq should never process queue concurrently.",
      );
    }
    this._processing = true;
    try {
      while (this._queue.length > 0) {
        const throttleIntervalMs = this._throttleIntervalMs;
        const throttleTimeout = throttleIntervalMs
          ? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
          : Promise.resolve();
        const items = this._queue.splice(0, this._batchSize);
        // BUG FIX: the previous `this._handler(items).catch(...)` missed
        // synchronous throws from the handler (and from the error handler),
        // which skipped the `_processing = false` reset and deadlocked the
        // queue forever. A statement-level try/catch covers both cases.
        try {
          await this._handler(items);
        } catch (err) {
          // Awaited so an async error handler finishes before the next batch,
          // matching the old promise-chain (`.catch(...)`) semantics.
          await this._errorHandler(err);
        }
        await throttleTimeout;
      }
    } finally {
      // Always release the guard so a later push() can restart processing.
      this._processing = false;
    }
  }
}
/**
 * Wraps an arbitrary async function with retry logic for exceptions.
 *
 * @param fn The async function to wrap.
 * @param options.attempts Total number of attempts, including the first call
 *   (default 6).
 * @param options.backoffMs Delay in milliseconds before retry number
 *   `attempt` (0-based). Defaults to binary exponential backoff starting at
 *   1000ms.
 * @returns A function with the same signature as `fn` that retries failed
 *   calls and rethrows the final attempt's error unchanged.
 */
export function withRetry<T extends unknown[], U>(
  fn: (...args: T) => Promise<U>,
  // BUG FIX: the options object and its properties are now optional, so the
  // documented `withRetry(fn)` form actually type-checks and runs. Previously
  // both properties (and the object itself) were required, contradicting the
  // in-signature defaults and throwing on a destructure of `undefined`.
  {
    attempts = 6,
    backoffMs = (attempt) => 1000 * 2 ** attempt,
  }: {
    attempts?: number;
    backoffMs?: (attempt: number) => number;
  } = {},
): (...args: T) => Promise<U> {
  return async (...args: T) => {
    for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
      try {
        return await fn(...args);
      } catch {
        const delay = backoffMs(attempt);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
    // Preferably run the final attempt without a try/catch block so that thrown
    // exceptions get a clean stack trace.
    return await fn(...args);
  };
}

View file

@@ -1,99 +0,0 @@
import { withRetry } from "./retries";
// Shared retry options for tests that do not exercise backoff timing: keeps
// the default attempt count but shrinks the delay to ~1ms so the suite stays
// fast under real timers.
const FAST_RETRY_CONFIG: Parameters<typeof withRetry>[1] = {
  attempts: 6,
  // Always retry almost immediately, even when using real timers.
  backoffMs: () => 1,
};
test("successful function only executes once", async () => {
  // A handler that never throws must be invoked exactly once, with its
  // resolved value passed through unchanged.
  const handler = jest.fn(async () => "success!");
  const wrapped = withRetry(handler, FAST_RETRY_CONFIG);
  await expect(wrapped()).resolves.toEqual("success!");
  expect(handler).toHaveBeenCalledTimes(1);
});
test("failing function rejects after correct number of attempts", async () => {
  // A handler that always throws should exhaust every configured attempt and
  // then surface the final rejection to the caller unchanged.
  const alwaysFail = jest.fn(async () => {
    throw "oops!";
  });
  const wrapped = withRetry(alwaysFail, FAST_RETRY_CONFIG);
  await expect(wrapped()).rejects.toEqual("oops!");
  expect(alwaysFail).toHaveBeenCalledTimes(FAST_RETRY_CONFIG.attempts);
});
test("parameters are passed through", async () => {
  // Concatenates `str` to itself `n` times; used only to verify that
  // withRetry forwards an arbitrary argument list unchanged.
  const fn = async (str: string, n: number) => {
    let result = "";
    for (let i = 0; i < n; i += 1) {
      result += str;
    }
    return result;
  };
  const retriable = withRetry(fn, FAST_RETRY_CONFIG);
  // A bit of type system hackery to assert at build time that function
  // signatures match.
  /* eslint-disable @typescript-eslint/no-unused-vars */
  type Assert<_T extends true> = void;
  type _ = Assert<typeof retriable extends typeof fn ? true : false>;
  /* eslint-enable @typescript-eslint/no-unused-vars */
  await expect(retriable("hello;", 3)).resolves.toEqual("hello;hello;hello;");
});
test("retries back off with expected delays", async () => {
  jest.useFakeTimers();
  let n = 0;
  // Fails on the first five calls then succeeds, so each backoff interval of
  // the default-style schedule (1s, 2s, 4s, 8s, 16s) is exercised once.
  const fn = jest.fn(async () => {
    if (n < 5) {
      n += 1;
      throw "oops!";
    } else {
      // Fake timers don't play nicely with the `expect().rejects` API, so
      // ensure that the last attempt succeeds.
      return "success!";
    }
  });
  const promise = withRetry(
    fn,
    {
      attempts: 6,
      backoffMs: (attempt) => 1000 * 2 ** attempt,
    },
  )();
  // For each interval: advancing only 100ms must NOT trigger the next
  // attempt; advancing the full backoff amount must. The call count therefore
  // increments exactly once per backoff boundary.
  expect(fn).toHaveBeenCalledTimes(1);
  await jest.advanceTimersByTimeAsync(100);
  expect(fn).toHaveBeenCalledTimes(1);
  await jest.advanceTimersByTimeAsync(1000);
  expect(fn).toHaveBeenCalledTimes(2);
  await jest.advanceTimersByTimeAsync(100);
  expect(fn).toHaveBeenCalledTimes(2);
  await jest.advanceTimersByTimeAsync(2000);
  expect(fn).toHaveBeenCalledTimes(3);
  await jest.advanceTimersByTimeAsync(100);
  expect(fn).toHaveBeenCalledTimes(3);
  await jest.advanceTimersByTimeAsync(4000);
  expect(fn).toHaveBeenCalledTimes(4);
  await jest.advanceTimersByTimeAsync(100);
  expect(fn).toHaveBeenCalledTimes(4);
  await jest.advanceTimersByTimeAsync(8000);
  expect(fn).toHaveBeenCalledTimes(5);
  await jest.advanceTimersByTimeAsync(100);
  expect(fn).toHaveBeenCalledTimes(5);
  await jest.advanceTimersByTimeAsync(16000);
  expect(fn).toHaveBeenCalledTimes(6);
  await expect(promise).resolves.toEqual("success!")
  // Fake timers don't play nicely with the `expect().rejects` API, so disable
  // them again for other tests.
  jest.useRealTimers();
});

View file

@@ -1,24 +0,0 @@
/**
 * Wraps an arbitrary async function with retry logic for exceptions.
 *
 * @param fn The async function to wrap.
 * @param options.attempts Total number of attempts, including the first call
 *   (default 6).
 * @param options.backoffMs Delay in milliseconds before retry number
 *   `attempt` (0-based). Defaults to binary exponential backoff starting at
 *   1000ms.
 * @returns A function with the same signature as `fn` that retries failed
 *   calls and rethrows the final attempt's error unchanged.
 */
export function withRetry<T extends unknown[], U>(
  fn: (...args: T) => Promise<U>,
  // BUG FIX: the options object and its properties are now optional, so the
  // documented `withRetry(fn)` form actually type-checks and runs. Previously
  // both properties (and the object itself) were required, contradicting the
  // in-signature defaults and throwing on a destructure of `undefined`.
  {
    attempts = 6,
    backoffMs = (attempt) => 1000 * 2 ** attempt,
  }: {
    attempts?: number;
    backoffMs?: (attempt: number) => number;
  } = {},
): (...args: T) => Promise<U> {
  return async (...args: T) => {
    for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
      try {
        return await fn(...args);
      } catch {
        const delay = backoffMs(attempt);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
    // Preferably run the final attempt without a try/catch block so that thrown
    // exceptions get a clean stack trace.
    return await fn(...args);
  };
}

View file

@@ -1,112 +0,0 @@
/**
 * Simple queue that executes an asynchronous handler on data in order.
 *
 * Items added via `push()` are handed to the handler sequentially, in batches
 * of up to `batchSize()` items, with at most one handler call in flight at a
 * time and an optional minimum interval between calls (`throttleMs()`).
 */
export class AsyncTaskQueue<T> {
  // Items waiting to be processed.
  private _queue: T[] = [];
  // Task callback invoked with each batch of items.
  private readonly _handler: (items: T[]) => Promise<unknown>;
  // Called whenever the task handler throws or rejects.
  private _errorHandler: (err: unknown) => unknown = console.error;
  // Maximum number of items passed to the handler per call.
  private _batchSize = 1;
  // Minimum interval between handler invocations, in milliseconds.
  private _throttleIntervalMs = 0;
  // True while the processing loop runs; acts as a re-entrancy guard.
  private _processing: boolean = false;

  constructor(handler: (items: T[]) => Promise<unknown>) {
    this._handler = handler;
  }

  /**
   * Replaces the error handler for this queue. The error handler is called
   * whenever the task handler throws.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   */
  onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
    this._errorHandler = errorHandler;
    return this;
  }

  /**
   * Sets batch size for this queue. Setting this value greater than 1 allows
   * the queue to pass up to n items to the task handler each time it is called.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   *
   * @throws Error if `batchSize` is less than 1.
   */
  batchSize(batchSize: number): AsyncTaskQueue<T> {
    if (batchSize < 1) {
      throw new Error("Batch size must be at least 1.");
    }
    this._batchSize = batchSize;
    return this;
  }

  /**
   * Sets throttle interval for this queue. Setting this longer than 0
   * milliseconds will cause the queue to call the task handler at most once
   * every n milliseconds.
   *
   * This method updates the queue in place and returns it, so method calls may
   * be chained.
   *
   * @throws Error if `intervalMs` is negative.
   */
  throttleMs(intervalMs: number): AsyncTaskQueue<T> {
    if (intervalMs < 0) {
      throw new Error("Interval must be zero or more milliseconds.");
    }
    this._throttleIntervalMs = intervalMs;
    return this;
  }

  /**
   * Adds an item to the back of the queue and kicks off processing if it is
   * not already running.
   */
  push(item: T): void {
    this._queue.push(item);
    if (!this._processing) {
      // Fire and forget: _processAll() routes task errors through the
      // configured error handler, so a rejection here is unexpected.
      this._processAll().catch(console.error);
    }
  }

  /**
   * Returns all unprocessed items and removes them from the queue.
   */
  drain(): T[] {
    const items = this._queue;
    this._queue = [];
    return items;
  }

  /**
   * Returns the number of unprocessed items in the queue.
   */
  size(): number {
    return this._queue.length;
  }

  private async _processAll(): Promise<void> {
    // Control loop is very simple: a `while` loop in a single async "thread",
    // which runs until the queue is empty and then waits for
    // `this._processAll()` to be called again. Because the execution
    // environment is single-threaded, the `this._processing` property is a
    // sufficient stand-in for a mutex.
    if (this._processing) {
      throw new Error(
        "Assertion error: attq should never process queue concurrently.",
      );
    }
    this._processing = true;
    try {
      while (this._queue.length > 0) {
        const throttleIntervalMs = this._throttleIntervalMs;
        const throttleTimeout = throttleIntervalMs
          ? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
          : Promise.resolve();
        const items = this._queue.splice(0, this._batchSize);
        // BUG FIX: the previous `this._handler(items).catch(...)` missed
        // synchronous throws from the handler (and from the error handler),
        // which skipped the `_processing = false` reset and deadlocked the
        // queue forever. A statement-level try/catch covers both cases.
        try {
          await this._handler(items);
        } catch (err) {
          // Awaited so an async error handler finishes before the next batch,
          // matching the old promise-chain (`.catch(...)`) semantics.
          await this._errorHandler(err);
        }
        await throttleTimeout;
      }
    } finally {
      // Always release the guard so a later push() can restart processing.
      this._processing = false;
    }
  }
}