.NET 中 Channel 类简单使用

简介: `System.Threading.Channels` 提供异步生产者-消费者数据结构,用于.NET Standard上的跨平台同步。频道实现生产者/消费者模型,允许在任务间异步传递数据。简单示例展示如何创建无界和有界频道,以及多生产者和消费者共享频道的场景。频道常用于内存中的消息队列,通过控制生产者和消费者的速率来调整系统流量。

Channel 是干什么的#

The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations.

Channels are an implementation of the producer/consumer conceptual programming model.

以上是微软官方的解释 channels。用中文说的话就是这个类提供了在生产者跟消费者之间异步传统数据的能力,简单来说可以认为是一个内存消息队列。

示例 1#

下面是一个简单的示例,说明如何使用 Channel 类来创建一个生产者-消费者模型:

static async Task Main(string[] args)
    {
        var channel = Channel.CreateUnbounded<int>();
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i);
                await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
            }
            channel.Writer.Complete();
        });
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者接收到: {item}");
            }
        });
        await Task.WhenAll(producer, consumer);
    }

在这个例子中,我们创建了一个无界的通道,然后创建了两个任务,一个是生产者,一个是消费者。生产者每秒生成一个数字,然后写入通道。消费者从通道中读取数据并打印出来。当生产者完成写入后,它会调用 channel.Writer.Complete() 来通知消费者没有更多的数据可以读取。

示例 2#

你可以使用 Channel.CreateBounded(capacity) 方法来创建一个有界的通道,其中 capacity 参数指定了通道的容量。当通道满时,尝试写入的操作将会阻塞,直到有空间可用。

static async Task Main(string[] args)
    {
        var channel = Channel.CreateBounded<int>(5); // 创建一个容量为5的有界通道
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i);
                Console.WriteLine($"生产者生成了: {i}");
                await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
            }
            channel.Writer.Complete();
        });
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者接收到: {item}");
                await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
            }
        });
        await Task.WhenAll(producer, consumer);
    }

在这个例子中,我们创建了一个容量为5的有界通道。生产者每秒生成一个数字,然后写入通道。消费者从通道中读取数据并打印出来,但消费者处理数据的速度比生产者慢,所以当通道满时,生产者的 WriteAsync 操作将会阻塞,直到消费者读取了一些数据,使得通道有空间可用。

示例 3#

下面是一个示例,展示了如何在多个生产者和消费者之间共享一个通道:

static async Task Main(string[] args)
    {
        var channel = Channel.CreateUnbounded<int>();
        // 创建两个生产者
        var producer1 = Produce(channel.Writer, id: 1);
        var producer2 = Produce(channel.Writer, id: 2);
        // 创建两个消费者
        var consumer1 = Consume(channel.Reader, id: 1);
        var consumer2 = Consume(channel.Reader, id: 2);
        // 等待所有生产者和消费者完成
        await Task.WhenAll(producer1, producer2, consumer1, consumer2);
    }
    static async Task Produce(ChannelWriter<int> writer, int id)
    {
        for (int i = 0; i < 10; i++)
        {
            await writer.WriteAsync(i);
            Console.WriteLine($"生产者{id}生成了: {i}");
            await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
        }
        writer.Complete();
    }
    static async Task Consume(ChannelReader<int> reader, int id)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"消费者{id}接收到: {item}");
            await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
        }
    }

在这个例子中,我们创建了两个生产者和两个消费者,它们都共享同一个通道。这是一个非常重要使用模式。因为当我们使用消息队列的时候往往会有多个生产者跟多个消费者。我们可以通过控制生产者生产的速度来控制推入队列的数据量。我们还可以通过控制消费者的数量来控制消费数据的速度,从而来调节系统的流量,达到消峰填谷的作用。

相关文章
|
IDE API 开发工具
拦截|篡改|伪造.NET类库中不限于public的类和方法
本文除了回顾拦截.NET类库中的方法,实现方法参数的篡改、方法返回结果的伪造,再着重介绍.NET类库中非public类及方法如何拦截。
|
3月前
|
开发框架 .NET C#
C#|.net core 基础 - 删除字符串最后一个字符的七大类N种实现方式
【10月更文挑战第9天】在 C#/.NET Core 中,有多种方法可以删除字符串的最后一个字符,包括使用 `Substring` 方法、`Remove` 方法、`ToCharArray` 与 `Array.Copy`、`StringBuilder`、正则表达式、循环遍历字符数组以及使用 LINQ 的 `SkipLast` 方法。
|
11天前
|
JSON 安全 API
.net 自定义日志类
在.NET中,创建自定义日志类有助于更好地管理日志信息。示例展示了如何创建、配置和使用日志记录功能,包括写入日志文件、设置日志级别、格式化消息等。注意事项涵盖时间戳、日志级别、JSON序列化、线程安全、日志格式、文件处理及示例使用。请根据需求调整代码。
36 13
|
17天前
|
JSON 数据格式
.net HTTP请求类封装
`HttpRequestHelper` 是一个用于简化 HTTP 请求的辅助类,支持发送 GET 和 POST 请求。它使用 `HttpClient` 发起请求,并通过 `Newtonsoft.Json` 处理 JSON 数据。示例展示了如何使用该类发送请求并处理响应。注意事项包括:简单的错误处理、需安装 `Newtonsoft.Json` 依赖,以及建议重用 `HttpClient` 实例以优化性能。
61 2
|
3月前
|
安全 网络安全 数据安全/隐私保护
【Azure Developer】System.Net.WebException: The request was aborted: Could not create SSL/TLS secure channel.
System.Net.WebException: The request was aborted: Could not create SSL/TLS secure channel.
|
3月前
.NET 4.0下实现.NET4.5的Task类相似功能组件
【10月更文挑战第29天】在.NET 4.0 环境下,可以使用 `BackgroundWorker` 类来实现类似于 .NET 4.5 中 `Task` 类的功能。`BackgroundWorker` 允许在后台执行耗时操作,同时不会阻塞用户界面线程,并支持进度报告和取消操作。尽管它有一些局限性,如复杂的事件处理模型和不灵活的任务管理方式,但在某些情况下仍能有效替代 `Task` 类。
|
3月前
|
API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
53 0
|
5月前
|
缓存 程序员
封装一个给 .NET Framework 用的内存缓存帮助类
封装一个给 .NET Framework 用的内存缓存帮助类
|
7月前
|
存储 Go C#
【.NET Core】深入理解IO之File类
【.NET Core】深入理解IO之File类
121 6
|
7月前
|
存储 开发框架 缓存
【.NET Core】你真的了解HttpRuntime类吗
【.NET Core】你真的了解HttpRuntime类吗
72 0