net.core System.Threading.Channels

看看大神的文章

//创建一个buffer大小为maxMessagesToBuffer的channel
var channel = Channel.CreateBounded<Envelope>(maxMessagesToBuffer);

//创建处理consumer的TaskList,名称为consumerTasks
var consumerTasks = Enumerable.Range(1, consumersCount)
		.Select(i => 
							new Consumer(channel.Reader, i)
							.BeginConsumeAsync(cancellationToken))
		.ToArray();

//创建产生消息的生产者列表并立即运行
var producers = Enumerable.Range(1, producersCount)
							.Select(i => new Producer(channel.Writer, i))
							.ToArray();
 int index = 0;
//下面这么多代码是为了让多个producer运行的时候好区分,index是用来区分
var tasks = Enumerable.Range(1, messagesCount)
		.Select(i =>
		{
				index = ++index % producersCount;
				var producer = producers[index];
				var msg = new Envelope($"message {i}");
				return producer.PublishAsync(msg, tokenSource.Token);
		})
		.ToArray();
await Task.WhenAll(tasks);
channel.Writer.Complete();
await channel.Reader.Completion;
tokenSource.Cancel();
//等待消费者任务列表consumerTasks执行完成
await Task.WhenAll(consumerTasks.ToList());

作者:spike

分类: Net

创作时间:2023-06-25

更新时间:2024-12-09

联系方式放在中括号之中例如[[email protected]],回复评论在开头加上标号例如:#1