17370845950

C# gRPC流式处理方法 C#如何实现服务器和客户端流
双向流式 RPC是gRPC中唯一支持客户端与服务器同时持续双向通信的模式,双方各自维护独立读写通道,可随时异步读写且互不阻塞,需在.proto中用两个stream关键字定义,并由应用层自行约定消息协议。

什么是 gRPC 的双向流式 RPC?

gRPC 的 BidirectionalStreamingRpc 是唯一支持服务器和客户端同时持续发消息的调用模式。它不是“先发后收”或“发一次收多次”,而是双方各自维护独立的读写通道,可随时

WriteAsync、随时 ReadAsync,彼此不阻塞。

常见误用是把它当“增强版客户端流”或“带回传的服务器流”——实际它没有隐含顺序约定,应用层必须自行定义协议(比如用 messageType 字段区分心跳、数据、结束信号)。

定义 .proto 时必须用 stream 修饰符两次

服务端流、客户端流、双向流在 .proto 中全靠 stream 出现次数区分,少一个关键字就生成完全不同的 C# 方法签名。

  • rpc Chat(stream ChatMessage) returns (ChatMessage) → 客户端流(IAsyncEnumerable 入,单次返回)
  • rpc Chat(ChatMessage) returns (stream ChatMessage) → 服务器流(单次入,IAsyncStreamReader 出)
  • rpc Chat(stream ChatMessage) returns (stream ChatMessage) → 双向流(两个 stream)→ 生成 Task Chat(IAsyncStreamReader, IServerStreamWriter, ServerCallContext)

如果生成的 C# 类里没看到 IServerStreamWriter 参数,八成是 .proto 少写了一个 stream

客户端发起双向流:别卡在 await foreach 里等响应

典型错误是写成:

var call = client.Chat();
await call.RequestStream.WriteAsync(new ChatMessage { Text = "hi" });
await foreach (var msg in call.ResponseStream.ReadAllAsync()) { /* ... */ } // ❌ 阻塞在此,后续 WriteAsync 不会执行

正确做法是并发驱动读写:

  • Task.Run(() => WriteLoop(call.RequestStream)) 单独发消息
  • await foreach 在主线程收消息
  • 或用 Channel + Writer/Reader 解耦生产和消费

注意 RequestStream.CompleteAsync() 必须显式调用,否则服务端 ReadAsync() 永远不会返回 null

服务端处理双向流:避免在循环里 await 所有操作

最易被忽略的是线程调度开销。下面这段代码在高并发下会迅速堆积 Task

while (await requestStream.ReadAsync(out var req)) {
    var resp = Process(req);
    await responseStream.WriteAsync(resp); // ❌ 每次 WriteAsync 都可能跨线程调度
}

优化方向:

  • 批量写入:缓存多个响应,用 WriteBatchAsync(需自实现或用 Channel 聚合)
  • 取消检查:在循环头部加 context.CancellationToken.ThrowIfCancellationRequested(),否则客户端断连后服务端仍空转
  • 异常隔离:ReadAsync 抛异常时,WriteAsync 可能已失败,不要假设响应一定送达

真正难调试的不是连接断开,而是某次 WriteAsync 因网络抖动超时后,后续所有写入都静默失败——因为 gRPC 的 IServerStreamWriter 不抛异常,只记录日志。