优化前:
10万并发请求导致系统崩溃
Redis连接池耗尽,响应延迟超过500ms
商品超卖问题频发
优化后:
- 使用Channel+Redis管道架构
- 支持100万并发请求,QPS突破120万
- 响应延迟稳定在10-20ms
- 彻底解决超卖问题(通过Redis Lua脚本原子操作)
- 七、关键经验总结
- 异步化一切:消除线程阻塞,释放系统资源
- 生产者-消费者模式:分离请求接收和处理逻辑
- 批量处理:减少Redis往返,提高吞吐量
- 背压控制:防止系统过载,优雅降级
- 连接池优化:合理配置Redis连接参数
通过这套"异步核武"方案,我们成功将系统性能提升了30倍,在秒杀活动中轻松应对百万级并发请求。记住:在高并发场景下,异步编程不是选项,而是必须
| 方案 | 吞吐量(QPS) | 平均延迟(ms) | 99%延迟(ms) | 线程数 |
|---|---|---|---|---|
| 同步Redis调用 | 12000 | 85 | 150 | 1000 |
| 异步Redis调用 | 35000 | 52 | 110 | 500 |
| Channel+异步Redis | 180000 | 28 | 65 | 200 |
| Channel+Redis管道+批量 | 1200000 | 12 | 35 | 100 |
单个请求处理
// 创建无界Channel
var requestChannel = Channel.CreateUnbounded<ProductRequest>(
new UnboundedChannelOptions { SingleWriter = false, SingleReader = false });
// 生产者:接收外部请求
async Task RequestProducer()
{
while (true)
{
var request = await ReceiveRequestAsync(); // 从网络接收请求
await requestChannel.Writer.WriteAsync(request);
}
}
// 消费者集群:处理请求
async Task ConsumerCluster(int consumerCount)
{
var consumers = Enumerable.Range(0, consumerCount)
.Select(_ => ConsumerWorker())
.ToList();
await Task.WhenAll(consumers);
}
async Task ConsumerWorker()
{
await foreach (var request in requestChannel.Reader.ReadAllAsync())
{
try
{
var stock = await GetProductStockAsync(request.ProductId);
await SendResponseAsync(request, stock);
}
catch (Exception ex)
{
Logger.LogError(ex, "处理请求失败");
}
}
}
批量请求处理
// 批量读取请求,提高吞吐量
async Task BatchConsumerWorker()
{
var buffer = new List<ProductRequest>(100);
while (await requestChannel.Reader.WaitToReadAsync())
{
buffer.Clear();
while (buffer.Count < 100 && requestChannel.Reader.TryRead(out var request))
{
buffer.Add(request);
}
if (buffer.Count > 0)
{
// 批量查询Redis
var productIds = buffer.Select(r => r.ProductId).ToList();
var stocks = await RedisBatchGetAsync(productIds);
// 批量响应
for (int i = 0; i < buffer.Count; i++)
{
await SendResponseAsync(buffer[i], stocks[i]);
}
}
}
}
Redis异步调用优化
连接池优化
// 创建高性能Redis连接
var multiplexer = ConnectionMultiplexer.Connect(new ConfigurationOptions
{
EndPoints = { "redis-server:6379" },
ConnectTimeout = 5000,
SyncTimeout = 5000,
AsyncTimeout = 5000,
AllowAdmin = true,
ConnectRetry = 3,
ResponseTimeout = 5000,
Ssl = false,
Password = "yourpassword",
DefaultDatabase = 0,
KeepAlive = 180,
AbortOnConnectFail = false,
ConfigCheckSeconds = 30,
Proxy = Proxy.None
});
// 获取专用数据库连接
var database = multiplexer.GetDatabase();
异步管道操作
// 使用Redis管道减少往返
async Task<List<int>> RedisBatchGetAsync(List<string> keys)
{
var batch = database.CreateBatch();
var tasks = new List<Task<RedisValue>>(keys.Count);
foreach (var key in keys)
{
tasks.Add(batch.StringGetAsync(key));
}
batch.Execute();
var results = await Task.WhenAll(tasks);
return results.Select(r => (int)r).ToList();
}
高级技巧
背压控制
// 使用有界Channel实现背压
var requestChannel = Channel.CreateBounded<ProductRequest>(
new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false,
SingleReader = false
});
// 消费者过载保护
async Task ConsumerWorker()
{
await foreach (var request in requestChannel.Reader.ReadAllAsync())
{
try
{
// 检查系统负载
if (SystemLoadMonitor.IsOverloaded())
{
await SendThrottleResponseAsync(request);
continue;
}
// 正常处理请求
var stock = await GetProductStockAsync(request.ProductId);
await SendResponseAsync(request, stock);
}
catch (Exception ex)
{
Logger.LogError(ex, "处理请求失败");
}
}
}
异步限流
// 使用SemaphoreSlim实现并发控制
private readonly SemaphoreSlim _concurrencyLimiter = new SemaphoreSlim(500);
async Task<ProductStock> GetProductStockAsync(string productId)
{
await _concurrencyLimiter.WaitAsync();
try
{
return await database.StringGetAsync($"stock:{productId}");
}
finally
{
_concurrencyLimiter.Release();
}
}