Netty处理WebSocket,Nginx配置

一.实时消息WebSocket

传统的 WEB 开发中,通常我们渲染数据或者请求增删改的时候,都需要通过发送 HTTP 请求。

而每次发送 HTTP 请求基本都需要经历下面的历程(其实和 TCP 大致相同):

《Netty处理WebSocket,Nginx配置》

而当我们需要一些实时消息的需求的时候,比如聊天或者消息推送,那么我们有一种做法就是。浏览器每隔几秒轮训一次服务器(因为不请求服务器就没法回复消息),走一下上面的步骤,然后服务器如果有数据响应数据,没有数据就响应空的数据。哇啊啊啊啊你看这个过程,如果我的后台系统这个账号刚好没什么生意,基本很少需要推送有人下单的消息给我,可是我还是要走这些流程,服务器也还是需要处理我这个没用的卖家的请求。

当然并不是这种方式没人采用,还是有的,因为开发简单。但是性能其实并不怎么样,而且还要浪费一个服务来处理这个请求。

这个问题看起来还是比较棘手的而且很迫切的一个需求,于是乎我们的 W3C 组织站了出来,为浏览器新增了一个新的协议 WebSocket 协议,在 HTML5 发布的时候新增进去的。

WebSocketHTTP 在同一层,利用 HTTP 向服务器请求升级 WebSocket,服务器应答即可将当前的连接升级为 WebSocket 连接。

GET ws://localhost:9999/ HTTP/1.1
Host: localhost
Upgrade: websocket # 请求升级WebSocket
Connection: Upgrade # 同样
Origin: http://localhost:8000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13

服务器如果答应了,就会响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string

这时候相当于说,我浏览器和服务器经历了三次握手,然后在中间建设了一个管道。

这个管道,谁都可以随时使用,客户端和服务端都可以同时发送消息。也就是说在上面订单通知的例子中,我服务器收到了一个订单支付成功,即可立马发送一条消息给你客户端。

二.Netty处理WebSocket

OK,接下来轮到 Netty 来干活了。

我们知道 Netty 基本是由一堆 Handler 组成一个链条来处理请求的。

2.1 Netty的WebSocket解码器

所以我们组装 Pipeline 的时候需要安装一个 WebSocketServerProtocolHandler 处理器,他会将 WebSocket 请求处理并且传递给下一个 handler(一般是我们的业务处理器)。构造器需要指定 WebSocket 处理的 uri

安装:pipeline.addLast(new WebSocketServerProtocolHandler("/")); 具体详细的引导安装下面再说。

2.2 编写我们自己的Handler

因为浏览器发送的数据也是二进制的进行,有自己的帧规则,而上一步我们安装了 Netty 提供的处理器以后,这时候我们已经可以取出来浏览器发送的字符串了,所以我们自己的处理器需要处理 TextWebSocketFrame。这是一个数据封装的包裹(前面说的 ByteBuf,只不过内部已经进行了翻译)所以我们可以很简单的根据业务需求封装一些请求类以及响应类,这里为了简单不做进一步封装。

我现在有个需求,连接了 Netty 服务器,发送一个消息后,假装服务器有通知需要发送,每隔 2 秒就给浏览器发送,浏览器打印。

@ChannelHandler.Sharable
public class NotifyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    /** 获取浏览器发送的消息 */
    String text = msg.text();
    System.out.println("接收到消息:" + text);

    ctx.writeAndFlush(new TextWebSocketFrame(text));

    new Thread(() -> {
      while (true) {
        try {
          Thread.sleep(1000L);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }

        /** 每隔2秒发送一个消息 */
        ctx.writeAndFlush(new TextWebSocketFrame("你有一个新的淘金订单,请尽快处理"));
      }
    }).start();
  }
}

其实应该不是很难理解这个代码…

2.3 引导安装服务器

我们知道 WebSocket 是通过 HTTP 来升级的,所以这时,HTTP 处理器还是不能少。

public class WebSocketApplication {

  public static void main(String[] args) throws InterruptedException {
    /** 创建 EventLoopGroup */
    EventLoopGroup group = new NioEventLoopGroup();

    try {
      ServerBootstrap sb = new ServerBootstrap();
      final NotifyHandler notifyHandler = new NotifyHandler();
      sb.group(group)
              /** 指定所使用的的 NIO 传输的 Channel */
              .channel(NioServerSocketChannel.class)
              /** 绑定服务器端口 */
              .localAddress(9999)
              /** 添加我们自己的业务处理 Handler 到子级的 Channel 的 ChannelPipeline中  */
              .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  /** 新增HTTP处理器,处理请求升级问题 */
                  socketChannel.pipeline().addLast(new HttpServerCodec());
                  socketChannel.pipeline().addLast(new HttpObjectAggregator(64 * 1024));
                  /** 安装 WebSocket 处理器 */
                  socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/"));
                  /** 安装我自己的Handler */
                  socketChannel.pipeline().addLast(notifyHandler);
                }
              });
      /** 异步绑定服务器,阻塞到直到绑定完成 */
      ChannelFuture future = sb.bind().sync();
      /** 获取 Channel 的 CloseFuture 阻塞到关闭完成 */
      future.channel().closeFuture().sync();
    } finally {
      /** 关闭 EventLoopGroup 释放资源 */
      group.shutdownGracefully().sync();
    }
  }

}

然后启动,进行测试。

2.4 测试WebSocket接口

在开发的时候发现了这个很有用的小工具 在线测试WebSocket

OK,我们的端口是 9999,这时候只要在左侧窗口的连接地址写 ws://localhost:9999/,然后尝试写消息给服务器,让服务器不断的写消息给我们。

《Netty处理WebSocket,Nginx配置》

三.开发实时通知页面

服务端有了,现在需要页面来做对接呀。

直接上代码好了:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>TEST-WEBSOCKET</title>
</head>
<body>

</body>

<script type="text/javascript">
    let retry = null
    let wbs = null
    function openWebSocket() {
        /** 创建WebSocket客户端 */
        wbs = new WebSocket("ws://localhost:9999/")
        /** 定义接收消息弹窗显示 */
        wbs.onmessage = (evt) => {
            /** 弹窗打印服务器发送的消息内容 */
            alert(evt.data)
        }
        /** 定义连接打开后需要发送的消息 */
        wbs.onopen = () => {
            console.log("WebSocket连接已打开")
            /** 打开连接即发送消息 */
            wbs.send("HELLO");

            /** 打开后如果重试还在 清楚重试对象 */
            if (retry !== null) {
                clearInterval(retry)
                retry = null
            }
        }
        /** 连接被关闭后需要做的事情,这里是关闭后重新轮询服务端是否已经准备好了,准备好了就打开连接 */
        wbs.onclose = () => {
            console.log('WebSocket被关闭了')
            if (null === retry) {
                retry = setInterval(function () {
                    console.log("尝试重连....")
                    openWebSocket()
                }, 3000);
            }
        }
    }

    openWebSocket();

</script>

</html>

《Netty处理WebSocket,Nginx配置》

四.Nginx反向代理

Nginx 通常在生产中被用作反向代理服务器,什么意思咧,相当于请求进门,进门之前,证书什么的都在这里配置。

我跟往常一样在这里配置 WebSocket 服务端的证书呀反向代理服务器呀。

唯独没有关注连接超时被断开的问题,上了测试,我的 Web 项目居然过了 1 分钟就总是偷偷断开。导致后面用户接受不到订单的更新。

# 升级请求
map $http_upgrade $connection_upgrade {  
    default upgrade;  
    '' close;  
}  
server {
    listen 443 ssl;
    ssl on ;
    server_name wbs.liweidan.com;
    ssl_certificate   XXXXX.pem;
    ssl_certificate_key  XXXXX.key;
    ssl_session_timeout 5m;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
    ssl_prefer_server_ciphers on;
    location / {
        proxy_pass http://localhost:9999/;  
        proxy_http_version 1.1;  
                proxy_connect_timeout 4s; 
        proxy_read_timeout 1200s; # 20分钟没有接受请求才超时断开
        proxy_send_timeout 12s; # 十二秒没有发送成功即发送超时
        proxy_set_header Upgrade $http_upgrade;  
        proxy_set_header Connection "Upgrade";  
    }
}

Emm,没错,有中文注释的是配置关键点。表示 20分钟 没有数据读取才关闭连接。

但是有些需求不需要关闭连接啊,怎么办,来个 PING PONG 心跳咯:

然而浏览器没有这个机制啊,而且只认得服务器发送的 PONG 帧。也就是说,我们需要服务端发送一个 PONG 他浏览器会返回一个 PING(怪怪的然而事实就是这样)

可是我不想服务器来做这个事情,因为毕竟连接的都有时间差嘛。所以我准备在前端发送 PING 服务端收到后返回 PONG 即可。

改一改我们自己的处理器:

@ChannelHandler.Sharable
public class NotifyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    /** 获取浏览器发送的消息 */
    String text = msg.text();
    System.out.println("接收到消息:" + text);

    if ("PING".equals(text)) {
      ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
      return;
    }

    /** 省略代码 */
  }
}

再改一改前端:

let retry = null
let ping = null
let wbs = null
function openWebSocket() {
  /** 创建WebSocket客户端 */
  wbs = new WebSocket("ws://localhost:9999/")
  /** 定义接收消息弹窗显示 */
  wbs.onmessage = (evt) => {
    /** 弹窗打印服务器发送的消息内容 */
    if (evt.data !== 'PONG') {
      alert(evt.data);
    } else {
      console.log(evt.data)
    }
  }
  /** 定义连接打开后需要发送的消息 */
  wbs.onopen = () => {
    console.log("WebSocket连接已打开")
    /** 打开连接即发送消息 */
    wbs.send("HELLO");

    /** 打开后如果重试还在 清楚重试对象 */
    if (retry !== null) {
      clearInterval(retry)
      retry = null
    }

    /** 连接后启动 PING,PONG 心跳 */
    if (null === ping) {
      /** 7分钟心跳一次,注意这个值需要设置的比 Nginx 设置的值要小,不然被关闭了就没意义了。 */
      ping = setInterval(() => {
        wbs.send("PING");
      }, 60000 * 7)
    }
  }
  /** 连接被关闭后需要做的事情,这里是关闭后重新轮询服务端是否已经准备好了,准备好了就打开连接 */
  wbs.onclose = () => {
    console.log('WebSocket被关闭了')
    if (null === retry) {
      retry = setInterval(function () {
        console.log("尝试重连....")
        openWebSocket()
      }, 3000);
    }
    /** 连接被关闭后停止 PING-PONG 心跳 */
    if (null !== ping) {
      clearInterval(ping)
      ping = null
    }
  }
}

openWebSocket();

测试:

《Netty处理WebSocket,Nginx配置》

WebSocket 相关内容完结。

点赞