看看大神的文章
//创建一个buffer大小为maxMessagesToBuffer的channel
var channel = Channel.CreateBounded<Envelope>(maxMessagesToBuffer);
//创建处理consumer的TaskList,名称为consumerTasks
var consumerTasks = Enumerable.Range(1, consumersCount)
.Select(i =>
new Consumer(channel.Reader, i)
.BeginConsumeAsync(cancellationToken))
.ToArray();
//创建产生消息的生产者列表并立即运行
var producers = Enumerable.Range(1, producersCount)
.Select(i => new Producer(channel.Writer, i))
.ToArray();
int index = 0;
//下面这么多代码是为了让多个producer运行的时候好区分,index是用来区分
var tasks = Enumerable.Range(1, messagesCount)
.Select(i =>
{
index = ++index % producersCount;
var producer = producers[index];
var msg = new Envelope($"message {i}");
return producer.PublishAsync(msg, tokenSource.Token);
})
.ToArray();
await Task.WhenAll(tasks);
channel.Writer.Complete();
await channel.Reader.Completion;
tokenSource.Cancel();
//等待消费者任务列表consumerTasks执行完成
await Task.WhenAll(consumerTasks.ToList());