Compare commits
No commits in common. "d715fabeb570641f85d1af653808555b2d3eaee0" and "3c2ef378bcf42100ca1c4d5a0dfef5f6f1f1843f" have entirely different histories.
d715fabeb5
...
3c2ef378bc
11 changed files with 147 additions and 306 deletions
|
|
@ -10,12 +10,3 @@ comment-token = "//"
|
|||
block-comment-tokens = { start = "/*", end = "*/" }
|
||||
language-servers = [ "typescript-language-server", "vscode-eslint-language-server" ]
|
||||
indent = { tab-width = 2, unit = " " }
|
||||
|
||||
[[language]]
|
||||
name = "json"
|
||||
scope = "source.json"
|
||||
injection-regex = "json"
|
||||
file-types = [ "json" ]
|
||||
language-servers = [ "vscode-json-language-server" ]
|
||||
auto-format = false # Prevent formatter from removing newline at end of file
|
||||
indent = { tab-width = 2, unit = " " }
|
||||
|
|
|
|||
48
README.md
48
README.md
|
|
@ -2,24 +2,11 @@
|
|||
|
||||
An async tiny task queue (and related utilities), with zero dependencies. Attq
|
||||
provides a data structure that executes an asynchronous callback sequentially on
|
||||
a flexible list. It is generally designed to facilitate replaying ordered events
|
||||
from a client to a server and comes with bells and whistles including batching,
|
||||
throttling, and configurable retries.
|
||||
a flexible list. It is designed to facilitate in-order client-server message
|
||||
passing with bells and whistles including batching, throttling, and configurable
|
||||
retries. **Work in progress.**
|
||||
|
||||
## Installation
|
||||
|
||||
```sh
|
||||
npm install --save attq
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
### Basic Task Queue
|
||||
|
||||
The `AsyncTaskQueue` constructor takes a task handler callback, and items are
|
||||
added to the queue object with the `.push()` method. Additional options may be
|
||||
configured with methods such as `.onError()`, `.batchSize()`, and
|
||||
`.throttleMs()`.
|
||||
## Example
|
||||
|
||||
```typescript
|
||||
import { AsyncTaskQueue } from "attq";
|
||||
|
|
@ -46,30 +33,3 @@ q.onError(() => {
|
|||
q.drain();
|
||||
});
|
||||
```
|
||||
|
||||
### Retries
|
||||
|
||||
Rather than build retry logic into the queue itself, Attq provides a
|
||||
`withRetry()` higher-order function which can be wrapped around the task
|
||||
handler. (If desired, `withRetry()` may be used independently of the task queue
|
||||
as well!)
|
||||
|
||||
```typescript
|
||||
import { AsyncTaskQueue, withRetry } from "attq";
|
||||
|
||||
let q = new AsyncTaskQueue<number>(
|
||||
// Defaults to 6 attempts with binary exponential backoff.
|
||||
withRetry((nums) => fetch(`/refine?macrodata=${nums.join(",")}`)),
|
||||
);
|
||||
|
||||
// To specify, for example, up to 3 attempts with a linear backoff:
|
||||
q = new AsyncTaskQueue<number>(
|
||||
withRetry(
|
||||
(nums) => fetch(`/refine?macrodata=${nums.join(",")}`),
|
||||
{
|
||||
attempts: 3,
|
||||
backoffMs: (attempt) => 1000 * attempt,
|
||||
},
|
||||
),
|
||||
);
|
||||
```
|
||||
|
|
|
|||
|
|
@ -5,13 +5,8 @@ node = "24"
|
|||
run = "npx tsc --noEmit && npx eslint ./src"
|
||||
|
||||
[tasks.build]
|
||||
run = """
|
||||
rm -r dist || true
|
||||
npx rollup -c
|
||||
"""
|
||||
run = "npx rollup -c"
|
||||
|
||||
[tasks.test]
|
||||
depends = ["lint"]
|
||||
run = "npx jest"
|
||||
|
||||
[tasks.prepublish]
|
||||
depends = ["lint", "test", "build"]
|
||||
|
|
|
|||
4
package-lock.json
generated
4
package-lock.json
generated
|
|
@ -1,12 +1,12 @@
|
|||
{
|
||||
"name": "attq",
|
||||
"version": "0.1.0",
|
||||
"version": "0.1.0-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "attq",
|
||||
"version": "0.1.0",
|
||||
"version": "0.1.0-beta.0",
|
||||
"license": "MIT",
|
||||
"devDependencies": {
|
||||
"@eslint/js": "^9.39.2",
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "attq",
|
||||
"version": "0.1.0",
|
||||
"version": "0.1.0-beta.0",
|
||||
"description": "Async Tiny Task Queue",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
|
|
@ -8,14 +8,12 @@
|
|||
},
|
||||
"license": "MIT",
|
||||
"author": "Brent Schroeter (https://brentsch.com)",
|
||||
"files": ["dist", "LICENSE", "README.md"],
|
||||
"browser": "dist/index.min.js",
|
||||
"main": "dist/index.js",
|
||||
"module": "dist/index.mjs",
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"build": "mise run build",
|
||||
"prepublishOnly": "mise run prepublish",
|
||||
"test": "mise run test"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
|
|
|||
|
|
@ -15,8 +15,5 @@ export default {
|
|||
{ file: pkg.main, format: "cjs" },
|
||||
{ file: pkg.module, format: "es" },
|
||||
],
|
||||
plugins: [typescript({
|
||||
include: ["src/**/*.ts"],
|
||||
exclude: ["**/*.test.ts"],
|
||||
})],
|
||||
plugins: [typescript()],
|
||||
};
|
||||
|
|
|
|||
139
src/index.ts
139
src/index.ts
|
|
@ -1,2 +1,137 @@
|
|||
export { AsyncTaskQueue } from "./task-queue";
|
||||
export { withRetry } from "./retries";
|
||||
/**
|
||||
* Simple queue that executes an asynchronous handler on data in order.
|
||||
*/
|
||||
export class AsyncTaskQueue<T> {
|
||||
private _queue: T[] = [];
|
||||
private readonly _handler: (items: T[]) => Promise<unknown>;
|
||||
private _errorHandler: (err: unknown) => unknown = console.error;
|
||||
private _batchSize = 1;
|
||||
private _throttleIntervalMs = 0;
|
||||
private _processing: boolean = false;
|
||||
|
||||
constructor(handler: (items: T[]) => Promise<unknown>) {
|
||||
this._handler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the error handler for this queue. The error handler is called
|
||||
* whenever the task handler throws.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
||||
this._errorHandler = errorHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets batch size for this queue. Setting this value greater than 1 allows
|
||||
* the queue to pass up to n items to the task handler each time it is called.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
||||
if (batchSize < 1) {
|
||||
throw new Error("Batch size must be at least 1.");
|
||||
}
|
||||
this._batchSize = batchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets throttle interval for this queue. Setting this longer than 0
|
||||
* milliseconds will cause the queue to call the task handler at most once
|
||||
* every n milliseconds.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
||||
if (intervalMs < 0) {
|
||||
throw new Error("Interval must be zero or more milliseconds.");
|
||||
}
|
||||
this._throttleIntervalMs = intervalMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an item to the back of the queue.
|
||||
*/
|
||||
push(item: T): void {
|
||||
this._queue.push(item);
|
||||
if (!this._processing) {
|
||||
this._processAll().catch(console.error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all unprocessed items and removes them from the queue.
|
||||
*/
|
||||
drain(): T[] {
|
||||
const items = this._queue;
|
||||
this._queue = [];
|
||||
return items;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of unprocessed items in the queue.
|
||||
*/
|
||||
size(): number {
|
||||
return this._queue.length;
|
||||
}
|
||||
|
||||
private async _processAll(): Promise<void> {
|
||||
// Control loop is very simple: a `while` loop in a single async "thread",
|
||||
// which runs until the queue is empty and then waits for
|
||||
// `this._processAll()` to be called again. Because the execution
|
||||
// environment is single-threaded, the `this._processing` property is a
|
||||
// sufficient stand-in for a mutex.
|
||||
if (this._processing) {
|
||||
throw new Error(
|
||||
"Assertion error: attq should never process queue concurrently.",
|
||||
);
|
||||
}
|
||||
this._processing = true;
|
||||
|
||||
while (this._queue.length > 0) {
|
||||
const throttleIntervalMs = this._throttleIntervalMs;
|
||||
const throttleTimeout = throttleIntervalMs
|
||||
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
||||
: Promise.resolve();
|
||||
|
||||
const items = this._queue.splice(0, this._batchSize);
|
||||
await this._handler(items).catch(this._errorHandler);
|
||||
|
||||
await throttleTimeout;
|
||||
}
|
||||
|
||||
this._processing = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an arbitrary async function with retry logic for exceptions.
|
||||
*/
|
||||
export function withRetry<T extends unknown[], U>(
|
||||
fn: (...args: T) => Promise<U>,
|
||||
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
||||
attempts: number;
|
||||
backoffMs(attempt: number): number;
|
||||
},
|
||||
): (...args: T) => Promise<U> {
|
||||
return async (...args: T) => {
|
||||
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
||||
try {
|
||||
return await fn(...args);
|
||||
} catch {
|
||||
const delay = backoffMs(attempt);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
// Preferably run the final attempt without a try/catch block so that thrown
|
||||
// exceptions get a clean stack trace.
|
||||
return await fn(...args);
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,99 +0,0 @@
|
|||
import { withRetry } from "./retries";
|
||||
|
||||
const FAST_RETRY_CONFIG: Parameters<typeof withRetry>[1] = {
|
||||
attempts: 6,
|
||||
// Always retry almost immediately, even when using real timers.
|
||||
backoffMs: () => 1,
|
||||
};
|
||||
|
||||
test("successful function only executes once", async () => {
|
||||
const fn = jest.fn(async () => "success!");
|
||||
const result = await withRetry(fn, FAST_RETRY_CONFIG)();
|
||||
expect(result).toEqual("success!");
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test("failing function rejects after correct number of attempts", async () => {
|
||||
const fn = jest.fn(async () => {
|
||||
throw "oops!";
|
||||
});
|
||||
await expect(withRetry(fn, FAST_RETRY_CONFIG)()).rejects.toEqual("oops!");
|
||||
expect(fn).toHaveBeenCalledTimes(FAST_RETRY_CONFIG.attempts);
|
||||
});
|
||||
|
||||
test("parameters are passed through", async () => {
|
||||
const fn = async (str: string, n: number) => {
|
||||
let result = "";
|
||||
for (let i = 0; i < n; i += 1) {
|
||||
result += str;
|
||||
}
|
||||
return result;
|
||||
};
|
||||
const retriable = withRetry(fn, FAST_RETRY_CONFIG);
|
||||
|
||||
// A bit of type system hackery to assert at build time that function
|
||||
// signatures match.
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
type Assert<_T extends true> = void;
|
||||
type _ = Assert<typeof retriable extends typeof fn ? true : false>;
|
||||
/* eslint-enable @typescript-eslint/no-unused-vars */
|
||||
|
||||
await expect(retriable("hello;", 3)).resolves.toEqual("hello;hello;hello;");
|
||||
});
|
||||
|
||||
test("retries back off with expected delays", async () => {
|
||||
jest.useFakeTimers();
|
||||
|
||||
let n = 0;
|
||||
const fn = jest.fn(async () => {
|
||||
if (n < 5) {
|
||||
n += 1;
|
||||
throw "oops!";
|
||||
} else {
|
||||
// Fake timers don't play nicely with the `expect().rejects` API, so
|
||||
// ensure that the last attempt succeeds.
|
||||
return "success!";
|
||||
}
|
||||
});
|
||||
|
||||
const promise = withRetry(
|
||||
fn,
|
||||
{
|
||||
attempts: 6,
|
||||
backoffMs: (attempt) => 1000 * 2 ** attempt,
|
||||
},
|
||||
)();
|
||||
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
await jest.advanceTimersByTimeAsync(100);
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(1000);
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
await jest.advanceTimersByTimeAsync(100);
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(2000);
|
||||
expect(fn).toHaveBeenCalledTimes(3);
|
||||
await jest.advanceTimersByTimeAsync(100);
|
||||
expect(fn).toHaveBeenCalledTimes(3);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(4000);
|
||||
expect(fn).toHaveBeenCalledTimes(4);
|
||||
await jest.advanceTimersByTimeAsync(100);
|
||||
expect(fn).toHaveBeenCalledTimes(4);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(8000);
|
||||
expect(fn).toHaveBeenCalledTimes(5);
|
||||
await jest.advanceTimersByTimeAsync(100);
|
||||
expect(fn).toHaveBeenCalledTimes(5);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(16000);
|
||||
expect(fn).toHaveBeenCalledTimes(6);
|
||||
|
||||
await expect(promise).resolves.toEqual("success!")
|
||||
|
||||
// Fake timers don't play nicely with the `expect().rejects` API, so disable
|
||||
// them again for other tests.
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
/**
|
||||
* Wraps an arbitrary async function with retry logic for exceptions.
|
||||
*/
|
||||
export function withRetry<T extends unknown[], U>(
|
||||
fn: (...args: T) => Promise<U>,
|
||||
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
||||
attempts: number;
|
||||
backoffMs(attempt: number): number;
|
||||
},
|
||||
): (...args: T) => Promise<U> {
|
||||
return async (...args: T) => {
|
||||
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
||||
try {
|
||||
return await fn(...args);
|
||||
} catch {
|
||||
const delay = backoffMs(attempt);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
// Preferably run the final attempt without a try/catch block so that thrown
|
||||
// exceptions get a clean stack trace.
|
||||
return await fn(...args);
|
||||
};
|
||||
}
|
||||
|
|
@ -1,112 +0,0 @@
|
|||
/**
|
||||
* Simple queue that executes an asynchronous handler on data in order.
|
||||
*/
|
||||
export class AsyncTaskQueue<T> {
|
||||
private _queue: T[] = [];
|
||||
private readonly _handler: (items: T[]) => Promise<unknown>;
|
||||
private _errorHandler: (err: unknown) => unknown = console.error;
|
||||
private _batchSize = 1;
|
||||
private _throttleIntervalMs = 0;
|
||||
private _processing: boolean = false;
|
||||
|
||||
constructor(handler: (items: T[]) => Promise<unknown>) {
|
||||
this._handler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the error handler for this queue. The error handler is called
|
||||
* whenever the task handler throws.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
||||
this._errorHandler = errorHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets batch size for this queue. Setting this value greater than 1 allows
|
||||
* the queue to pass up to n items to the task handler each time it is called.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
||||
if (batchSize < 1) {
|
||||
throw new Error("Batch size must be at least 1.");
|
||||
}
|
||||
this._batchSize = batchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets throttle interval for this queue. Setting this longer than 0
|
||||
* milliseconds will cause the queue to call the task handler at most once
|
||||
* every n milliseconds.
|
||||
*
|
||||
* This method updates the queue in place and returns it, so method calls may
|
||||
* be chained.
|
||||
*/
|
||||
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
||||
if (intervalMs < 0) {
|
||||
throw new Error("Interval must be zero or more milliseconds.");
|
||||
}
|
||||
this._throttleIntervalMs = intervalMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an item to the back of the queue.
|
||||
*/
|
||||
push(item: T): void {
|
||||
this._queue.push(item);
|
||||
if (!this._processing) {
|
||||
this._processAll().catch(console.error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all unprocessed items and removes them from the queue.
|
||||
*/
|
||||
drain(): T[] {
|
||||
const items = this._queue;
|
||||
this._queue = [];
|
||||
return items;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of unprocessed items in the queue.
|
||||
*/
|
||||
size(): number {
|
||||
return this._queue.length;
|
||||
}
|
||||
|
||||
private async _processAll(): Promise<void> {
|
||||
// Control loop is very simple: a `while` loop in a single async "thread",
|
||||
// which runs until the queue is empty and then waits for
|
||||
// `this._processAll()` to be called again. Because the execution
|
||||
// environment is single-threaded, the `this._processing` property is a
|
||||
// sufficient stand-in for a mutex.
|
||||
if (this._processing) {
|
||||
throw new Error(
|
||||
"Assertion error: attq should never process queue concurrently.",
|
||||
);
|
||||
}
|
||||
this._processing = true;
|
||||
|
||||
while (this._queue.length > 0) {
|
||||
const throttleIntervalMs = this._throttleIntervalMs;
|
||||
const throttleTimeout = throttleIntervalMs
|
||||
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
||||
: Promise.resolve();
|
||||
|
||||
const items = this._queue.splice(0, this._batchSize);
|
||||
await this._handler(items).catch(this._errorHandler);
|
||||
|
||||
await throttleTimeout;
|
||||
}
|
||||
|
||||
this._processing = false;
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue