前提提要代码 此部分详细代码见Netty实现WebSocket原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void run () throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128 ) .option(ChannelOption.SO_KEEPALIVE, true ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler (30 , 0 , 0 )); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new ChunkedWriteHandler ()); pipeline.addLast(new HttpObjectAggregator (8192 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/" )); pipeline.addLast(new NettyWebSocketServerHandler ()); } }); serverBootstrap.bind(WEB_SOCKET_PORT).sync(); }
心跳包 如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件new IdleStateHandler(30, 0, 0)
可以实现30秒链接没有读请求,就主动关闭链接。我们的web前端需要保持每10s发送一个心跳包。
在此部分的代码第34行, WebSocket自带有一个心跳检测组件 如下
1 2 pipeline.addLast(new IdleStateHandler (30 , 0 , 0 ));
第一个参数: 读空闲时间
前端主动请求, 给客户端主动发送一条消息, 如果在30秒内有新消息则为正常状态, 没用则关闭连接
第二个参数: 写空闲时间
后端有没有主动给前端写消息, 一般适用于客户端, 客户端加了心跳, 则客户端会自己主动写心跳
测试心跳 此项与前面提到的文章代码一致(在 #自定义握手事件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { log.info("[握手完成]" ); } else if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("[读空闲]" ); ctx.channel().close(); } } }
在设置了前面的第一个参数后, 启动服务, 连接WebSocket然后等待30秒则会输出日志: [读空闲]
结尾 其实就是, 在设定的30秒内没有新消息, 就会发送一个事件, 在追踪IdleStateHandler 源码可以发现, 它有一些读消息的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (this .readerIdleTimeNanos > 0L || this .allIdleTimeNanos > 0L ) { this .reading = true ; this .firstReaderIdleEvent = this .firstAllIdleEvent = true ; } ctx.fireChannelRead(msg); } public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { if ((this .readerIdleTimeNanos > 0L || this .allIdleTimeNanos > 0L ) && this .reading) { this .lastReadTime = this .ticksInNanos(); this .reading = false ; } ctx.fireChannelReadComplete(); }
其实就是读Channel, 记录最后一次读的时间与之当前时间进行对比, 最后一次读的时间的更新通过定时任务 完成, 通过追踪lastReadTime属性
可以发现源码中存在如下的定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super (ctx); } @Override protected void run (ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0 ) { readerIdleTimeout = schedule(ctx, this , readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false ; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { readerIdleTimeout = schedule(ctx, this , nextDelay, TimeUnit.NANOSECONDS); } } }
在每一次有新消息进来的时候, 我们会更新最后一次事件时间(此项详情在schedule定时任务), 最后组成心跳机制
心跳处理器只是发出事件, 具体处理交给我们自己, 例如笔者样例代码中存在一处为// todo 用户下线, 所以还能对用户下线进行业务逻辑处理, 那么这样(只是通过发出事件)的好处可以解耦
写在最后 为什么我们使用netty, 因为它以及封装了很多实用的组件了, 以至于没有使用原生的服务