理解 .NET Core中的Channel篇之二——高级通道

简介:   1、复习  在我们以前的文章中,我们看了一些关于Channels如何工作的简单示例,并且看到了一些漂亮的功能,但是在大多数情况下,它与任何其他Queue实现都非常相似。  因此,让我们深入探讨一些更高级的主题。  好吧,虽然说得高级,但是很多事情还是很简单。不过,为了获取更多有价值的信息,还是值得我们去探索的!  2、读写分离

  1、复习

  在我们以前的文章中,我们看了一些关于Channels如何工作的简单示例,并且看到了一些漂亮的功能,但是在大多数情况下,它与任何其他Queue实现都非常相似。

  因此,让我们深入探讨一些更高级的主题。

  好吧,虽然说得高级,但是很多事情还是很简单。不过,为了获取更多有价值的信息,还是值得我们去探索的!

  2、读写分离

  大家听过最多的可能是数据库的读写分离,嗯嗯,很多概念都很相似的,正所谓,英雄所见略同,略同而已!

  如果您曾经在两个类之间共享过队列,那么您肯定知道两个类都可以读取/写入队列,即使它们不应该读取/写入。例如 :

  下面的例子就是没法做到真正读写分离的例子。

  class MyProducer

  {

  private readonly Queue _queue;

  public MyProducer(Queue queue)

  {

  _queue=queue;

  }

  }

  class MyConsumer

  {

  private readonly Queue _queue;

  public MyConsumer(Queue queue)

  {

  _queue=queue;

  }

  }

  12345678910111213141516171819

  因此,尽管生产者应该只写队列,而消费者应该只读队列,但在使用Queue实现的生产、消费者类中,他们都可以对队列进行所有操作。

  尽管你可以告知开发人员别那么做,但是可能会有一个任性的/心情糟糕的开发人员,和你对着干,也可能他们根本没听到心里,当然只有二手手游账号交易代码审查才能阻止他们犯此错误。

  但是使用通道,我们就不必担心这样的事情了。

  class Program

  {

  static async Task Main(string[] args)

  {

  var myChannel=Channel.CreateUnbounded();

  var producer=new MyProducer(myChannel.Writer);

  var consumer=new MyConsumer(myChannel.Reader);

  }

  }

  class MyProducer

  {

  private readonly ChannelWriter _channelWriter;

  public MyProducer(ChannelWriter channelWriter)

  {

  _channelWriter=channelWriter;

  }

  }

  class MyConsumer

  {

  private readonly ChannelReader _channelReader;

  public MyConsumer(ChannelReader channelReader)

  {

  _channelReader=channelReader;

  }

  }

  1234567891011121314151617181920212223242526272829

  在这个例子中,我添加了一个main方法来向你展示如何创建writer/reader,非常简单。

  这里我们可以看到,对于生产者,我只传递给它一个ChannelWriter,所以它只能做写操作。

  对于消费者,我们传递给它一个ChannelReader,所以它只能读取。

  当然,这并不意味着其他开发人员不能修改代码并开始注入根Channel对象,或者同时传入

  ChannelWriter/ChannelReader,但这至少比之前的情况要好得多。

  3、关闭一个通道

  前面我们看到,当我们在通道上调用ReadAsync()时,它实际上会坐在那里等待消息,但是如果没有更多消息来了怎么办?

  也许这是一个一次性的批处理作业,并且批处理已完成。

  通常,对于.NET中的其他队列,我们?将必须传递某种共享的布尔值或CancellationToken。但是使用Channels,它更容易。

  例子为证:

  static async Task Main(string[] args)

  {

  var myChannel=Channel.CreateUnbounded();

  _=Task.Factory.StartNew(async ()=>

  {

  for (int i=0; i < 10; i++)

  {

  await myChannel.Writer.WriteAsync(i);

  }

  myChannel.Writerplete();

  });

  try

  {

  while (true)

  {

  var item=await myChannel.Reader.ReadAsync();

  Console.WriteLine(item);

  await Task.Delay(1000);

  }

  }catch(ChannelClosedException e)

  {

  Console.WriteLine("Channel was closed!");

  }

  }

  123456789101112131415161718192021222324252627

  以上例子,尽可能快地写入通道,然后完成它。

  然后,我们的消费者会在两次读取之间延迟1秒钟,以缓慢的速度进行读取。

  请注意,我们捕获了ChannelClosedExecption,当您尝试在关闭后的通道中读取最终消息之后,将返回该异常。

  这样编写代码,我还想告诉你的是,在通道上调用Complete()不会立即关闭该通道,并销毁所有从该通道读取的内容。

  相反,这是一种通知所有读者的方法,即一旦读取了最后一条消息,便完成了。

  这点非常重要,因为这意味着在等待新消息时,在队列为空,队列已满等情况下是否调用Complete()都没关系。

  我们可以确定,我们将消费完所有可用的消息后结束。

  4 使用IAsyncEnumerable

  你有没有异常恐惧症? 是的,上述代码看起来不爽的地方,就是需要捕获异常!

  “就没有更好的方式吗?”

  “别急,有点有点,面包牛奶都会有的!”

  使用返回IAsyncEnumerable的命令“ ReadAllAsync()” 可以帮助我们解决了这些问题。代码看起来像这样:

  static async Task Main(string[] args)

  {

  var myChannel=Channel.CreateUnbounded();

  _=Task.Factory.StartNew(async ()=>

  {

  for (int i=0; i < 10; i++)

  {

  await myChannel.Writer.WriteAsync(i);

  }

  myChannel.Writerplete();

  });

  await foreach(var item in myChannel.Reader.ReadAllAsync())

  {

  Console.WriteLine(item);

  await Task.Delay(1000);

  }

  }

  1234567891011121314151617181920

  现在,真正实现了完美编程了。

  因为我们使用的是IAsyncEnumerable,所以我们仍然可以像以前一样等待每个项目,但是我们不再需要捕获异常,因为当通道完成时,它只是说它什么也没有了,并且优雅退出。

  同样,这消除了您在处理队列时曾经不得不编写的一些凌乱代码。以前您必须使用Breakout子句编写某种无限循环,而现在它只是一个真正的整洁循环!

  5、下一步是什么

  到目前为止,我们一直在使用“无限”通道。您可能已经猜到了,当然可以选择使用BoundedChannel。但是,这是什么?以及“背压”一词与之有何关系?查看本系列的下一部分,以更好地了解背压(Backpressure)。

目录
相关文章
|
7月前
|
开发框架 .NET C#
ASP.NET Core Blazor 路由配置和导航
大家好,我是码农刚子。本文系统介绍Blazor单页应用的路由机制,涵盖基础配置、路由参数、编程式导航及高级功能。通过@page指令定义路由,支持参数约束、可选参数与通配符捕获,结合NavigationManager实现页面跳转与参数传递,并演示用户管理、产品展示等典型场景,全面掌握Blazor路由从入门到实战的完整方案。
600 6
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
612 5
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
364 0
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
434 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
开发框架 算法 中间件
ASP.NET Core 中的速率限制中间件
在ASP.NET Core中,速率限制中间件用于控制客户端请求速率,防止服务器过载并提高安全性。通过`AddRateLimiter`注册服务,并配置不同策略如固定窗口、滑动窗口、令牌桶和并发限制。这些策略可在全局、控制器或动作级别应用,支持自定义响应处理。使用中间件`UseRateLimiter`启用限流功能,并可通过属性禁用特定控制器或动作的限流。这有助于有效保护API免受滥用和过载。 欢迎关注我的公众号:Net分享 (239字符)
393 1
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
460 3
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
291 3
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
400 3
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
398 4