Background
Netty WebSocket server code
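```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

// Assumption: javax.annotation (Spring Boot 2.x); on Spring Boot 3.x use jakarta.annotation instead.
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Configuration
public class NettyWebSocketServer {
    public static final int WEB_SOCKET_PORT = 8090;

    // One boss thread accepts connections; the workers handle I/O on accepted channels.
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

    @PostConstruct
    public void start() throws InterruptedException {
        run();
    }

    @PreDestroy
    public void destroy() {
        Future<?> future = bossGroup.shutdownGracefully();
        Future<?> future1 = workerGroup.shutdownGracefully();
        future.syncUninterruptibly();
        future1.syncUninterruptibly();
        log.info("ws server shut down successfully");
    }

    public void run() throws InterruptedException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                // SO_KEEPALIVE applies to accepted connections, so it is set as a child option;
                // the listening channel does not support it.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // Fires a READER_IDLE event after 30 seconds without inbound data.
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));
                        // The WebSocket handshake starts as HTTP, so an HTTP codec is required.
                        pipeline.addLast(new HttpServerCodec());
                        // Supports writing large payloads in chunks.
                        pipeline.addLast(new ChunkedWriteHandler());

                        // Aggregates HTTP message parts into one full message (up to 8192 bytes).
                        pipeline.addLast(new HttpObjectAggregator(8192));

                        // Performs the handshake and handles WebSocket control frames on path "/".
                        pipeline.addLast(new WebSocketServerProtocolHandler("/"));
                        // Custom business handler.
                        pipeline.addLast(new NettyWebSocketServerHandler());
                    }
                });
        serverBootstrap.bind(WEB_SOCKET_PORT).sync();
    }
}
```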
Analysis
Graceful shutdown
The destroy method from the server code above:
```java
@PreDestroy
public void destroy() {
    // Request shutdown of both groups first...
    Future<?> future = bossGroup.shutdownGracefully();
    Future<?> future1 = workerGroup.shutdownGracefully();
    // ...then block until each group has terminated.
    future.syncUninterruptibly();
    future1.syncUninterruptibly();
    log.info("ws server shut down successfully");
}
```
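In other words, shutdownGracefully() first asks both event loop groups to shut down, and syncUninterruptibly() then blocks until each group has actually terminated.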
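The same request-then-wait pattern can be sketched without Netty. The following is only an analogy built on the JDK's ExecutorService: the class and method names are hypothetical, and it does not show how Netty implements shutdownGracefully() internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only analogy (not Netty code): request shutdown first, then wait for completion.
final class GracefulShutdownSketch {

    // Submits `tasks` short jobs, shuts the pool down gracefully, and returns
    // how many jobs had completed by the time the pool terminated.
    static int runAndShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(completed::incrementAndGet);
        }
        // Phase 1: stop accepting new work; already-submitted jobs still run.
        pool.shutdown();
        try {
            // Phase 2: block until the pool has terminated or the timeout elapses.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // fallback chosen for this sketch only
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed = " + runAndShutdown(10));
    }
}
```

Requesting shutdown on every group before waiting on any of them, as destroy does, lets the groups wind down in parallel instead of one after another.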
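Service structure
Inside the run method of the server code above, the initChannel callback begins by obtaining the channel's pipeline: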
ChannelPipeline pipeline = socketChannel.pipeline();
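This is the pipeline. Every Netty Channel has its own ChannelPipeline, and initChannel is where the handlers are added to it, for example the HTTP codec and the WebSocket handlers.
Here we take the WebSocket handler as an example. It is added with:
```java
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
```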
Stepping into WebSocketServerProtocolHandler, its source contains the following handlerAdded method:
```java
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    ChannelPipeline cp = ctx.pipeline();
    if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
        // Add the WebSocketHandshakeHandler before this one.
        cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                new WebSocketServerProtocolHandshakeHandler(serverConfig));
    }
    if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
        // Add the UTF-8 checking before this one.
        cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation()));
    }
}
```
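To make the resulting handler order concrete, here is a minimal sketch that models the pipeline as a plain list of handler names. ToyPipeline is a hypothetical class written for this post, not Netty's ChannelPipeline, and it assumes the UTF-8 validator is enabled in the decoder config.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler ordering (hypothetical class, not Netty's ChannelPipeline).
final class ToyPipeline {
    private final List<String> names = new ArrayList<>();

    // Appends a handler name at the tail, in the spirit of addLast.
    ToyPipeline addLast(String name) {
        names.add(name);
        return this;
    }

    // Inserts newName directly in front of baseName, in the spirit of addBefore.
    ToyPipeline addBefore(String baseName, String newName) {
        int index = names.indexOf(baseName);
        if (index < 0) {
            throw new IllegalArgumentException("no such handler: " + baseName);
        }
        names.add(index, newName);
        return this;
    }

    boolean contains(String name) {
        return names.contains(name);
    }

    List<String> names() {
        return new ArrayList<>(names);
    }

    // Replays initChannel from the server code, including what handlerAdded inserts.
    static List<String> buildServerPipeline() {
        ToyPipeline p = new ToyPipeline()
                .addLast("IdleStateHandler")
                .addLast("HttpServerCodec")
                .addLast("ChunkedWriteHandler")
                .addLast("HttpObjectAggregator")
                .addLast("WebSocketServerProtocolHandler");
        // Same shape as handlerAdded: insert each helper only if it is missing.
        String[] helpers = {"WebSocketServerProtocolHandshakeHandler", "Utf8FrameValidator"};
        for (String helper : helpers) {
            if (!p.contains(helper)) {
                p.addBefore("WebSocketServerProtocolHandler", helper);
            }
        }
        p.addLast("NettyWebSocketServerHandler");
        return p.names();
    }

    public static void main(String[] args) {
        buildServerPipeline().forEach(System.out::println);
    }
}
```

In this model the handshake handler and the UTF-8 validator end up between HttpObjectAggregator and WebSocketServerProtocolHandler, so the upgrade request reaches the handshake handler before it reaches the protocol handler.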
Stepping further into the handshake handler class, WebSocketServerProtocolHandshakeHandler, we find its channelRead method:
```java
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    final HttpObject httpObject = (HttpObject) msg;

    // Check whether the message is an HTTP request.
    if (httpObject instanceof HttpRequest) {
        final HttpRequest req = (HttpRequest) httpObject;
        isWebSocketPath = isWebSocketPath(req);
        if (!isWebSocketPath) {
            ctx.fireChannelRead(msg);
            return;
        }

        try {
            if (!GET.equals(req.method())) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
                return;
            }

            // Assemble what the handshake response needs: location, subprotocols, decoder config.
            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
                    serverConfig.subprotocols(), serverConfig.decoderConfig());
            // Create the handshaker that will produce the response.
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
            final ChannelPromise localHandshakePromise = handshakePromise;
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
                // This handler is only needed for the upgrade, so it removes itself.
                ctx.pipeline().remove(this);

                // Send the handshake response through the handshaker.
                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                handshakeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if (!future.isSuccess()) {
                            localHandshakePromise.tryFailure(future.cause());
                            ctx.fireExceptionCaught(future.cause());
                        } else {
                            localHandshakePromise.trySuccess();
                            // On success, publish the handshake-complete events.
                            ctx.fireUserEventTriggered(
                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                            ctx.fireUserEventTriggered(
                                    new WebSocketServerProtocolHandler.HandshakeComplete(
                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                        }
                    }
                });
                applyHandshakeTimeout();
            }
        } finally {
            ReferenceCountUtil.release(req);
        }
    } else if (!isWebSocketPath) {
        ctx.fireChannelRead(msg);
    } else {
        ReferenceCountUtil.release(msg);
    }
}
```
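The listing above is annotated with comments; the key steps are explained below.
- if (httpObject instanceof HttpRequest): checks whether the incoming message is an HTTP request. Only then is it considered for the handshake.
- new WebSocketServerHandshakerFactory(...): assembles what the handshake response needs, namely the WebSocket location, the subprotocols and the decoder config.
- wsFactory.newHandshaker(req): creates the handshaker, the class that will produce the handshake response later. If no handshaker is returned, the requested WebSocket version is unsupported and an error response is sent instead.
- ctx.pipeline().remove(this): the handler removes itself from the pipeline. A WebSocket connection needs this handler only for the first request, which upgrades the connection; it is not used again afterwards.
- handshaker.handshake(ctx.channel(), req): the handshaker writes the handshake response to the client.
- If the handshake succeeds, a handshake-complete event is fired down the pipeline through fireUserEventTriggered.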
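For reference, the core of the handshake response is the Sec-WebSocket-Accept header defined by RFC 6455: the client's Sec-WebSocket-Key is concatenated with a fixed GUID, hashed with SHA-1 and Base64-encoded. The sketch below reproduces that calculation with the JDK only. The class and method names are hypothetical, and the response text is a simplified illustration rather than what Netty's handshaker writes byte for byte.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// JDK-only sketch of the RFC 6455 opening handshake (hypothetical helper, not Netty code).
final class WebSocketAcceptSketch {
    // Fixed GUID from RFC 6455, appended to the client's key before hashing.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key.
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    // Builds a simplified response: 403 for non-GET requests, otherwise 101 with the accept key.
    static String upgradeResponse(String method, String secWebSocketKey) {
        if (!"GET".equals(method)) {
            return "HTTP/1.1 403 Forbidden\r\n\r\n";
        }
        return "HTTP/1.1 101 Switching Protocols\r\n"
                + "Upgrade: websocket\r\n"
                + "Connection: Upgrade\r\n"
                + "Sec-WebSocket-Accept: " + acceptKey(secWebSocketKey) + "\r\n\r\n";
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455.
        System.out.print(upgradeResponse("GET", "dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

With the sample key from RFC 6455, dGhlIHNhbXBsZSBub25jZQ==, the accept value is s3pPLMBiTxaQ9kYGzzhZRbK+xOo=. The 403 branch corresponds to the GET check at the top of channelRead.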
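Handling the handshake event in a custom handler
Building on the handshake event fired at the end of the analysis above, a custom handler can react to it by overriding the userEventTriggered method: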
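```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            // Fired by the handshake handler once the upgrade has succeeded.
            System.out.println("handshake complete");
        } else if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Nothing was read for 30 seconds (see IdleStateHandler), so close the connection.
                System.out.println("reader idle");
                ctx.channel().close();
            }
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();
        System.out.println("text = " + text);
    }
}
```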
Summary
Netty implements WebSocket through a single HTTP upgrade. The handler analysed above, WebSocketServerProtocolHandshakeHandler, performs that upgrade of the connection for us.
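The one-time nature of the upgrade can be modelled in a few lines. This is a toy state model with hypothetical names, not Netty code: the first inbound message must be the HTTP GET that upgrades the connection, after which the handshake step removes itself and every later message is treated as a WebSocket frame.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical class, not Netty code) of a connection that is upgraded exactly once.
final class OneTimeUpgradeSketch {
    private final List<String> handlers =
            new ArrayList<>(Arrays.asList("handshake", "frames"));
    private final List<String> log = new ArrayList<>();

    // Feeds one inbound message through the toy pipeline.
    void receive(String message) {
        if (handlers.contains("handshake")) {
            if (message.startsWith("GET ")) {
                // The handshake step is needed only for the upgrade, so drop it afterwards.
                handlers.remove("handshake");
                log.add("upgraded");
            } else {
                log.add("rejected");
            }
            return;
        }
        // After the upgrade, every message goes straight to frame handling.
        log.add("frame:" + message);
    }

    List<String> handlers() {
        return new ArrayList<>(handlers);
    }

    List<String> log() {
        return new ArrayList<>(log);
    }

    // Runs one upgrade request followed by two frames and returns the recorded log.
    static List<String> demo() {
        OneTimeUpgradeSketch connection = new OneTimeUpgradeSketch();
        connection.receive("GET / HTTP/1.1");
        connection.receive("hello");
        connection.receive("world");
        return connection.log();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```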