Netty心跳原理

前提提要代码

此部分详细代码见Netty实现WebSocket原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip
// pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new NettyWebSocketServerHandler());
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}

心跳包

如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。

直接接入netty的现有组件new IdleStateHandler(30, 0, 0)可以实现30秒链接没有读请求,就主动关闭链接。我们的web前端需要保持每10s发送一个心跳包。

在此部分的代码第34行, WebSocket自带有一个心跳检测组件 如下

1
2
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0)); // 读空闲时间 写空闲时间 全局时间
  1. 第一个参数: 读空闲时间

前端主动请求, 给客户端主动发送一条消息, 如果在30秒内有新消息则为正常状态, 没用则关闭连接

  1. 第二个参数: 写空闲时间

后端有没有主动给前端写消息, 一般适用于客户端, 客户端加了心跳, 则客户端会自己主动写心跳

测试心跳

此项与前面提到的文章代码一致(在 #自定义握手事件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果事件是一个WebSocket握手的事件
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
log.info("[握手完成]");
} else if (evt instanceof IdleStateEvent) {
// 心跳
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("[读空闲]");
//todo 用户下线
ctx.channel().close();
}
}
}

在设置了前面的第一个参数后, 启动服务, 连接WebSocket然后等待30秒则会输出日志: [读空闲]

结尾

其实就是, 在设定的30秒内没有新消息, 就会发送一个事件, 在追踪IdleStateHandler源码可以发现, 它有一些读消息的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (this.readerIdleTimeNanos > 0L || this.allIdleTimeNanos > 0L) {
this.reading = true;
this.firstReaderIdleEvent = this.firstAllIdleEvent = true;
}

ctx.fireChannelRead(msg);
}
// 读完成
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((this.readerIdleTimeNanos > 0L || this.allIdleTimeNanos > 0L) && this.reading) {
// 记录最后一次读的时间
this.lastReadTime = this.ticksInNanos(); // 此项通过定时任务实现
this.reading = false;
}

ctx.fireChannelReadComplete();
}

其实就是读Channel, 记录最后一次读的时间与之当前时间进行对比, 最后一次读的时间的更新通过定时任务完成, 通过追踪lastReadTime属性

可以发现源码中存在如下的定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}

@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
// 时间处理
nextDelay -= ticksInNanos() - lastReadTime;
}

if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 不论什么情况都会执行下一次周期的定时任务,通过线程池执行定时任务,具体定时多久读者自行查阅
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;

try {
// 事件通知, 构建一个读空闲时间
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}

在每一次有新消息进来的时候, 我们会更新最后一次事件时间(此项详情在schedule定时任务), 最后组成心跳机制

心跳处理器只是发出事件, 具体处理交给我们自己, 例如笔者样例代码中存在一处为// todo 用户下线, 所以还能对用户下线进行业务逻辑处理, 那么这样(只是通过发出事件)的好处可以解耦

写在最后

为什么我们使用netty, 因为它以及封装了很多实用的组件了, 以至于没有使用原生的服务