Netty组件以及入门体验

零.Netty

其实了解到 Netty 已经很久了,一直想用,但是因为之前的水平还不够格,回调事件 TCP 什么的还没感觉,所以学起来一头雾水,加上官网的文档,哎呀,官网貌似就没有文档只有示例代码,读不懂。

写了挺多的回调函数,渐渐地有了感觉(通常使用 CompleteFuture 来请求其他服务的数据信息,请求完在执行自己的业务)。其实我也不知道我做了什么,貌似什么没做就突然融会贯通了,所以我感觉理解回调还是蛮重要的一点吧。

突然看到自己的书本库有本书《Netty实战》翻起来阅读,还是蛮好的,这篇文章其实是我读这本书,加上自己的一些理解写出来的。

Netty 是什么应该没人不会知道吧,就是 Java 行业中一个能够顶级处理网络通讯的轻量级框架,如果公司在使用 Dubbo 或者 Thrift 的话,那么也是间接在使用 Netty 框架了。所以学一学无伤大雅还可以了解一些很有趣的东西。

一.Netty服务端

所有 Netty服务器 通常需要以下两部分:

  1. 至少一个 ChannelHandler 来接手客户端的数据以及处理数据;
  2. 引导服务器启动的配置,配置启动参数,这个就没啥好说的了。

ChannelHandlerNetty 中一个接口族的父接口,它主要负责接收和响应事件通知。

NettyChannelHandler 有很多默认实现,用来处理服务器中常见的数据传输问题。

因为服务器会响应传入的消息,所以需要实现 ChannelInboundHandler 接口,用来定义响应入站事件的方法。由于刚开始的程序只需要简单的方式即可,所以我们实现 ChannelInboundHandlerAdapter 即可,他提供了 ChannelInboundHandler 接口的默认实现。

我现在想要简单的实现一个服务,就是能够把把我发送的字符串,给反转过来,即发送 abc 服务器给我响应 cba

package cn.liweidan.nettydemo.demo01.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;

/**
 * 反转服务器的处理逻辑类.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
/** 标记该 Channel 是线程安全的. */
@ChannelHandler.Sharable
public class StringReverseHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf in = (ByteBuf) msg;
    String message = in.toString(Charset.defaultCharset());
    System.out.println("Server Receive Message: " + message);

    String newMsg = "";
    String[] strings = message.split("");
    for (int i = strings.length - 1; i > 0; i--) {
      newMsg += strings[i];
    }

    /** 使用工具类构建 ByteBuf 对象写出去 */
    ByteBuf out = Unpooled.copiedBuffer(newMsg.getBytes());
    ctx.writeAndFlush(out);
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    /** 当客户端读取完毕的时候,关闭客户端 */
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
            .addListener(ChannelFutureListener.CLOSE);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    /** 可以覆写,实现出现异常的时候执行的逻辑 */
    cause.printStackTrace();
    ctx.close();
  }

}

接下来需要编写服务的引导类,这个引导类主要实现两大功能:

  1. 绑定哪个端口;
  2. 绑定上面写的 Handler 实现业务处理.
package cn.liweidan.nettydemo.demo01.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服务器引导类.
 * 主要实现:
 *  1. 绑定哪个端口;
 *  2. 绑定自己写的 Handler 以便执行业务.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
public class ServerLaunch {

  public static void main(String[] args) throws InterruptedException {
    /** 创建 EventLoopGroup */
    EventLoopGroup group = new NioEventLoopGroup();
    final StringReverseHandler handler = new StringReverseHandler();

    try {
      ServerBootstrap sb = new ServerBootstrap();
      sb.group(group)
              /** 指定所使用的的 NIO 传输的 Channel */
              .channel(NioServerSocketChannel.class)
              /** 绑定服务器端口 */
              .localAddress(8888)
              /** 添加我们自己的业务处理 Handler 到子级的 Channel 的 ChannelPipeline中  */
              .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  /** 添加我们自己的 Handler 实现,因为线程安全的,所以只使用一个实例 */
                  socketChannel.pipeline()
                          .addLast(handler);
                }
              });
      /** 异步绑定服务器,阻塞到直到绑定完成 */
      ChannelFuture future = sb.bind().sync();
      /** 获取 Channel 的 CloseFuture 阻塞到关闭完成 */
      future.channel().closeFuture().sync();
    } finally {
      /** 关闭 EventLoopGroup 释放资源 */
      group.shutdownGracefully().sync();
    }
  }

}

至此服务端任务就完成了,这时候只要启动服务端,等待客户端的介入即可处理业务。

二.Netty客户端

同上,所有的 Netty客户端 基本也是跟服务端差不多的事情:

  1. 连接服务端;
  2. 发送消息;
  3. 获取服务端处理的结果;
  4. 关闭连接.

同服务端处理一致,客户端也拥有一个 ChannelInboundHandler 来处理我们需要请求的业务。我们暂时可以使用 SimpleChannelInboundHandler 来执行我们必须的任务。

package cn.liweidan.nettydemo.demo01.client;

import com.sun.deploy.net.CrossDomainXML;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

/**
 * 客户端请求处理器.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
@ChannelHandler.Sharable
public class RequestHandler extends SimpleChannelInboundHandler<ByteBuf> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    /** Channel 建立连接完成后,执行的业务,发送一个 HelloWorld 的编码并使用 ByteBuf 包装 */
    ctx.writeAndFlush(
            Unpooled.copiedBuffer(
                    "HelloWorld".getBytes(Charset.defaultCharset().name())));
  }

  @Override
  protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
    /** 服务端发送消息后执行的逻辑,直接打印 */
    System.out.println("Client Receive Message: " + byteBuf.toString(Charset.defaultCharset()));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    /** 发生异常时关闭 Channel */
    cause.printStackTrace();
    ctx.close();
  }
}

接下来我们需要实现客户端的启动器,除了客户端需要使用 OIO 传输以外,其他需要做的事情基本是一致的。

三.运行服务端和客户端

《Netty组件以及入门体验》

OK,我们分别启动 服务端客户端,可见 客户端 在连接完成的时候,像 服务端 发送了 HelloWorld,服务端处理完成后,客户端即接收到 Client Receive Message: dlroWolleH

三.Netty组件

OK,硬着头皮写到这里,项目也运行还算正常,感觉还不错。那么接下来就需要来了解一下各个组件了。

3.1 Netty主要组件

组件的顺序是从业务处理器,再到软件启动引导:

  1. ChannelHandler
  2. EventLoopGroup
  3. Channel
  4. ServerBootstrap 服务端启动类,而客户端使用的是 Bootstrap
  5. ChannelInitializer 主要用来初始化注册安装 ChannelHandler
  6. ChannelPipeline 存放 ChannelHandler 的链表容器

而下面的顺序则没有按照上面的顺序,因为我想从里面了解到外面,里面相对看起来比较简单。

3.2 ChannelHandler和ChannelPipeline

3.2.1 ChannelHandler

从上面的例子上可以看到,我们在服务端使用了继承 ChannelHandler 的方式去做业务逻辑,其实这块一般也是业务的重要地方,需要做什么处理,然后写出什么数据,跟 Controller 的作用相同。

在上面的服务端例子中,业务处理通过继承 ChannelInboundHandlerAdapter(是一个 ChannelHandler 的子类,下面说) 的方式来处理,它的作用是:

  1. 接收入站事件和数据;
  2. 处理完以后,冲刷数据到客户端;
  3. 可以关闭连接的方式来结束客户端的连接。

通常来说,一个项目会有多个 ChannelInboundHandler 在运行着,处理着业务数据。

3.2.1 ChannelPipeline

在服务端和客户端都可以看到 socketChannel.pipeline().addLast(new RequestHandler()) 这段代码,那么根据编码经验来说,他应该是个容器。

没错,他还真的是一个容器,一个链表容器,里面装着一个一个的 ChannelHandler

具体过程是:

  1. 启动的时候定义 ChannelInitializer,他将在 Bootstrap 或者 ServerBootstrap 启动的时候进行初始化操作;
  2. initChannel 被调用的时候,我们即可安装我们自己的 ChannelHandler 实现,来处理数据传输;
  3. ChannelInitializer 将自己从 ChannelPipeline 中移除。

ChannelHandler 以及子类:

《Netty组件以及入门体验》

《Netty组件以及入门体验》

数据入站的时候,将按照安装的顺序,依次执行 ChannelInboundHandler 中的逻辑,其实说到底就是处理链吧,当数据到达 Pipeline 尾端的时候,表示数据处理已经结束。

数据的出站运动(正在被写的数据)则是从 Pipeline 末端开始执行,与 ChannelInboundHandler 执行顺序相反的情况下依次处理。

Netty 中 提供了 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 两个适配类,其实这两个类就是已经解决了简单顺序传值的问题,Netty 会简单的帮你按照上面的顺序执行 ChannelHandler 我们只需要覆写与业务相关的处理即可。所以如果我们只是想简单的传递的话可以直接使用这两个类。

在覆写我们感兴趣的函数的时候,通常都可以看到有一个 ChannelHandlerContext 而且示例中也是使用他来写出消息的,除了这种方法写出消息,还有另外一种方法就是使用 Channel 写出(调用:ctx.channel().writeAndFlush())。前者写出会将消息写到下一个 ChannelHandler 而后者则是让消息从上图中的 ChannelPipeline 末端开始走(与上面区别就是跳过下一个 ChannelInboundHandler

3.3 Channel和EventLoop

3.3.1 Channel

Java NIO

说到 Channel 就要说到 Java NIO ,说到 NIO 就要说到 SelectorSocket

说到 NIO 就要先说说这个有趣的名字~

NIO 刚开始我感觉就是 New IO,可是这么多年过去了,再叫 New IO 就有点不合适了。

所以现在大多数人认为应该叫 Non-blocking IO,而阻塞IO则是 block IO 或者 old IO (BIO/OIO)

其实聊到 NIO 就应该是,传统的 IO 如果同时执行同一个业务的话,而且想要多人都可以同时并行处理的话,那么就需要开启多个线程来同时执行。

《Netty组件以及入门体验》

那么每一个新的客户端进来,我就需要预留一个线程来处理,线程中 BIO 在读取文件或者其他 IO 输入的时候,需要阻塞进入等待,这都算是一种资源浪费(CPU还需要切换线程去查看哪个线程已经阻塞完成了)。据我们所知,一个线程占用栈空间 64k1m,理论线程越多每个线程拿到的栈空间就更少了。这时候线程他就在那里等待了什么事情都不做,然后还占用了系统上一个线程的位置(系统限制可开启线程数)。如果小数量的线程数(用户数)那么勉强还是撑得过去的,而且工作的也还不错。那么如果上万个用户上十万个用户呢,这时候,CPU 需要浪费很大的力气来切换轮询。

于是乎这时候,NIO 横空出世(其实系统早就支持了,在 jdk1.4 之前都没有支持)

NIO 有个很牛逼的管理员 Selector,他的任务就是提交 IO 任务并且告诉系统,他做完了告诉我,我会执行下一步操作。于是乎模型就编程这样:

《Netty组件以及入门体验》

这个模型只要一个线程就够了,他找 Selector 要已经完成 IO 操作的名单,然后放到自己的线程开始执行我们的业务逻辑,如果没有 IO 那么这个线程还可以去做其他的事情。

Netty中的Channel

Netty 中,一个 Channel 代表一个实体(硬件设备,文件,Socket,能够执行一个或不同 IO 操作的程序组件)的连接。这里可以套用 Linux 中万物皆文件的理念,只要是一个物,他就有输入输出,那么她就是 Channel

Channel 中我们实现了他的一些方法如 channelRead channelReadComplete,其实这些是回调事件,我们也可以称实现这些动作是实现回调事件。那么啥是回调事件:

某件事情执行时间很长,你让他执行完告诉你你去接收他的参数并且接下去做。

比如洗衣服,你扔进洗衣机,洗衣机一般要洗1个小时,洗完了发出滴滴滴的声音。这就是回调了,在这1个小时里面你这个线程就可以去做其他事情,他滴滴滴响了你拿到了结果(衣服洗完了)再去执行一个函数:晾衣服。

如果你熟悉 JavaScript 那么这一切都很自然,异步请求 Promise 类,Promise.then((result) => {...}) 里面的 function 她就是回调函数。

jdk8 中提供了很好的回调事件方式的线程类 CompleteFuture 就是用来做这个事情的,你可以使用这个类来体验一下回调事件的感受。(参考文章:jdk8 多线程处理的使用

Netty 时代 jdk 还没有到 j8 呀,只提供了 CompleteFuture 的爸爸 Future,那怎么办嘛,Netty 就自己写提个,这就是 ChannelFuture 的出现了。ChannelFuture 也提供了可以自定义的 ChannelFutureListener 来拓展,可以说比 jdk8CompleteFuture 还厉害,可以监听连接完成时做什么(比如检查连接是否正常,远程服务是否能够正确返回信息)。只需要在引导代码里面,使用 ChannelFuture.addListener 即可添加相对应的逻辑。这么说的话,那么 ChannelFutureListener 就是 Future 生命周期中执行的钩子函数。

完结

点赞