Concurrency · C# · Verified
Producer-Consumer Pattern in C#
Decouple data production from data consumption using a shared buffer, allowing each side to operate at its own pace.
How to Implement the Producer-Consumer Pattern in C#
// Step 1: Define the work item
public record WorkItem(int Id, string Data);
// Step 2: Producer writes items to a channel
/// <summary>
/// Writes a fixed number of sequentially numbered work items to a channel.
/// </summary>
public static class Producer
{
    /// <summary>
    /// Writes <paramref name="count"/> items with ids 0..count-1 to
    /// <paramref name="writer"/>, printing a line per item, then marks the
    /// channel complete so that readers can finish.
    /// </summary>
    /// <param name="writer">Channel writer that receives the items.</param>
    /// <param name="count">Number of items to write; zero or negative writes nothing.</param>
    /// <param name="ct">Optional token used to abandon a pending write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> is null.</exception>
    public static async Task ProduceAsync(
        ChannelWriter<WorkItem> writer, int count,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(writer);

        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(new WorkItem(i, $"Item-{i}"), ct);
                Console.WriteLine($"Produced: {i}");
            }
        }
        catch (Exception ex)
        {
            // If a write fails or is cancelled, the channel must still be
            // closed; otherwise readers keep waiting for items that will
            // never arrive. The exception is handed to the channel so the
            // reading side can see why the stream ended early.
            writer.TryComplete(ex);
            throw;
        }

        writer.Complete();
    }
}
// Step 3: Consumer reads items from the channel
/// <summary>
/// Reads work items from a channel until it is completed and empty.
/// </summary>
public static class Consumer
{
    /// <summary>
    /// Drains <paramref name="reader"/>, printing one line per item and
    /// pausing 50 ms after each to stand in for real processing.
    /// </summary>
    /// <param name="reader">Channel reader to drain.</param>
    /// <param name="name">Label prefixed to every console line.</param>
    public static async Task ConsumeAsync(
        ChannelReader<WorkItem> reader, string name)
    {
        // Outer loop: wait until data is available (false means the channel
        // is finished). Inner loop: take everything that is ready right now.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var workItem))
            {
                Console.WriteLine($"{name} consumed: {workItem.Id}");
                await Task.Delay(50); // Simulate processing
            }
        }
    }
}
// Usage:
// var channel = Channel.CreateBounded<WorkItem>(10);
// await Task.WhenAll(
// Producer.ProduceAsync(channel.Writer, 20),
// Consumer.ConsumeAsync(channel.Reader, "Worker-1"),
// Consumer.ConsumeAsync(channel.Reader, "Worker-2"));
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
// [step] Define typed work items with priority
/// <summary>
/// Urgency label carried by a work item. Members are declared in
/// ascending order, so comparing the underlying values ranks them.
/// </summary>
public enum Priority
{
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}
/// <summary>
/// A payload together with the metadata attached when it was queued.
/// </summary>
/// <param name="Id">Identifier of the item.</param>
/// <param name="Payload">The data to be processed.</param>
/// <param name="Priority">Urgency label for the item.</param>
/// <param name="EnqueuedAt">Timestamp recorded when the item was created.</param>
public record WorkItem<T>(
    string Id,
    T Payload,
    Priority Priority,
    DateTime EnqueuedAt)
{
    /// <summary>
    /// Builds a work item with a freshly generated GUID string as its id
    /// and the current UTC time as its enqueue timestamp.
    /// </summary>
    /// <param name="payload">The data to wrap.</param>
    /// <param name="priority">Urgency label; defaults to <see cref="Priority.Normal"/>.</param>
    /// <returns>The new work item.</returns>
    public static WorkItem<T> Create(
        T payload,
        Priority priority = Priority.Normal)
    {
        var id = Guid.NewGuid().ToString();
        var enqueuedAt = DateTime.UtcNow;
        return new WorkItem<T>(id, payload, priority, enqueuedAt);
    }
}
/// <summary>
/// Outcome record for one processed work item. Nothing in the code shown
/// alongside it constructs this type; it is available for callers to use.
/// </summary>
/// <param name="WorkItemId">Id of the work item this result belongs to.</param>
/// <param name="Success">Whether processing succeeded.</param>
/// <param name="Result">The produced value, if any.</param>
/// <param name="Duration">How long processing took.</param>
/// <param name="Error">Optional error description; null when omitted.</param>
public record ProcessingResult<T>(
    string WorkItemId,
    bool Success,
    T? Result,
    TimeSpan Duration,
    string? Error = null);
// [step] Producer with backpressure and batching support
/// <summary>
/// Wraps a channel writer: turns payloads into work items, writes them to
/// the channel, and keeps a running count of successful writes.
/// </summary>
public sealed class Producer<T>(
    ChannelWriter<WorkItem<T>> writer,
    ILogger<Producer<T>> logger)
{
    // Updated with Interlocked so concurrent callers do not lose increments.
    private long _produced;

    /// <summary>Number of items written so far.</summary>
    public long ProducedCount
    {
        get { return Interlocked.Read(ref _produced); }
    }

    /// <summary>
    /// Wraps <paramref name="payload"/> in a new work item and writes it to
    /// the channel, then bumps the counter and logs at debug level.
    /// </summary>
    /// <param name="payload">The data to enqueue.</param>
    /// <param name="priority">Urgency label stored on the work item.</param>
    /// <param name="ct">Token passed to the channel write.</param>
    public async Task ProduceAsync(
        T payload, Priority priority = Priority.Normal,
        CancellationToken ct = default)
    {
        var workItem = WorkItem<T>.Create(payload, priority);

        // When the writer belongs to a bounded channel in Wait mode (as
        // PipelineBuilder configures), this await is where a full channel
        // slows the producer down.
        await writer.WriteAsync(workItem, ct);

        // Counted only after the write has actually gone through.
        Interlocked.Increment(ref _produced);
        logger.LogDebug("Produced {Id} (priority: {Priority})",
            workItem.Id, priority);
    }

    /// <summary>
    /// Writes each payload in order via <see cref="ProduceAsync"/>, checking
    /// the token before every item.
    /// </summary>
    /// <param name="payloads">Payloads to enqueue, in enumeration order.</param>
    /// <param name="priority">Urgency label applied to every item in the batch.</param>
    /// <param name="ct">Token checked before each item and passed to each write.</param>
    public async Task ProduceBatchAsync(
        IEnumerable<T> payloads, Priority priority = Priority.Normal,
        CancellationToken ct = default)
    {
        foreach (var entry in payloads)
        {
            ct.ThrowIfCancellationRequested();
            await ProduceAsync(entry, priority, ct);
        }
    }

    /// <summary>
    /// Marks the channel as finished; does nothing if it already was.
    /// </summary>
    public void Complete()
    {
        writer.TryComplete();
    }
}
// [step] Consumer with parallel processing and error handling
/// <summary>
/// Reads work items from a channel and runs a processing delegate on each,
/// optionally with several worker loops sharing the same reader.
/// Per-item failures are logged and counted; they do not stop the worker.
/// </summary>
public sealed class Consumer<TIn, TOut>(
    ChannelReader<WorkItem<TIn>> reader,
    Func<TIn, CancellationToken, Task<TOut>> processor,
    ILogger<Consumer<TIn, TOut>> logger)
{
    // Both counters are touched from several worker loops, hence Interlocked.
    private long _processed;
    private long _failed;

    /// <summary>Number of items the processor completed without throwing.</summary>
    public long ProcessedCount => Interlocked.Read(ref _processed);

    /// <summary>Number of items whose processing threw and was logged as a failure.</summary>
    public long FailedCount => Interlocked.Read(ref _failed);

    /// <summary>
    /// Starts <paramref name="concurrency"/> worker loops over the shared
    /// reader and completes when all of them have finished.
    /// </summary>
    /// <param name="concurrency">Number of worker loops; must be at least 1.</param>
    /// <param name="ct">Token that stops reading and is passed to the processor.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="concurrency"/> is less than 1.
    /// </exception>
    public async Task RunAsync(
        int concurrency = 1,
        CancellationToken ct = default)
    {
        // Zero workers would return immediately and leave the channel
        // undrained, which looks like success but processes nothing.
        if (concurrency < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(concurrency), concurrency,
                "At least one worker is required.");
        }

        // Materialize so every loop is started exactly once, up front.
        var workers = Enumerable.Range(0, concurrency)
            .Select(i => ProcessLoopAsync($"Worker-{i}", ct))
            .ToArray();
        await Task.WhenAll(workers);
    }

    /// <summary>
    /// One worker: reads items until the channel is finished or
    /// <paramref name="ct"/> is cancelled, timing and processing each.
    /// </summary>
    private async Task ProcessLoopAsync(
        string workerName, CancellationToken ct)
    {
        logger.LogInformation("{Worker} started", workerName);
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            var sw = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                // The processor's return value is not used by this class.
                await processor(item.Payload, ct);
                Interlocked.Increment(ref _processed);
                logger.LogDebug("{Worker} processed {Id} in {Duration}ms",
                    workerName, item.Id, sw.ElapsedMilliseconds);
            }
            // A cancellation exception only means "stop" when our own token
            // was cancelled. One raised for another reason (for example a
            // timeout inside the processor) is an ordinary item failure and
            // must not tear the worker down.
            catch (Exception ex) when (
                ex is not OperationCanceledException || !ct.IsCancellationRequested)
            {
                Interlocked.Increment(ref _failed);
                logger.LogError(ex, "{Worker} failed on {Id}",
                    workerName, item.Id);
            }
        }
        logger.LogInformation("{Worker} completed", workerName);
    }
}
// [step] Pipeline builder for composing producer-consumer chains
/// <summary>
/// Factory that wires a producer and a consumer to the two ends of one
/// bounded channel.
/// </summary>
public static class PipelineBuilder
{
    /// <summary>
    /// Creates a bounded channel of the given capacity and returns a
    /// producer bound to its writer and a consumer bound to its reader.
    /// </summary>
    /// <param name="processor">Delegate the consumer runs on each payload.</param>
    /// <param name="capacity">Maximum number of items the channel holds.</param>
    /// <param name="loggerFactory">Source of the loggers for both sides.</param>
    /// <returns>The producer and the consumer, in that order.</returns>
    public static (Producer<T>, Consumer<T, TOut>) Create<T, TOut>(
        Func<T, CancellationToken, Task<TOut>> processor,
        int capacity,
        ILoggerFactory loggerFactory)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            // Several producers and several worker loops may share the channel.
            SingleWriter = false,
            SingleReader = false,
            // A full channel makes writers wait instead of dropping items.
            FullMode = BoundedChannelFullMode.Wait,
        };
        var pipe = Channel.CreateBounded<WorkItem<T>>(options);

        var producerLog = loggerFactory.CreateLogger<Producer<T>>();
        var source = new Producer<T>(pipe.Writer, producerLog);

        var consumerLog = loggerFactory.CreateLogger<Consumer<T, TOut>>();
        var sink = new Consumer<T, TOut>(pipe.Reader, processor, consumerLog);

        return (source, sink);
    }
}
// Producer-Consumer Pattern Architecture
[Architecture diagram not rendered in this extract.]
Producer-Consumer Pattern in the Real World
“Think of a bakery where bakers (producers) place fresh loaves on a display shelf (the queue) and customers (consumers) pick them up at their leisure. The shelf decouples the baking schedule from customer arrival times — bakers keep baking even when no customer is present, and customers keep shopping even when bakers are on break.”