标签:消息 async clone() ram its code 很多 读取 没有
在用 rust + tokio 的网络处理时,碰到一个很有意思的 tcp 连接并关闭的问题。
具体是这样的,首先一个 tcp stream 拆分(split)为 SplitSink + SplitStream 的方式,各自单独工作。
通过 SplitStream 读取到客户端的请求,进行处理并发送给用户,同时,由于服务器会广播消息,因此使用一个 mpsc 的通道,把收到的东西整合发到 SplitSink。
代码如下:
let event_rx = ... // 服务器广播
...
let (sink, mut stream) = framed.split(); // 拆分
let (tx, rx) = unbounded(); // 通道,mpsc
...
// 客户端消息处理
let tx_clone = tx.clone();
tokio::spawn(async move {
let mut tx = tx_clone;
while let Some(Ok(req)) = stream.next().await {
let resp = handle(req);
tx.send(resp).await.expecte("Channel failed");
}
});
// 广播处理
tokio::spawn(event_rx.forward(tx));
// 发送回客户端
tokio::spawn(rx.map(|s| Ok(Bytes::from(s))).forward(sink));
本来好像也没什么问题,但是用 netcat 连上访问时,就发现有问题
nc 127.0.0.1 8080
...
用 ctrl+c
强制结束后,用 lsof -i -n -P
看一下监听的端口,发现有很多连接处于 CLOSE_WAIT
状态,即半关闭状态。
从 tcp 的状态转换来看,当客户端主动关闭时,会发个 FIN 给服务器,服务器进行 CLOSE_WAIT
,如果此时服务正在忙着输出或其它事情,就会卡在这个状态。
问题就在广播处理上,尽管 SplitStream 已进行了关闭(可在 while 后加个日志确定),但是广播还在继续,因此输出的 SplitSink 并没有关闭。
因此,必须让广播中断,或 SplitSink 知道已完成了输出。
方法1,通道中用 Option ,然后在 Sink 转换为 Err
// 客户端消息处理
let resp = handle(req);
tx.send(Some(resp)).await.expect("Channel failed");
// 发送回客户端
tokio::spawn(rx.map(|s| {
match s {
Some(s) => Ok(Bytes::from(s)),
None => Err(io::ErrorKink::Other.into()) // None 时中断
}
}).forward(sink));
方法2,在广播时进行处理
let close_flag = Arc::new(AtomicBool::new(false));
let close_flag_clone = close_flag.clone();
// 客户端消息处理
while ... {
let resp = handle(req);
tx.send(resp).await.expect("Channel failed");
}
close_flag_clone.store(false, Ordering::Relaxed);
// 广播消息处理
tokio::spawn(async move {
while let Some(event) = event_rx.next().await {
if close_flag_clone.load(Ordering::Relaxed) {
break;
}
...
}
});
2种方法,都可以停止相应的输出,解决因为存在广播其它的处理等导致半连接问题。
标签:消息 async clone() ram its code 很多 读取 没有
原文地址:https://www.cnblogs.com/fengyc/p/12783539.html