聊天室引出该文
前置提要 Netty WebSocket Server代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 @Slf4j @Configuration public class NettyWebSocketServer { public static final int WEB_SOCKET_PORT = 8090 ; private EventLoopGroup bossGroup = new NioEventLoopGroup (1 ); private EventLoopGroup workerGroup = new NioEventLoopGroup (NettyRuntime.availableProcessors()); @PostConstruct public void start () throws InterruptedException { run(); } @PreDestroy public void destroy () { Future<?> future = bossGroup.shutdownGracefully(); Future<?> future1 = workerGroup.shutdownGracefully(); future.syncUninterruptibly(); future1.syncUninterruptibly(); log.info("关闭 ws server 成功" ); } public void run () throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128 ) .option(ChannelOption.SO_KEEPALIVE, true ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler (30 , 0 , 0 )); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new ChunkedWriteHandler ()); pipeline.addLast(new HttpObjectAggregator (8192 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/" )); pipeline.addLast(new NettyWebSocketServerHandler ()); } }); serverBootstrap.bind(WEB_SOCKET_PORT).sync(); } }
Netty实现WebSocket 优雅销毁服务 在代码第25行的destroy方法
1 2 3 4 5 6 7 8 @PreDestroy public void destroy () { Future<?> future = bossGroup.shutdownGracefully(); Future<?> future1 = workerGroup.shutdownGracefully(); future.syncUninterruptibly(); future1.syncUninterruptibly(); log.info("关闭 ws server 成功" ); }
也就是把连接先进行关闭, 然后再进行等待 片刻。
服务结构 重写run方法实现中,在前提代码中第44行
ChannelPipeline pipeline = socketChannel.pipeline();
管道,每一个Netty服务都有一个对应的ChannelPipeline管道,此部分一般用于实现添加对应的处理器 。例如HttpServer、WebSocket等处理器
此处我们以WebSocket处理器为例(67行),其代码如下:
1 pipeline.addLast(new WebSocketServerProtocolHandler ("/" ));
追踪处理器(追踪进去可以找到后面的方法:handlerAdded),在其源码中不难发现有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void handlerAdded (ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null ) { cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(), new WebSocketServerProtocolHandshakeHandler (serverConfig)); } if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null ) { cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator (serverConfig.decoderConfig().closeOnProtocolViolation())); } }
通过追踪 握手处理器类WebSocketServerProtocolHandshakeHandler:
可以发现有一个方法为: channelRead ,具体如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 @Override public void channelRead (final ChannelHandlerContext ctx, Object msg) throws Exception { final HttpObject httpObject = (HttpObject) msg; if (httpObject instanceof HttpRequest) { final HttpRequest req = (HttpRequest) httpObject; isWebSocketPath = isWebSocketPath(req); if (!isWebSocketPath) { ctx.fireChannelRead(msg); return ; } try { if (!GET.equals(req.method())) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse (HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0 ))); return ; } final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory ( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); final ChannelPromise localHandshakePromise = handshakePromise; if (handshaker == null ) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker); ctx.pipeline().remove(this ); final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) { if (!future.isSuccess()) { localHandshakePromise.tryFailure(future.cause()); ctx.fireExceptionCaught(future.cause()); } else { localHandshakePromise.trySuccess(); ctx.fireUserEventTriggered( WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler .HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } }); applyHandshakeTimeout(); } } finally { ReferenceCountUtil.release(req); } } else if (!isWebSocketPath) { ctx.fireChannelRead(msg); } else { ReferenceCountUtil.release(msg); } }
上述的代码样例中有注释解释代码,下面进行说明
第五行:if (httpObject instanceof HttpRequest)
判定请求是否为Http请求
第十八行:组装握手响应
第二十二行:创建一个握手的响应类
为后面的响应进行处理
从请求管道中移除自己,我们WebSocket连接只需要在第一次连接的时候使用然后进行升级,随后移除。因为只在第一次使用(升级)的时候使用到了,后续不需要哟到
握手的响应类会通过handshaker进行请求的响应
如果事件成功则发送一个握手事件体
重写自定义握手事件 基于前面最后分析的发送握手事件体,则有如下实现(通过重写userEventTriggered方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler <TextWebSocketFrame> { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { System.out.println("握手完成" ); } else if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { System.out.println("读空闲" ); ctx.channel().close(); } } } @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String text = msg.text(); System.out.println("text = " + text); } }
总结 Netty具体实现WebSocket是通过Http升级一次 ,依靠的是我们前面分析的处理器(WebSocketServerProtocolHandshakeHandler )帮我们进行连接的升级
Netty心跳原理 此部分详细代码见前置提要
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void run () throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128 ) .option(ChannelOption.SO_KEEPALIVE, true ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler (30 , 0 , 0 )); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new ChunkedWriteHandler ()); pipeline.addLast(new HttpObjectAggregator (8192 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/" )); pipeline.addLast(new NettyWebSocketServerHandler ()); } }); serverBootstrap.bind(WEB_SOCKET_PORT).sync(); }
心跳包 如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件new IdleStateHandler(30, 0, 0)
可以实现30秒链接没有读请求,就主动关闭链接。我们的web前端需要保持每10s发送一个心跳包。
在此部分的代码第34行, WebSocket自带有一个心跳检测组件 如下
1 2 pipeline.addLast(new IdleStateHandler (30 , 0 , 0 ));
第一个参数: 读空闲时间
前端主动请求, 给客户端主动发送一条消息, 如果在30秒内有新消息则为正常状态, 没用则关闭连接
第二个参数: 写空闲时间
后端有没有主动给前端写消息, 一般适用于客户端, 客户端加了心跳, 则客户端会自己主动写心跳
测试心跳 此项与前面提到的文章代码一致(在 #自定义握手事件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { log.info("[握手完成]" ); } else if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("[读空闲]" ); ctx.channel().close(); } } }
在设置了前面的第一个参数后, 启动服务, 连接WebSocket然后等待30秒则会输出日志: [读空闲]
结尾 其实就是, 在设定的30秒内没有新消息, 就会发送一个事件, 在追踪IdleStateHandler 源码可以发现, 它有一些读消息的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (this .readerIdleTimeNanos > 0L || this .allIdleTimeNanos > 0L ) { this .reading = true ; this .firstReaderIdleEvent = this .firstAllIdleEvent = true ; } ctx.fireChannelRead(msg); } public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { if ((this .readerIdleTimeNanos > 0L || this .allIdleTimeNanos > 0L ) && this .reading) { this .lastReadTime = this .ticksInNanos(); this .reading = false ; } ctx.fireChannelReadComplete(); }
其实就是读Channel, 记录最后一次读的时间与之当前时间进行对比, 最后一次读的时间的更新通过定时任务 完成, 通过追踪lastReadTime属性
可以发现源码中存在如下的定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super (ctx); } @Override protected void run (ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0 ) { readerIdleTimeout = schedule(ctx, this , readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false ; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { readerIdleTimeout = schedule(ctx, this , nextDelay, TimeUnit.NANOSECONDS); } } }
在每一次有新消息进来的时候, 我们会更新最后一次事件时间(此项详情在schedule定时任务), 最后组成心跳机制
心跳处理器只是发出事件, 具体处理交给我们自己, 例如笔者样例代码中存在一处为// todo 用户下线, 所以还能对用户下线进行业务逻辑处理, 那么这样(只是通过发出事件)的好处可以解耦
写在最后 为什么我们使用netty, 因为它以及封装了很多实用的组件了, 以至于没有使用原生的服务