.NET中的Channel是用于生产者与消费者间异步通信的高性能、线程安全队列,分有界(BoundedChannel)和无界(UnboundedChannel)两种类型,支持多生产者多消费者并发,通过Writer写入、Reader读取数据,常用于解耦任务、控制并发与实现背压,推荐使用有界通道防止内存溢出,并结合CancellationToken实现优雅关闭。
.NET中的Channel 是一种用于在生产者和消费者之间进行异步通信的高性能、线程安全的数据结构。它本质上是一个先进先出(FIFO)的消息队列,支持多生产者和多消费者场景,底层基于 System.Threading.Channels 命名空间实现。Channel 特别适合用于解耦任务处理、控制并发、避免内存溢出以及实现背压(backpressure)机制。
Channel 分为两种主要类型:
// 创建一个最多容纳100个消息的有界通道 var channel = Channel.CreateBounded(100); // 或创建无界通道(不推荐用于高负载场景) var unboundedChannel = Channel.CreateUnbounded ();
生产者通过 Writer 向 Channel 写入数据。写入可以是同步或异步的,推荐使用异步方式以避免阻塞线程。
await channel.Writer.WriteAsync("消息1");
如果使用有界 Channel 且已满,WriteAsync 会等待直到有空间可用(除非配置了其他策略,如丢弃)。你也可以手动完成写入,表示不再有新消息:
channel.Writer.Complete();
消费者通过 Reader 读取消息。通常在一个循环中使用 WaitToReadAsync 和 TryRead 来持续处理消息。
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"处理: {item}");
}
或者手动控制读取逻辑:
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"处理: {item}");
}
}
IHostedService 启动后台消费者。var channel = Channel.CreateBounded基本上就这些。Channel 提供了一种简洁、高效且可控的方式来实现生产者-消费者模式,尤其适合需要流式处理、任务队列或事件分发的场景。(10); // 启动消费者 _ = Task.Run(async () => { await foreach (var item in channel.Reader.ReadAllAsync()) { Console.WriteLine($"消费: {item}"); await Task.Delay(100); // 模拟处理时间 } }); // 生产者发送消息 for (int i = 0; i < 5; i++) { await channel.Writer.WriteAsync(i); await Task.Delay(50); } channe l.Writer.Complete();