Compare commits
6 commits
3c2ef378bc
...
d715fabeb5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d715fabeb5 | ||
|
|
0f8ffd285d | ||
|
|
f5901f9111 | ||
|
|
582284a36b | ||
|
|
e4738ac41c | ||
|
|
fed6337a7e |
11 changed files with 306 additions and 147 deletions
|
|
@ -10,3 +10,12 @@ comment-token = "//"
|
||||||
block-comment-tokens = { start = "/*", end = "*/" }
|
block-comment-tokens = { start = "/*", end = "*/" }
|
||||||
language-servers = [ "typescript-language-server", "vscode-eslint-language-server" ]
|
language-servers = [ "typescript-language-server", "vscode-eslint-language-server" ]
|
||||||
indent = { tab-width = 2, unit = " " }
|
indent = { tab-width = 2, unit = " " }
|
||||||
|
|
||||||
|
[[language]]
|
||||||
|
name = "json"
|
||||||
|
scope = "source.json"
|
||||||
|
injection-regex = "json"
|
||||||
|
file-types = [ "json" ]
|
||||||
|
language-servers = [ "vscode-json-language-server" ]
|
||||||
|
auto-format = false # Prevent formatter from removing newline at end of file
|
||||||
|
indent = { tab-width = 2, unit = " " }
|
||||||
|
|
|
||||||
48
README.md
48
README.md
|
|
@ -2,11 +2,24 @@
|
||||||
|
|
||||||
An async tiny task queue (and related utilities), with zero dependencies. Attq
|
An async tiny task queue (and related utilities), with zero dependencies. Attq
|
||||||
provides a data structure that executes an asynchronous callback sequentially on
|
provides a data structure that executes an asynchronous callback sequentially on
|
||||||
a flexible list. It is designed to facilitate in-order client-server message
|
a flexible list. It is generally designed to facilitate replaying ordered events
|
||||||
passing with bells and whistles including batching, throttling, and configurable
|
from a client to a server and comes with bells and whistles including batching,
|
||||||
retries. **Work in progress.**
|
throttling, and configurable retries.
|
||||||
|
|
||||||
## Example
|
## Installation
|
||||||
|
|
||||||
|
```sh
|
||||||
|
npm install --save attq
|
||||||
|
```
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
### Basic Task Queue
|
||||||
|
|
||||||
|
The `AsyncTaskQueue` constructor takes a task handler callback, and items are
|
||||||
|
added to the queue object with the `.push()` method. Additional options may be
|
||||||
|
configured with methods such as `.onError()`, `.batchSize()`, and
|
||||||
|
`.throttleMs()`.
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { AsyncTaskQueue } from "attq";
|
import { AsyncTaskQueue } from "attq";
|
||||||
|
|
@ -33,3 +46,30 @@ q.onError(() => {
|
||||||
q.drain();
|
q.drain();
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Retries
|
||||||
|
|
||||||
|
Rather than build retry logic into the queue itself, Attq provides a
|
||||||
|
`withRetry()` higher-order function which can be wrapped around the task
|
||||||
|
handler. (If desired, `withRetry()` may be used independently of the task queue
|
||||||
|
as well!)
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { AsyncTaskQueue, withRetry } from "attq";
|
||||||
|
|
||||||
|
let q = new AsyncTaskQueue<number>(
|
||||||
|
// Defaults to 6 attempts with binary exponential backoff.
|
||||||
|
withRetry((nums) => fetch(`/refine?macrodata=${nums.join(",")}`)),
|
||||||
|
);
|
||||||
|
|
||||||
|
// To specify, for example, up to 3 attempts with a linear backoff:
|
||||||
|
q = new AsyncTaskQueue<number>(
|
||||||
|
withRetry(
|
||||||
|
(nums) => fetch(`/refine?macrodata=${nums.join(",")}`),
|
||||||
|
{
|
||||||
|
attempts: 3,
|
||||||
|
backoffMs: (attempt) => 1000 * attempt,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,13 @@ node = "24"
|
||||||
run = "npx tsc --noEmit && npx eslint ./src"
|
run = "npx tsc --noEmit && npx eslint ./src"
|
||||||
|
|
||||||
[tasks.build]
|
[tasks.build]
|
||||||
run = "npx rollup -c"
|
run = """
|
||||||
|
rm -r dist || true
|
||||||
|
npx rollup -c
|
||||||
|
"""
|
||||||
|
|
||||||
[tasks.test]
|
[tasks.test]
|
||||||
depends = ["lint"]
|
|
||||||
run = "npx jest"
|
run = "npx jest"
|
||||||
|
|
||||||
|
[tasks.prepublish]
|
||||||
|
depends = ["lint", "test", "build"]
|
||||||
|
|
|
||||||
4
package-lock.json
generated
4
package-lock.json
generated
|
|
@ -1,12 +1,12 @@
|
||||||
{
|
{
|
||||||
"name": "attq",
|
"name": "attq",
|
||||||
"version": "0.1.0-beta.0",
|
"version": "0.1.0",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "attq",
|
"name": "attq",
|
||||||
"version": "0.1.0-beta.0",
|
"version": "0.1.0",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@eslint/js": "^9.39.2",
|
"@eslint/js": "^9.39.2",
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "attq",
|
"name": "attq",
|
||||||
"version": "0.1.0-beta.0",
|
"version": "0.1.0",
|
||||||
"description": "Async Tiny Task Queue",
|
"description": "Async Tiny Task Queue",
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
|
|
@ -8,12 +8,14 @@
|
||||||
},
|
},
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"author": "Brent Schroeter (https://brentsch.com)",
|
"author": "Brent Schroeter (https://brentsch.com)",
|
||||||
|
"files": ["dist", "LICENSE", "README.md"],
|
||||||
"browser": "dist/index.min.js",
|
"browser": "dist/index.min.js",
|
||||||
"main": "dist/index.js",
|
"main": "dist/index.js",
|
||||||
"module": "dist/index.mjs",
|
"module": "dist/index.mjs",
|
||||||
"types": "dist/index.d.ts",
|
"types": "dist/index.d.ts",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"build": "mise run build",
|
"build": "mise run build",
|
||||||
|
"prepublishOnly": "mise run prepublish",
|
||||||
"test": "mise run test"
|
"test": "mise run test"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|
|
||||||
|
|
@ -15,5 +15,8 @@ export default {
|
||||||
{ file: pkg.main, format: "cjs" },
|
{ file: pkg.main, format: "cjs" },
|
||||||
{ file: pkg.module, format: "es" },
|
{ file: pkg.module, format: "es" },
|
||||||
],
|
],
|
||||||
plugins: [typescript()],
|
plugins: [typescript({
|
||||||
|
include: ["src/**/*.ts"],
|
||||||
|
exclude: ["**/*.test.ts"],
|
||||||
|
})],
|
||||||
};
|
};
|
||||||
|
|
|
||||||
139
src/index.ts
139
src/index.ts
|
|
@ -1,137 +1,2 @@
|
||||||
/**
|
export { AsyncTaskQueue } from "./task-queue";
|
||||||
* Simple queue that executes an asynchronous handler on data in order.
|
export { withRetry } from "./retries";
|
||||||
*/
|
|
||||||
export class AsyncTaskQueue<T> {
|
|
||||||
private _queue: T[] = [];
|
|
||||||
private readonly _handler: (items: T[]) => Promise<unknown>;
|
|
||||||
private _errorHandler: (err: unknown) => unknown = console.error;
|
|
||||||
private _batchSize = 1;
|
|
||||||
private _throttleIntervalMs = 0;
|
|
||||||
private _processing: boolean = false;
|
|
||||||
|
|
||||||
constructor(handler: (items: T[]) => Promise<unknown>) {
|
|
||||||
this._handler = handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Replaces the error handler for this queue. The error handler is called
|
|
||||||
* whenever the task handler throws.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
|
||||||
this._errorHandler = errorHandler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets batch size for this queue. Setting this value greater than 1 allows
|
|
||||||
* the queue to pass up to n items to the task handler each time it is called.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
|
||||||
if (batchSize < 1) {
|
|
||||||
throw new Error("Batch size must be at least 1.");
|
|
||||||
}
|
|
||||||
this._batchSize = batchSize;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets throttle interval for this queue. Setting this longer than 0
|
|
||||||
* milliseconds will cause the queue to call the task handler at most once
|
|
||||||
* every n milliseconds.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
|
||||||
if (intervalMs < 0) {
|
|
||||||
throw new Error("Interval must be zero or more milliseconds.");
|
|
||||||
}
|
|
||||||
this._throttleIntervalMs = intervalMs;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds an item to the back of the queue.
|
|
||||||
*/
|
|
||||||
push(item: T): void {
|
|
||||||
this._queue.push(item);
|
|
||||||
if (!this._processing) {
|
|
||||||
this._processAll().catch(console.error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns all unprocessed items and removes them from the queue.
|
|
||||||
*/
|
|
||||||
drain(): T[] {
|
|
||||||
const items = this._queue;
|
|
||||||
this._queue = [];
|
|
||||||
return items;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the number of unprocessed items in the queue.
|
|
||||||
*/
|
|
||||||
size(): number {
|
|
||||||
return this._queue.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async _processAll(): Promise<void> {
|
|
||||||
// Control loop is very simple: a `while` loop in a single async "thread",
|
|
||||||
// which runs until the queue is empty and then waits for
|
|
||||||
// `this._processAll()` to be called again. Because the execution
|
|
||||||
// environment is single-threaded, the `this._processing` property is a
|
|
||||||
// sufficient stand-in for a mutex.
|
|
||||||
if (this._processing) {
|
|
||||||
throw new Error(
|
|
||||||
"Assertion error: attq should never process queue concurrently.",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
this._processing = true;
|
|
||||||
|
|
||||||
while (this._queue.length > 0) {
|
|
||||||
const throttleIntervalMs = this._throttleIntervalMs;
|
|
||||||
const throttleTimeout = throttleIntervalMs
|
|
||||||
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
|
||||||
: Promise.resolve();
|
|
||||||
|
|
||||||
const items = this._queue.splice(0, this._batchSize);
|
|
||||||
await this._handler(items).catch(this._errorHandler);
|
|
||||||
|
|
||||||
await throttleTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
this._processing = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wraps an arbitrary async function with retry logic for exceptions.
|
|
||||||
*/
|
|
||||||
export function withRetry<T extends unknown[], U>(
|
|
||||||
fn: (...args: T) => Promise<U>,
|
|
||||||
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
|
||||||
attempts: number;
|
|
||||||
backoffMs(attempt: number): number;
|
|
||||||
},
|
|
||||||
): (...args: T) => Promise<U> {
|
|
||||||
return async (...args: T) => {
|
|
||||||
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
|
||||||
try {
|
|
||||||
return await fn(...args);
|
|
||||||
} catch {
|
|
||||||
const delay = backoffMs(attempt);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Preferably run the final attempt without a try/catch block so that thrown
|
|
||||||
// exceptions get a clean stack trace.
|
|
||||||
return await fn(...args);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
|
||||||
99
src/retries.test.ts
Normal file
99
src/retries.test.ts
Normal file
|
|
@ -0,0 +1,99 @@
|
||||||
|
import { withRetry } from "./retries";
|
||||||
|
|
||||||
|
const FAST_RETRY_CONFIG: Parameters<typeof withRetry>[1] = {
|
||||||
|
attempts: 6,
|
||||||
|
// Always retry almost immediately, even when using real timers.
|
||||||
|
backoffMs: () => 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
test("successful function only executes once", async () => {
|
||||||
|
const fn = jest.fn(async () => "success!");
|
||||||
|
const result = await withRetry(fn, FAST_RETRY_CONFIG)();
|
||||||
|
expect(result).toEqual("success!");
|
||||||
|
expect(fn).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("failing function rejects after correct number of attempts", async () => {
|
||||||
|
const fn = jest.fn(async () => {
|
||||||
|
throw "oops!";
|
||||||
|
});
|
||||||
|
await expect(withRetry(fn, FAST_RETRY_CONFIG)()).rejects.toEqual("oops!");
|
||||||
|
expect(fn).toHaveBeenCalledTimes(FAST_RETRY_CONFIG.attempts);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("parameters are passed through", async () => {
|
||||||
|
const fn = async (str: string, n: number) => {
|
||||||
|
let result = "";
|
||||||
|
for (let i = 0; i < n; i += 1) {
|
||||||
|
result += str;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
};
|
||||||
|
const retriable = withRetry(fn, FAST_RETRY_CONFIG);
|
||||||
|
|
||||||
|
// A bit of type system hackery to assert at build time that function
|
||||||
|
// signatures match.
|
||||||
|
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||||
|
type Assert<_T extends true> = void;
|
||||||
|
type _ = Assert<typeof retriable extends typeof fn ? true : false>;
|
||||||
|
/* eslint-enable @typescript-eslint/no-unused-vars */
|
||||||
|
|
||||||
|
await expect(retriable("hello;", 3)).resolves.toEqual("hello;hello;hello;");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("retries back off with expected delays", async () => {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
let n = 0;
|
||||||
|
const fn = jest.fn(async () => {
|
||||||
|
if (n < 5) {
|
||||||
|
n += 1;
|
||||||
|
throw "oops!";
|
||||||
|
} else {
|
||||||
|
// Fake timers don't play nicely with the `expect().rejects` API, so
|
||||||
|
// ensure that the last attempt succeeds.
|
||||||
|
return "success!";
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const promise = withRetry(
|
||||||
|
fn,
|
||||||
|
{
|
||||||
|
attempts: 6,
|
||||||
|
backoffMs: (attempt) => 1000 * 2 ** attempt,
|
||||||
|
},
|
||||||
|
)();
|
||||||
|
|
||||||
|
expect(fn).toHaveBeenCalledTimes(1);
|
||||||
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(1000);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(2);
|
||||||
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(2000);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(3);
|
||||||
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(3);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(4000);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(4);
|
||||||
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(4);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(8000);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(5);
|
||||||
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(5);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(16000);
|
||||||
|
expect(fn).toHaveBeenCalledTimes(6);
|
||||||
|
|
||||||
|
await expect(promise).resolves.toEqual("success!")
|
||||||
|
|
||||||
|
// Fake timers don't play nicely with the `expect().rejects` API, so disable
|
||||||
|
// them again for other tests.
|
||||||
|
jest.useRealTimers();
|
||||||
|
});
|
||||||
24
src/retries.ts
Normal file
24
src/retries.ts
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
/**
|
||||||
|
* Wraps an arbitrary async function with retry logic for exceptions.
|
||||||
|
*/
|
||||||
|
export function withRetry<T extends unknown[], U>(
|
||||||
|
fn: (...args: T) => Promise<U>,
|
||||||
|
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
||||||
|
attempts: number;
|
||||||
|
backoffMs(attempt: number): number;
|
||||||
|
},
|
||||||
|
): (...args: T) => Promise<U> {
|
||||||
|
return async (...args: T) => {
|
||||||
|
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
||||||
|
try {
|
||||||
|
return await fn(...args);
|
||||||
|
} catch {
|
||||||
|
const delay = backoffMs(attempt);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Preferably run the final attempt without a try/catch block so that thrown
|
||||||
|
// exceptions get a clean stack trace.
|
||||||
|
return await fn(...args);
|
||||||
|
};
|
||||||
|
}
|
||||||
112
src/task-queue.ts
Normal file
112
src/task-queue.ts
Normal file
|
|
@ -0,0 +1,112 @@
|
||||||
|
/**
|
||||||
|
* Simple queue that executes an asynchronous handler on data in order.
|
||||||
|
*/
|
||||||
|
export class AsyncTaskQueue<T> {
|
||||||
|
private _queue: T[] = [];
|
||||||
|
private readonly _handler: (items: T[]) => Promise<unknown>;
|
||||||
|
private _errorHandler: (err: unknown) => unknown = console.error;
|
||||||
|
private _batchSize = 1;
|
||||||
|
private _throttleIntervalMs = 0;
|
||||||
|
private _processing: boolean = false;
|
||||||
|
|
||||||
|
constructor(handler: (items: T[]) => Promise<unknown>) {
|
||||||
|
this._handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replaces the error handler for this queue. The error handler is called
|
||||||
|
* whenever the task handler throws.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
||||||
|
this._errorHandler = errorHandler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets batch size for this queue. Setting this value greater than 1 allows
|
||||||
|
* the queue to pass up to n items to the task handler each time it is called.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
||||||
|
if (batchSize < 1) {
|
||||||
|
throw new Error("Batch size must be at least 1.");
|
||||||
|
}
|
||||||
|
this._batchSize = batchSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets throttle interval for this queue. Setting this longer than 0
|
||||||
|
* milliseconds will cause the queue to call the task handler at most once
|
||||||
|
* every n milliseconds.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
||||||
|
if (intervalMs < 0) {
|
||||||
|
throw new Error("Interval must be zero or more milliseconds.");
|
||||||
|
}
|
||||||
|
this._throttleIntervalMs = intervalMs;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an item to the back of the queue.
|
||||||
|
*/
|
||||||
|
push(item: T): void {
|
||||||
|
this._queue.push(item);
|
||||||
|
if (!this._processing) {
|
||||||
|
this._processAll().catch(console.error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns all unprocessed items and removes them from the queue.
|
||||||
|
*/
|
||||||
|
drain(): T[] {
|
||||||
|
const items = this._queue;
|
||||||
|
this._queue = [];
|
||||||
|
return items;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of unprocessed items in the queue.
|
||||||
|
*/
|
||||||
|
size(): number {
|
||||||
|
return this._queue.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _processAll(): Promise<void> {
|
||||||
|
// Control loop is very simple: a `while` loop in a single async "thread",
|
||||||
|
// which runs until the queue is empty and then waits for
|
||||||
|
// `this._processAll()` to be called again. Because the execution
|
||||||
|
// environment is single-threaded, the `this._processing` property is a
|
||||||
|
// sufficient stand-in for a mutex.
|
||||||
|
if (this._processing) {
|
||||||
|
throw new Error(
|
||||||
|
"Assertion error: attq should never process queue concurrently.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this._processing = true;
|
||||||
|
|
||||||
|
while (this._queue.length > 0) {
|
||||||
|
const throttleIntervalMs = this._throttleIntervalMs;
|
||||||
|
const throttleTimeout = throttleIntervalMs
|
||||||
|
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
||||||
|
: Promise.resolve();
|
||||||
|
|
||||||
|
const items = this._queue.splice(0, this._batchSize);
|
||||||
|
await this._handler(items).catch(this._errorHandler);
|
||||||
|
|
||||||
|
await throttleTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
this._processing = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue