Netty处理WebSocket,Nginx配置

一.实时消息WebSocket

传统的 WEB 开发中,通常我们渲染数据或者请求增删改的时候,都需要通过发送 HTTP 请求。 而每次发送 HTTP 请求基本都需要经历下面的历程(其实和 TCP 大致相同): image-20190804143610798 而当我们需要一些实时消息的需求的时候,比如聊天或者消息推送,那么我们有一种做法就是。浏览器每隔几秒轮训一次服务器(因为不请求服务器就没法回复消息),走一下上面的步骤,然后服务器如果有数据响应数据,没有数据就响应空的数据。哇啊啊啊啊你看这个过程,如果我的后台系统这个账号刚好没什么生意,基本很少需要推送有人下单的消息给我,可是我还是要走这些流程,服务器也还是需要处理我这个没用的卖家的请求。 当然并不是这种方式没人采用,还是有的,因为开发简单。但是性能其实并不怎么样,而且还要浪费一个服务来处理这个请求。 这个问题看起来还是比较棘手的而且很迫切的一个需求,于是乎我们的 W3C 组织站了出来,为浏览器新增了一个新的协议 WebSocket 协议,在 HTML5 发布的时候新增进去的。 WebSocketHTTP 在同一层,利用 HTTP 向服务器请求升级 WebSocket,服务器应答即可将当前的连接升级为 WebSocket 连接。

1
2
3
4
5
6
7
GET ws://localhost:9999/ HTTP/1.1
Host: localhost
Upgrade: websocket # 请求升级WebSocket
Connection: Upgrade # 同样
Origin: http://localhost:8000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13

服务器如果答应了,就会响应:

1
2
3
4
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string

这时候相当于说,我浏览器和服务器经历了三次握手,然后在中间建设了一个管道。 这个管道,谁都可以随时使用,客户端和服务端都可以同时发送消息。也就是说在上面订单通知的例子中,我服务器收到了一个订单支付成功,即可立马发送一条消息给你客户端。

二.Netty处理WebSocket

OK,接下来轮到 Netty 来干活了。 我们知道 Netty 基本是由一堆 Handler 组成一个链条来处理请求的。

2.1 Netty的WebSocket解码器

所以我们组装 Pipeline 的时候需要安装一个 WebSocketServerProtocolHandler 处理器,他会将 WebSocket 请求处理并且传递给下一个 handler(一般是我们的业务处理器)。构造器需要指定 WebSocket 处理的 uri。 安装:pipeline.addLast(new WebSocketServerProtocolHandler("/")); 具体详细的引导安装下面再说。

2.2 编写我们自己的Handler

因为浏览器发送的数据也是二进制的进行,有自己的帧规则,而上一步我们安装了 Netty 提供的处理器以后,这时候我们已经可以取出来浏览器发送的字符串了,所以我们自己的处理器需要处理 TextWebSocketFrame。这是一个数据封装的包裹(前面说的 ByteBuf,只不过内部已经进行了翻译)所以我们可以很简单的根据业务需求封装一些请求类以及响应类,这里为了简单不做进一步封装。 我现在有个需求,连接了 Netty 服务器,发送一个消息后,假装服务器有通知需要发送,每隔 2 秒就给浏览器发送,浏览器打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ChannelHandler.Sharable
public class NotifyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
/** 获取浏览器发送的消息 */
String text = msg.text();
System.out.println("接收到消息:" + text);

ctx.writeAndFlush(new TextWebSocketFrame(text));

new Thread(() -> {
while (true) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}

/** 每隔2秒发送一个消息 */
ctx.writeAndFlush(new TextWebSocketFrame("你有一个新的淘金订单,请尽快处理"));
}
}).start();
}
}

其实应该不是很难理解这个代码…

2.3 引导安装服务器

我们知道 WebSocket 是通过 HTTP 来升级的,所以这时,HTTP 处理器还是不能少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class WebSocketApplication {

public static void main(String[] args) throws InterruptedException {
/** 创建 EventLoopGroup */
EventLoopGroup group = new NioEventLoopGroup();

try {
ServerBootstrap sb = new ServerBootstrap();
final NotifyHandler notifyHandler = new NotifyHandler();
sb.group(group)
/** 指定所使用的的 NIO 传输的 Channel */
.channel(NioServerSocketChannel.class)
/** 绑定服务器端口 */
.localAddress(9999)
/** 添加我们自己的业务处理 Handler 到子级的 Channel 的 ChannelPipeline中 */
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
/** 新增HTTP处理器,处理请求升级问题 */
socketChannel.pipeline().addLast(new HttpServerCodec());
socketChannel.pipeline().addLast(new HttpObjectAggregator(64 * 1024));
/** 安装 WebSocket 处理器 */
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/"));
/** 安装我自己的Handler */
socketChannel.pipeline().addLast(notifyHandler);
}
});
/** 异步绑定服务器,阻塞到直到绑定完成 */
ChannelFuture future = sb.bind().sync();
/** 获取 Channel 的 CloseFuture 阻塞到关闭完成 */
future.channel().closeFuture().sync();
} finally {
/** 关闭 EventLoopGroup 释放资源 */
group.shutdownGracefully().sync();
}
}

}

然后启动,进行测试。

2.4 测试WebSocket接口

在开发的时候发现了这个很有用的小工具 在线测试WebSocket OK,我们的端口是 9999,这时候只要在左侧窗口的连接地址写 ws://localhost:9999/,然后尝试写消息给服务器,让服务器不断的写消息给我们。

三.开发实时通知页面

服务端有了,现在需要页面来做对接呀。 直接上代码好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>TEST-WEBSOCKET</title>
</head>
<body>

</body>

<script type="text/javascript">
let retry = null
let wbs = null
function openWebSocket() {
/** 创建WebSocket客户端 */
wbs = new WebSocket("ws://localhost:9999/")
/** 定义接收消息弹窗显示 */
wbs.onmessage = (evt) => {
/** 弹窗打印服务器发送的消息内容 */
alert(evt.data)
}
/** 定义连接打开后需要发送的消息 */
wbs.onopen = () => {
console.log("WebSocket连接已打开")
/** 打开连接即发送消息 */
wbs.send("HELLO");

/** 打开后如果重试还在 清楚重试对象 */
if (retry !== null) {
clearInterval(retry)
retry = null
}
}
/** 连接被关闭后需要做的事情,这里是关闭后重新轮询服务端是否已经准备好了,准备好了就打开连接 */
wbs.onclose = () => {
console.log('WebSocket被关闭了')
if (null === retry) {
retry = setInterval(function () {
console.log("尝试重连....")
openWebSocket()
}, 3000);
}
}
}

openWebSocket();

</script>

</html>

四.Nginx反向代理

Nginx 通常在生产中被用作反向代理服务器,什么意思咧,相当于请求进门,进门之前,证书什么的都在这里配置。 我跟往常一样在这里配置 WebSocket 服务端的证书呀反向代理服务器呀。 唯独没有关注连接超时被断开的问题,上了测试,我的 Web 项目居然过了 1 分钟就总是偷偷断开。导致后面用户接受不到订单的更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 升级请求
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl;
ssl on ;
server_name wbs.liweidan.com;
ssl_certificate XXXXX.pem;
ssl_certificate_key XXXXX.key;
ssl_session_timeout 5m;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
location / {
proxy_pass http://localhost:9999/;
proxy_http_version 1.1;
proxy_connect_timeout 4s;
proxy_read_timeout 1200s; # 20分钟没有接受请求才超时断开
proxy_send_timeout 12s; # 十二秒没有发送成功即发送超时
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
}

Emm,没错,有中文注释的是配置关键点。表示 20分钟 没有数据读取才关闭连接。 但是有些需求不需要关闭连接啊,怎么办,来个 PING PONG 心跳咯: 然而浏览器没有这个机制啊,而且只认得服务器发送的 PONG 帧。也就是说,我们需要服务端发送一个 PONG 他浏览器会返回一个 PING(怪怪的然而事实就是这样) 可是我不想服务器来做这个事情,因为毕竟连接的都有时间差嘛。所以我准备在前端发送 PING 服务端收到后返回 PONG 即可。 改一改我们自己的处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@ChannelHandler.Sharable
public class NotifyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
/** 获取浏览器发送的消息 */
String text = msg.text();
System.out.println("接收到消息:" + text);

if ("PING".equals(text)) {
ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
return;
}

/** 省略代码 */
}
}

再改一改前端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
let retry = null
let ping = null
let wbs = null
function openWebSocket() {
/** 创建WebSocket客户端 */
wbs = new WebSocket("ws://localhost:9999/")
/** 定义接收消息弹窗显示 */
wbs.onmessage = (evt) => {
/** 弹窗打印服务器发送的消息内容 */
if (evt.data !== 'PONG') {
alert(evt.data);
} else {
console.log(evt.data)
}
}
/** 定义连接打开后需要发送的消息 */
wbs.onopen = () => {
console.log("WebSocket连接已打开")
/** 打开连接即发送消息 */
wbs.send("HELLO");

/** 打开后如果重试还在 清楚重试对象 */
if (retry !== null) {
clearInterval(retry)
retry = null
}

/** 连接后启动 PING,PONG 心跳 */
if (null === ping) {
/** 7分钟心跳一次,注意这个值需要设置的比 Nginx 设置的值要小,不然被关闭了就没意义了。 */
ping = setInterval(() => {
wbs.send("PING");
}, 60000 * 7)
}
}
/** 连接被关闭后需要做的事情,这里是关闭后重新轮询服务端是否已经准备好了,准备好了就打开连接 */
wbs.onclose = () => {
console.log('WebSocket被关闭了')
if (null === retry) {
retry = setInterval(function () {
console.log("尝试重连....")
openWebSocket()
}, 3000);
}
/** 连接被关闭后停止 PING-PONG 心跳 */
if (null !== ping) {
clearInterval(ping)
ping = null
}
}
}

openWebSocket();

测试:

WebSocket 相关内容完结。