split source files based on contents
This commit is contained in:
parent
3c2ef378bc
commit
fed6337a7e
4 changed files with 138 additions and 137 deletions
139
src/index.ts
139
src/index.ts
|
|
@ -1,137 +1,2 @@
|
||||||
/**
|
export { AsyncTaskQueue } from "./task-queue";
|
||||||
* Simple queue that executes an asynchronous handler on data in order.
|
export { withRetry } from "./retries";
|
||||||
*/
|
|
||||||
export class AsyncTaskQueue<T> {
|
|
||||||
private _queue: T[] = [];
|
|
||||||
private readonly _handler: (items: T[]) => Promise<unknown>;
|
|
||||||
private _errorHandler: (err: unknown) => unknown = console.error;
|
|
||||||
private _batchSize = 1;
|
|
||||||
private _throttleIntervalMs = 0;
|
|
||||||
private _processing: boolean = false;
|
|
||||||
|
|
||||||
constructor(handler: (items: T[]) => Promise<unknown>) {
|
|
||||||
this._handler = handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Replaces the error handler for this queue. The error handler is called
|
|
||||||
* whenever the task handler throws.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
|
||||||
this._errorHandler = errorHandler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets batch size for this queue. Setting this value greater than 1 allows
|
|
||||||
* the queue to pass up to n items to the task handler each time it is called.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
|
||||||
if (batchSize < 1) {
|
|
||||||
throw new Error("Batch size must be at least 1.");
|
|
||||||
}
|
|
||||||
this._batchSize = batchSize;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets throttle interval for this queue. Setting this longer than 0
|
|
||||||
* milliseconds will cause the queue to call the task handler at most once
|
|
||||||
* every n milliseconds.
|
|
||||||
*
|
|
||||||
* This method updates the queue in place and returns it, so method calls may
|
|
||||||
* be chained.
|
|
||||||
*/
|
|
||||||
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
|
||||||
if (intervalMs < 0) {
|
|
||||||
throw new Error("Interval must be zero or more milliseconds.");
|
|
||||||
}
|
|
||||||
this._throttleIntervalMs = intervalMs;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds an item to the back of the queue.
|
|
||||||
*/
|
|
||||||
push(item: T): void {
|
|
||||||
this._queue.push(item);
|
|
||||||
if (!this._processing) {
|
|
||||||
this._processAll().catch(console.error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns all unprocessed items and removes them from the queue.
|
|
||||||
*/
|
|
||||||
drain(): T[] {
|
|
||||||
const items = this._queue;
|
|
||||||
this._queue = [];
|
|
||||||
return items;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the number of unprocessed items in the queue.
|
|
||||||
*/
|
|
||||||
size(): number {
|
|
||||||
return this._queue.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async _processAll(): Promise<void> {
|
|
||||||
// Control loop is very simple: a `while` loop in a single async "thread",
|
|
||||||
// which runs until the queue is empty and then waits for
|
|
||||||
// `this._processAll()` to be called again. Because the execution
|
|
||||||
// environment is single-threaded, the `this._processing` property is a
|
|
||||||
// sufficient stand-in for a mutex.
|
|
||||||
if (this._processing) {
|
|
||||||
throw new Error(
|
|
||||||
"Assertion error: attq should never process queue concurrently.",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
this._processing = true;
|
|
||||||
|
|
||||||
while (this._queue.length > 0) {
|
|
||||||
const throttleIntervalMs = this._throttleIntervalMs;
|
|
||||||
const throttleTimeout = throttleIntervalMs
|
|
||||||
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
|
||||||
: Promise.resolve();
|
|
||||||
|
|
||||||
const items = this._queue.splice(0, this._batchSize);
|
|
||||||
await this._handler(items).catch(this._errorHandler);
|
|
||||||
|
|
||||||
await throttleTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
this._processing = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wraps an arbitrary async function with retry logic for exceptions.
|
|
||||||
*/
|
|
||||||
export function withRetry<T extends unknown[], U>(
|
|
||||||
fn: (...args: T) => Promise<U>,
|
|
||||||
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
|
||||||
attempts: number;
|
|
||||||
backoffMs(attempt: number): number;
|
|
||||||
},
|
|
||||||
): (...args: T) => Promise<U> {
|
|
||||||
return async (...args: T) => {
|
|
||||||
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
|
||||||
try {
|
|
||||||
return await fn(...args);
|
|
||||||
} catch {
|
|
||||||
const delay = backoffMs(attempt);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Preferably run the final attempt without a try/catch block so that thrown
|
|
||||||
// exceptions get a clean stack trace.
|
|
||||||
return await fn(...args);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
|
||||||
24
src/retries.ts
Normal file
24
src/retries.ts
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
/**
|
||||||
|
* Wraps an arbitrary async function with retry logic for exceptions.
|
||||||
|
*/
|
||||||
|
export function withRetry<T extends unknown[], U>(
|
||||||
|
fn: (...args: T) => Promise<U>,
|
||||||
|
{ attempts = 6, backoffMs = (attempt) => 1000 * 2 ** attempt }: {
|
||||||
|
attempts: number;
|
||||||
|
backoffMs(attempt: number): number;
|
||||||
|
},
|
||||||
|
): (...args: T) => Promise<U> {
|
||||||
|
return async (...args: T) => {
|
||||||
|
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
||||||
|
try {
|
||||||
|
return await fn(...args);
|
||||||
|
} catch {
|
||||||
|
const delay = backoffMs(attempt);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Preferably run the final attempt without a try/catch block so that thrown
|
||||||
|
// exceptions get a clean stack trace.
|
||||||
|
return await fn(...args);
|
||||||
|
};
|
||||||
|
}
|
||||||
112
src/task-queue.ts
Normal file
112
src/task-queue.ts
Normal file
|
|
@ -0,0 +1,112 @@
|
||||||
|
/**
|
||||||
|
* Simple queue that executes an asynchronous handler on data in order.
|
||||||
|
*/
|
||||||
|
export class AsyncTaskQueue<T> {
|
||||||
|
private _queue: T[] = [];
|
||||||
|
private readonly _handler: (items: T[]) => Promise<unknown>;
|
||||||
|
private _errorHandler: (err: unknown) => unknown = console.error;
|
||||||
|
private _batchSize = 1;
|
||||||
|
private _throttleIntervalMs = 0;
|
||||||
|
private _processing: boolean = false;
|
||||||
|
|
||||||
|
constructor(handler: (items: T[]) => Promise<unknown>) {
|
||||||
|
this._handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replaces the error handler for this queue. The error handler is called
|
||||||
|
* whenever the task handler throws.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
onError(errorHandler: (err: unknown) => unknown): AsyncTaskQueue<T> {
|
||||||
|
this._errorHandler = errorHandler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets batch size for this queue. Setting this value greater than 1 allows
|
||||||
|
* the queue to pass up to n items to the task handler each time it is called.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
batchSize(batchSize: number): AsyncTaskQueue<T> {
|
||||||
|
if (batchSize < 1) {
|
||||||
|
throw new Error("Batch size must be at least 1.");
|
||||||
|
}
|
||||||
|
this._batchSize = batchSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets throttle interval for this queue. Setting this longer than 0
|
||||||
|
* milliseconds will cause the queue to call the task handler at most once
|
||||||
|
* every n milliseconds.
|
||||||
|
*
|
||||||
|
* This method updates the queue in place and returns it, so method calls may
|
||||||
|
* be chained.
|
||||||
|
*/
|
||||||
|
throttleMs(intervalMs: number): AsyncTaskQueue<T> {
|
||||||
|
if (intervalMs < 0) {
|
||||||
|
throw new Error("Interval must be zero or more milliseconds.");
|
||||||
|
}
|
||||||
|
this._throttleIntervalMs = intervalMs;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an item to the back of the queue.
|
||||||
|
*/
|
||||||
|
push(item: T): void {
|
||||||
|
this._queue.push(item);
|
||||||
|
if (!this._processing) {
|
||||||
|
this._processAll().catch(console.error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns all unprocessed items and removes them from the queue.
|
||||||
|
*/
|
||||||
|
drain(): T[] {
|
||||||
|
const items = this._queue;
|
||||||
|
this._queue = [];
|
||||||
|
return items;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of unprocessed items in the queue.
|
||||||
|
*/
|
||||||
|
size(): number {
|
||||||
|
return this._queue.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _processAll(): Promise<void> {
|
||||||
|
// Control loop is very simple: a `while` loop in a single async "thread",
|
||||||
|
// which runs until the queue is empty and then waits for
|
||||||
|
// `this._processAll()` to be called again. Because the execution
|
||||||
|
// environment is single-threaded, the `this._processing` property is a
|
||||||
|
// sufficient stand-in for a mutex.
|
||||||
|
if (this._processing) {
|
||||||
|
throw new Error(
|
||||||
|
"Assertion error: attq should never process queue concurrently.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this._processing = true;
|
||||||
|
|
||||||
|
while (this._queue.length > 0) {
|
||||||
|
const throttleIntervalMs = this._throttleIntervalMs;
|
||||||
|
const throttleTimeout = throttleIntervalMs
|
||||||
|
? new Promise((resolve) => setTimeout(resolve, throttleIntervalMs))
|
||||||
|
: Promise.resolve();
|
||||||
|
|
||||||
|
const items = this._queue.splice(0, this._batchSize);
|
||||||
|
await this._handler(items).catch(this._errorHandler);
|
||||||
|
|
||||||
|
await throttleTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
this._processing = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue