Concurrency · TypeScript · Verified
Producer-Consumer Pattern in TypeScript
Decouple data production from data consumption using a shared buffer, allowing each side to operate at its own pace.
How to Implement the Producer-Consumer Pattern in TypeScript
Step 1: Define the bounded queue interface
/**
 * Asynchronous queue contract shared by the producer and the consumer.
 *
 * Both operations return promises so an implementation can make the caller
 * wait instead of failing when it cannot serve the request immediately.
 */
interface BoundedQueue<T> {
  /** Offers `item` to the queue; resolves once the queue has accepted it. */
  enqueue(item: T): Promise<void>;
  /** Resolves with the next item taken from the queue. */
  dequeue(): Promise<T>;
  /** Number of items currently held by the queue. */
  readonly size: number;
}
// Step 2: Implement the async queue with backpressure
/**
 * Promise-based FIFO queue that buffers at most `capacity` items.
 *
 * - `enqueue` resolves immediately while there is room (or a consumer is
 *   already waiting) and otherwise suspends the producer until a slot frees up.
 * - `dequeue` resolves immediately while items are buffered and otherwise
 *   suspends the consumer until an item arrives.
 *
 * All bookkeeping is done synchronously inside `enqueue`/`dequeue`; a parked
 * producer's item is moved into the buffer by the `dequeue` call that frees
 * the slot, not by the producer's own continuation. That keeps the buffer
 * consistent even when several calls are issued in the same tick.
 */
class AsyncQueue<T> implements BoundedQueue<T> {
  /** Buffered items, oldest first. */
  private items: T[] = [];
  /** Producers suspended on a full buffer, each parked together with its item. */
  private waitingProducers: Array<{ item: T; resume: (v: void) => void }> = [];
  /** Resolvers of consumers suspended on an empty buffer, oldest first. */
  private waitingConsumers: Array<(item: T) => void> = [];

  /**
   * @param capacity Maximum number of buffered items. With a capacity of 0
   *   (or less) nothing is buffered and every item is handed directly from a
   *   producer to a consumer.
   */
  constructor(private readonly capacity: number) {}

  /** Number of items currently buffered (parked producers are not counted). */
  get size(): number {
    return this.items.length;
  }

  /**
   * Adds `item` to the queue.
   *
   * @returns A promise that resolves once the item has been handed to a
   *   consumer or accepted into the buffer.
   */
  async enqueue(item: T): Promise<void> {
    // A waiting consumer implies an empty buffer: hand the item over directly.
    const consumer = this.waitingConsumers.shift();
    if (consumer) {
      consumer(item);
      return;
    }
    if (this.items.length < this.capacity) {
      this.items.push(item);
      return;
    }
    // Buffer is full: park the item with the producer so that `dequeue` can
    // admit it synchronously when a slot opens.
    await new Promise<void>(resume => {
      this.waitingProducers.push({ item, resume });
    });
  }

  /**
   * Removes and returns the oldest item.
   *
   * @returns A promise that resolves with the item, waiting for a producer
   *   if nothing is available yet.
   */
  async dequeue(): Promise<T> {
    if (this.items.length > 0) {
      // Length was checked above, so shift() cannot come back empty.
      const item = this.items.shift() as T;
      // Refill the freed slot right away; deferring this to the producer's
      // continuation would let a same-tick dequeue see an empty buffer and
      // wait forever.
      const parked = this.waitingProducers.shift();
      if (parked) {
        this.items.push(parked.item);
        parked.resume();
      }
      return item;
    }
    // Empty buffer with a parked producer only happens when capacity <= 0:
    // pass the item straight through.
    const parked = this.waitingProducers.shift();
    if (parked) {
      parked.resume();
      return parked.item;
    }
    // Nothing available: suspend the consumer until an enqueue arrives.
    return new Promise<T>(resolve => {
      this.waitingConsumers.push(resolve);
    });
  }
}
// Step 3: Create producer and consumer coroutines
/**
 * Feeds the integers 0 through 9 into `queue`, one at a time.
 *
 * Each value is logged only after the queue has accepted it, so a slow
 * consumer delays the log lines as well as the enqueues.
 *
 * @param queue Destination queue for the produced numbers.
 */
async function producer(queue: BoundedQueue<number>): Promise<void> {
  let value = 0;
  while (value < 10) {
    // Wait for the queue to accept the value before moving on.
    await queue.enqueue(value);
    console.log(`Produced: ${value}`);
    value += 1;
  }
}
/**
 * Takes exactly ten items from `queue`, logging each one as it arrives.
 *
 * The count matches the ten values emitted by `producer`, so running both
 * against the same queue leaves it empty.
 *
 * @param queue Source queue to read numbers from.
 */
async function consumer(queue: BoundedQueue<number>): Promise<void> {
  for (let i = 0; i < 10; i++) {
    // Suspends here while the queue has nothing to hand out.
    const item = await queue.dequeue();
    console.log(`Consumed: ${item}`);
  }
}
// Step 4: Run producer and consumer concurrently
// Demo: one producer and one consumer sharing a queue that buffers up to 3 items.
const queue = new AsyncQueue<number>(3);
// Start both sides (producer first) and wait until each has finished.
const demoRuns = [producer(queue), consumer(queue)];
await Promise.all(demoRuns);
// ── Task Queue with Workers and Backpressure ─────────────────────
/** Unit of work submitted to a `TaskQueue`. */
interface Task<T> {
  /** Identifier copied into the matching `WorkerResult.taskId`. */
  id: string;
  /** Data passed to the handler as part of the task. */
  payload: T;
  /** Scheduling priority; higher values run first. Treated as 0 when omitted. */
  priority?: number;
  /** Per-task retry limit; overrides `QueueOptions.maxRetries` when set. */
  retries?: number;
}
/** Outcome recorded by a `TaskQueue` worker for one task. */
interface WorkerResult<T, R> {
  /** `id` of the task this outcome belongs to. */
  taskId: string;
  /** Handler return value; present when an attempt succeeded. */
  result?: R;
  /** Last error seen; present when every allowed attempt failed. */
  error?: Error;
  /** Number of times the handler was invoked for the task. */
  attempts: number;
  /** Milliseconds from the worker picking up the task to its final outcome. */
  durationMs: number;
}
/** Configuration for a `TaskQueue`. */
interface QueueOptions {
  /** Number of pending tasks at which the backpressure strategy kicks in. */
  capacity: number;
  /** Number of worker loops started when the queue is constructed. */
  workers: number;
  /** Retries allowed after a failed attempt, unless the task sets its own `retries`. */
  maxRetries: number;
  /**
   * What `submit` does when the queue is full:
   * - `"block"` waits until a worker frees a slot,
   * - `"drop"` discards the task silently,
   * - `"error"` throws.
   */
  backpressureStrategy: "block" | "drop" | "error";
}
/**
 * Priority task queue served by a fixed number of polling workers, with
 * per-task retries and a configurable reaction to a full queue.
 *
 * Workers start as soon as the queue is constructed and poll for work every
 * 10 ms until `drain()` has shut the queue down, so call `drain()` when done.
 */
class TaskQueue<T, R> {
  /** Pending tasks, kept sorted by descending priority. */
  private queue: Task<T>[] = [];
  /** Wake-up callbacks of `submit` calls suspended by the "block" strategy. */
  private waitingProducers: Array<(v: void) => void> = [];
  /** Number of workers currently executing a task. */
  private activeWorkers = 0;
  /** Set by `drain()`; rejects new submissions and lets idle workers exit. */
  private isShutdown = false;
  /** One entry per finished task, in completion order. */
  private results: WorkerResult<T, R>[] = [];

  /**
   * @param handler Function that processes one task; a rejection counts as a
   *   failed attempt.
   * @param options Capacity, worker count, default retry limit and
   *   backpressure strategy.
   */
  constructor(
    private handler: (task: Task<T>) => Promise<R>,
    private options: QueueOptions
  ) {
    for (let i = 0; i < options.workers; i++) {
      // Fire-and-forget: each worker loops until shutdown.
      void this.runWorker();
    }
  }

  /**
   * Adds a task to the queue, applying the backpressure strategy when full.
   *
   * @throws Error if the queue has been shut down, or if it is full and the
   *   strategy is `"error"`.
   */
  async submit(task: Task<T>): Promise<void> {
    if (this.isShutdown) throw new Error("Queue is shut down");
    if (this.isFull()) {
      switch (this.options.backpressureStrategy) {
        case "drop":
          return;
        case "error":
          throw new Error(`Queue full (capacity: ${this.options.capacity})`);
        case "block":
          // Re-check after every wake-up: another submit may have taken the
          // freed slot before this continuation ran, and pushing regardless
          // would exceed the capacity.
          do {
            await new Promise<void>(wake => this.waitingProducers.push(wake));
          } while (this.isFull());
          break;
      }
    }
    this.queue.push(task);
    // Highest priority first; the sort is stable, so equal priorities stay FIFO.
    this.queue.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
  }

  /** True when the number of pending tasks has reached the configured capacity. */
  private isFull(): boolean {
    return this.queue.length >= this.options.capacity;
  }

  /** Worker loop: takes tasks until the queue is shut down and empty. */
  private async runWorker(): Promise<void> {
    while (!this.isShutdown || this.queue.length > 0) {
      const task = this.queue.shift();
      if (!task) {
        // Nothing to do yet: poll again shortly.
        await new Promise(r => setTimeout(r, 10));
        continue;
      }
      // A slot just opened: wake one blocked producer, if any.
      this.waitingProducers.shift()?.();
      this.activeWorkers++;
      this.results.push(await this.runWithRetries(task));
      this.activeWorkers--;
    }
  }

  /**
   * Runs the handler for `task`, retrying failed attempts with exponential
   * backoff (`100 * 2 ** attempt` ms), and returns the final outcome.
   */
  private async runWithRetries(task: Task<T>): Promise<WorkerResult<T, R>> {
    // Clamp to zero so a negative limit still runs the task once instead of
    // dropping it without recording any result.
    const maxRetries = Math.max(0, task.retries ?? this.options.maxRetries);
    const start = Date.now();
    let attempts = 0;
    for (;;) {
      try {
        const result = await this.handler(task);
        return {
          taskId: task.id,
          result,
          attempts: attempts + 1,
          durationMs: Date.now() - start,
        };
      } catch (e) {
        const error = e instanceof Error ? e : new Error(String(e));
        attempts++;
        if (attempts > maxRetries) {
          return {
            taskId: task.id,
            error,
            attempts,
            durationMs: Date.now() - start,
          };
        }
        await new Promise(r => setTimeout(r, 100 * 2 ** attempts));
      }
    }
  }

  /**
   * Waits (polling every 50 ms) until no task is pending or running, then
   * shuts the queue down.
   *
   * @returns The outcomes of all tasks processed so far.
   */
  async drain(): Promise<WorkerResult<T, R>[]> {
    while (this.queue.length > 0 || this.activeWorkers > 0) {
      await new Promise(r => setTimeout(r, 50));
    }
    this.isShutdown = true;
    return this.results;
  }
}
export { TaskQueue, type Task, type WorkerResult, type QueueOptions };
Producer-Consumer Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Producer-Consumer Pattern in the Real World
“Think of a bakery where bakers (producers) place fresh loaves on a display shelf (the queue) and customers (consumers) pick them up at their leisure. The shelf decouples the baking schedule from customer arrival times — bakers keep baking even when no customer is present, and customers keep shopping even when bakers are on break.”