Netty相关知识

聊天室引出该文

前置提要

Netty WebSocket Server代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Slf4j
@Configuration
public class NettyWebSocketServer {
// 开启netty服务端口8090
public static final int WEB_SOCKET_PORT = 8090;
// 创建线程池执行器
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

/**
* 启动 ws server
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
run();
}

/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip
// pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new NettyWebSocketServerHandler());
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}
}

Netty实现WebSocket

优雅销毁服务

在代码第25行的destroy方法

1
2
3
4
5
6
7
8
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

也就是把连接先进行关闭, 然后再进行等待片刻。

服务结构

重写run方法实现中,在前提代码中第44行

ChannelPipeline pipeline = socketChannel.pipeline();

管道,每一个Netty服务都有一个对应的ChannelPipeline管道,此部分一般用于实现添加对应的处理器。例如HttpServer、WebSocket等处理器

此处我们以WebSocket处理器为例(67行),其代码如下:

1
pipeline.addLast(new WebSocketServerProtocolHandler("/"));

追踪处理器(追踪进去可以找到后面的方法:handlerAdded),在其源码中不难发现有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.
// 处理握手:WebSocketServerProtocolHandshakeHandler
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation()));
}
}

通过追踪 握手处理器类WebSocketServerProtocolHandshakeHandler:

可以发现有一个方法为: channelRead,具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
final HttpObject httpObject = (HttpObject) msg;
// 如果是Http请求
if (httpObject instanceof HttpRequest) {
final HttpRequest req = (HttpRequest) httpObject;
isWebSocketPath = isWebSocketPath(req);
if (!isWebSocketPath) {
ctx.fireChannelRead(msg);
return;
}

try {
if (!GET.equals(req.method())) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
return;
}
// 组装握手响应
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
serverConfig.subprotocols(), serverConfig.decoderConfig());
// 创建一个握手的响应类
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
final ChannelPromise localHandshakePromise = handshakePromise;
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// Ensure we set the handshaker and replace this handler before we
// trigger the actual handshake. Otherwise we may receive websocket bytes in this handler
// before we had a chance to replace it.
//
// See https://github.com/netty/netty/issues/9471.
WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
// 😀从请求管道中移除自己,我们WebSocket连接只需要在第一次连接的时候使用然后进行升级,随后移除
ctx.pipeline().remove(this);
// 握手的响应类会通过handshaker进行请求的响应
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// 事件监听:对失败与成功情况进行处理
if (!future.isSuccess()) {
localHandshakePromise.tryFailure(future.cause());
ctx.fireExceptionCaught(future.cause());
} else {
localHandshakePromise.trySuccess();
// Kept for compatibility
ctx.fireUserEventTriggered(
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.fireUserEventTriggered(
// 成功则发送一个下面的握手事件体
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
}
}
});
applyHandshakeTimeout();
}
} finally {
ReferenceCountUtil.release(req);
}
} else if (!isWebSocketPath) {
ctx.fireChannelRead(msg);
} else {
ReferenceCountUtil.release(msg);
}
}

上述的代码样例中有注释解释代码,下面进行说明

  • 第五行:if (httpObject instanceof HttpRequest)

判定请求是否为Http请求

  • 第十八行:组装握手响应
  • 第二十二行:创建一个握手的响应类

为后面的响应进行处理

  • ◈第三十四行:移除自己

从请求管道中移除自己,我们WebSocket连接只需要在第一次连接的时候使用然后进行升级,随后移除。因为只在第一次使用(升级)的时候使用到了,后续不需要哟到

  • 第三十六行:

握手的响应类会通过handshaker进行请求的响应

  • 第五十一行:发送请求

如果事件成功则发送一个握手事件体

重写自定义握手事件

基于前面最后分析的发送握手事件体,则有如下实现(通过重写userEventTriggered方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

/**
* 如果需要在握手的时候进行认证,则可以将认证逻辑填写至此处
* @param ctx
* @param evt 事件类型
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果事件是一个WebSocket握手的事件
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
System.out.println("握手完成");
} else if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("读空闲");
//todo 用户下线
ctx.channel().close();
}
}
}


@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String text = msg.text();
System.out.println("text = " + text);
}
}

总结

Netty具体实现WebSocket是通过Http升级一次,依靠的是我们前面分析的处理器(WebSocketServerProtocolHandshakeHandler)帮我们进行连接的升级

Netty心跳原理

此部分详细代码见前置提要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip
// pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new NettyWebSocketServerHandler());
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}

心跳包

如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。

直接接入netty的现有组件new IdleStateHandler(30, 0, 0)可以实现30秒链接没有读请求,就主动关闭链接。我们的web前端需要保持每10s发送一个心跳包。

在此部分的代码第34行, WebSocket自带有一个心跳检测组件 如下

1
2
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0)); // 读空闲时间 写空闲时间 全局时间
  1. 第一个参数: 读空闲时间

前端主动请求, 给客户端主动发送一条消息, 如果在30秒内有新消息则为正常状态, 没用则关闭连接

  1. 第二个参数: 写空闲时间

后端有没有主动给前端写消息, 一般适用于客户端, 客户端加了心跳, 则客户端会自己主动写心跳

测试心跳

此项与前面提到的文章代码一致(在 #自定义握手事件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果事件是一个WebSocket握手的事件
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
log.info("[握手完成]");
} else if (evt instanceof IdleStateEvent) {
// 心跳
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("[读空闲]");
//todo 用户下线
ctx.channel().close();
}
}
}

在设置了前面的第一个参数后, 启动服务, 连接WebSocket然后等待30秒则会输出日志: [读空闲]

结尾

其实就是, 在设定的30秒内没有新消息, 就会发送一个事件, 在追踪IdleStateHandler源码可以发现, 它有一些读消息的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (this.readerIdleTimeNanos > 0L || this.allIdleTimeNanos > 0L) {
this.reading = true;
this.firstReaderIdleEvent = this.firstAllIdleEvent = true;
}

ctx.fireChannelRead(msg);
}
// 读完成
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((this.readerIdleTimeNanos > 0L || this.allIdleTimeNanos > 0L) && this.reading) {
// 记录最后一次读的时间
this.lastReadTime = this.ticksInNanos(); // 此项通过定时任务实现
this.reading = false;
}

ctx.fireChannelReadComplete();
}

其实就是读Channel, 记录最后一次读的时间与之当前时间进行对比, 最后一次读的时间的更新通过定时任务完成, 通过追踪lastReadTime属性

可以发现源码中存在如下的定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}

@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
// 时间处理
nextDelay -= ticksInNanos() - lastReadTime;
}

if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 不论什么情况都会执行下一次周期的定时任务,通过线程池执行定时任务,具体定时多久读者自行查阅
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;

try {
// 事件通知, 构建一个读空闲时间
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}

在每一次有新消息进来的时候, 我们会更新最后一次事件时间(此项详情在schedule定时任务), 最后组成心跳机制

心跳处理器只是发出事件, 具体处理交给我们自己, 例如笔者样例代码中存在一处为// todo 用户下线, 所以还能对用户下线进行业务逻辑处理, 那么这样(只是通过发出事件)的好处可以解耦

写在最后

为什么我们使用netty, 因为它以及封装了很多实用的组件了, 以至于没有使用原生的服务