1、复习
在我们以前的文章中,我们看了一些关于Channels如何工作的简单示例,并且看到了一些漂亮的功能,但是在大多数情况下,它与任何其他Queue实现都非常相似。
因此,让我们深入探讨一些更高级的主题。
好吧,虽然说得高级,但是很多事情还是很简单。不过,为了获取更多有价值的信息,还是值得我们去探索的!
2、读写分离
大家听过最多的可能是数据库的读写分离,嗯嗯,很多概念都很相似的,正所谓,英雄所见略同,略同而已!
如果您曾经在两个类之间共享过队列,那么您肯定知道两个类都可以读取/写入队列,即使它们不应该读取/写入。例如 :
下面的例子就是没法做到真正读写分离的例子。
class MyProducer
{
private readonly Queue _queue;
public MyProducer(Queue queue)
{
_queue=queue;
}
}
class MyConsumer
{
private readonly Queue _queue;
public MyConsumer(Queue queue)
{
_queue=queue;
}
}
12345678910111213141516171819
因此,尽管生产者应该只写队列,而消费者应该只读队列,但在使用Queue实现的生产、消费者类中,他们都可以对队列进行所有操作。
尽管你可以告知开发人员别那么做,但是可能会有一个任性的/心情糟糕的开发人员,和你对着干,也可能他们根本没听到心里,当然只有二手手游账号交易代码审查才能阻止他们犯此错误。
但是使用通道,我们就不必担心这样的事情了。
class Program
{
static async Task Main(string[] args)
{
var myChannel=Channel.CreateUnbounded();
var producer=new MyProducer(myChannel.Writer);
var consumer=new MyConsumer(myChannel.Reader);
}
}
class MyProducer
{
private readonly ChannelWriter _channelWriter;
public MyProducer(ChannelWriter channelWriter)
{
_channelWriter=channelWriter;
}
}
class MyConsumer
{
private readonly ChannelReader _channelReader;
public MyConsumer(ChannelReader channelReader)
{
_channelReader=channelReader;
}
}
1234567891011121314151617181920212223242526272829
在这个例子中,我添加了一个main方法来向你展示如何创建writer/reader,非常简单。
这里我们可以看到,对于生产者,我只传递给它一个ChannelWriter,所以它只能做写操作。
对于消费者,我们传递给它一个ChannelReader,所以它只能读取。
当然,这并不意味着其他开发人员不能修改代码并开始注入根Channel对象,或者同时传入
ChannelWriter/ChannelReader,但这至少比之前的情况要好得多。
3、关闭一个通道
前面我们看到,当我们在通道上调用ReadAsync()时,它实际上会坐在那里等待消息,但是如果没有更多消息来了怎么办?
也许这是一个一次性的批处理作业,并且批处理已完成。
通常,对于.NET中的其他队列,我们?将必须传递某种共享的布尔值或CancellationToken。但是使用Channels,它更容易。
例子为证:
static async Task Main(string[] args)
{
var myChannel=Channel.CreateUnbounded();
_=Task.Factory.StartNew(async ()=>
{
for (int i=0; i < 10; i++)
{
await myChannel.Writer.WriteAsync(i);
}
myChannel.Writerplete();
});
try
{
while (true)
{
var item=await myChannel.Reader.ReadAsync();
Console.WriteLine(item);
await Task.Delay(1000);
}
}catch(ChannelClosedException e)
{
Console.WriteLine("Channel was closed!");
}
}
123456789101112131415161718192021222324252627
以上例子,尽可能快地写入通道,然后完成它。
然后,我们的消费者会在两次读取之间延迟1秒钟,以缓慢的速度进行读取。
请注意,我们捕获了ChannelClosedExecption,当您尝试在关闭后的通道中读取最终消息之后,将返回该异常。
这样编写代码,我还想告诉你的是,在通道上调用Complete()不会立即关闭该通道,并销毁所有从该通道读取的内容。
相反,这是一种通知所有读者的方法,即一旦读取了最后一条消息,便完成了。
这点非常重要,因为这意味着在等待新消息时,在队列为空,队列已满等情况下是否调用Complete()都没关系。
我们可以确定,我们将消费完所有可用的消息后结束。
4 使用IAsyncEnumerable
你有没有异常恐惧症? 是的,上述代码看起来不爽的地方,就是需要捕获异常!
“就没有更好的方式吗?”
“别急,有点有点,面包牛奶都会有的!”
使用返回IAsyncEnumerable的命令“ ReadAllAsync()” 可以帮助我们解决了这些问题。代码看起来像这样:
static async Task Main(string[] args)
{
var myChannel=Channel.CreateUnbounded();
_=Task.Factory.StartNew(async ()=>
{
for (int i=0; i < 10; i++)
{
await myChannel.Writer.WriteAsync(i);
}
myChannel.Writerplete();
});
await foreach(var item in myChannel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
await Task.Delay(1000);
}
}
1234567891011121314151617181920
现在,真正实现了完美编程了。
因为我们使用的是IAsyncEnumerable,所以我们仍然可以像以前一样等待每个项目,但是我们不再需要捕获异常,因为当通道完成时,它只是说它什么也没有了,并且优雅退出。
同样,这消除了您在处理队列时曾经不得不编写的一些凌乱代码。以前您必须使用Breakout子句编写某种无限循环,而现在它只是一个真正的整洁循环!
5、下一步是什么
到目前为止,我们一直在使用“无限”通道。您可能已经猜到了,当然可以选择使用BoundedChannel。但是,这是什么?以及“背压”一词与之有何关系?查看本系列的下一部分,以更好地了解背压(Backpressure)。