Netty实现WebSocket原理

哪怕明天进步一点点,也比原地踏步或退步要好

前置提要

Netty WebSocket Server代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Slf4j
@Configuration
public class NettyWebSocketServer {
// 开启netty服务端口8090
public static final int WEB_SOCKET_PORT = 8090;
// 创建线程池执行器
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

/**
* 启动 ws server
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
run();
}

/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip
// pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new NettyWebSocketServerHandler());
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}
}

分析

优雅销毁服务

在代码第25行的destroy方法

1
2
3
4
5
6
7
8
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

也就是把连接先进行关闭, 然后再进行等待片刻。

服务结构

重写run方法实现中,在前提代码中第44行

ChannelPipeline pipeline = socketChannel.pipeline();

管道,每一个Netty服务都有一个对应的ChannelPipeline管道,此部分一般用于实现添加对应的处理器。例如HttpServer、WebSocket等处理器

此处我们以WebSocket处理器为例(67行),其代码如下:

1
pipeline.addLast(new WebSocketServerProtocolHandler("/"));

追踪处理器(追踪进去可以找到后面的方法:handlerAdded),在其源码中不难发现有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.
// 处理握手:WebSocketServerProtocolHandshakeHandler
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation()));
}
}

通过追踪 握手处理器类WebSocketServerProtocolHandshakeHandler:

可以发现有一个方法为: channelRead,具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
final HttpObject httpObject = (HttpObject) msg;
// 如果是Http请求
if (httpObject instanceof HttpRequest) {
final HttpRequest req = (HttpRequest) httpObject;
isWebSocketPath = isWebSocketPath(req);
if (!isWebSocketPath) {
ctx.fireChannelRead(msg);
return;
}

try {
if (!GET.equals(req.method())) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
return;
}
// 组装握手响应
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
serverConfig.subprotocols(), serverConfig.decoderConfig());
// 创建一个握手的响应类
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
final ChannelPromise localHandshakePromise = handshakePromise;
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// Ensure we set the handshaker and replace this handler before we
// trigger the actual handshake. Otherwise we may receive websocket bytes in this handler
// before we had a chance to replace it.
//
// See https://github.com/netty/netty/issues/9471.
WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
// 😀从请求管道中移除自己,我们WebSocket连接只需要在第一次连接的时候使用然后进行升级,随后移除
ctx.pipeline().remove(this);
// 握手的响应类会通过handshaker进行请求的响应
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// 事件监听:对失败与成功情况进行处理
if (!future.isSuccess()) {
localHandshakePromise.tryFailure(future.cause());
ctx.fireExceptionCaught(future.cause());
} else {
localHandshakePromise.trySuccess();
// Kept for compatibility
ctx.fireUserEventTriggered(
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.fireUserEventTriggered(
// 成功则发送一个下面的握手事件体
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
}
}
});
applyHandshakeTimeout();
}
} finally {
ReferenceCountUtil.release(req);
}
} else if (!isWebSocketPath) {
ctx.fireChannelRead(msg);
} else {
ReferenceCountUtil.release(msg);
}
}

上述的代码样例中有注释解释代码,下面进行说明

  • 第五行:if (httpObject instanceof HttpRequest)

判定请求是否为Http请求

  • 第十八行:组装握手响应
  • 第二十二行:创建一个握手的响应类

为后面的响应进行处理

  • ◈第三十四行:移除自己

从请求管道中移除自己,我们WebSocket连接只需要在第一次连接的时候使用然后进行升级,随后移除。因为只在第一次使用(升级)的时候使用到了,后续不需要哟到

  • 第三十六行:

握手的响应类会通过handshaker进行请求的响应

  • 第五十一行:发送请求

如果事件成功则发送一个握手事件体

重写自定义握手事件

基于前面最后分析的发送握手事件体,则有如下实现(通过重写userEventTriggered方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

/**
* 如果需要在握手的时候进行认证,则可以将认证逻辑填写至此处
* @param ctx
* @param evt 事件类型
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果事件是一个WebSocket握手的事件
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
System.out.println("握手完成");
} else if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("读空闲");
//todo 用户下线
ctx.channel().close();
}
}
}


@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String text = msg.text();
System.out.println("text = " + text);
}
}

总结

Netty具体实现WebSocket是通过Http升级一次,依靠的是我们前面分析的处理器(WebSocketServerProtocolHandshakeHandler)帮我们进行连接的升级