Netty之ByteBuf以及编解码器

一.数据传输的容器ByteBuf

1.1. 简单理解一下

OK,上一篇文章我们大致了解了 Netty 在运行过程中所需要的一些组件。接下来需要慢慢的深入了解这些容器了。

为了方便回忆,我先贴一段上一篇的代码:

...
ByteBuf in = (ByteBuf) msg;
String message = in.toString(Charset.defaultCharset());
...

我们可以看到,在服务端代码中,使用了 ByteBuf 来读取客户端所传递的消息,然后实现逻辑,再使用 ByteBuf out = Unpooled.copiedBuffer(newMsg.getBytes(Charset.defaultCharset().name())); 来将处理后的数据重新封装成字节,从而写出去,传递给客户端。

所以大概猜一下,ByteBuf 是一个装载着数据字节的容器,在 Netty 中通过网络进行传输。客户端又重新解码,读取出服务端返回的数据。

其实,jdk 自己的 NIO 也有个类似的类 ByteBuffer,但是这个类,他不太灵活,所以 Netty 才决定重写这个类,从而达到一些比较灵活的目的:

  1. 可以被自定义缓冲区类型拓展;
  2. 通过内置的符合缓冲区类型实现透明的零拷贝;
  3. 容量可以自增;
  4. 读写模式不需要来回切换(得益于读写指针);
  5. 支持链式调用、引用计数以及池化计数。

1.2. 深入读写指针

上一节说了,读写模式不需要来回切换,是因为 ByteBuf 内部提供了两个索引 readIndex 以及 writeIndex。当我们从 ByteBuf 读取数据的时候,readIndex 会慢慢的递增已经被读取的字节数,而写入时 writeIndex 同样也会进行移动。

《Netty之ByteBuf以及编解码器》

如上图所示,一个16字节的 ByteBuf,刚开始什么都没有的时候,读索引和写索引同在第 0 位上,随着数据慢慢写入,写索引会向右进行移动。这时候没有读取的发生,所以读索引还停留在第 0 位上,而随着我们业务的需求,会读取消息,所以读指针慢慢向后移动,但是这里有个需要注意的地方是,读索引不能超过写索引的位数(即使超过了读后面的消息也没什么意义),如果强行超过,Netty 会给你来一个 IndexOutOfBoundsException

ByteBuf 自带有一些方法,通过调用 readwrite 开头的方法,将会推进这两个相对应的索引位置,而如果说我们不想要推动索引而是想直接读取,则可以通过调用 get set 开头的方法,便可以直接操作 ByteBuf 中相对应位置的数据。

可以指定 ByteBuf 的最大容量,如果不指定默认是 Integer.MAX_VALUE

1.3. 不同内存下的 ByteBuf

1.1.3.1 堆缓冲区的ByteBuf

最常用的模式下是堆缓冲区的 ByteBuf,顾名思义堆缓冲区 ByteBuf 是用于存储在 JVM 堆内存中的缓冲区。这种模式称为 支撑数组。他可以在 Java 程序中快速的创建以及被垃圾回收器回收,但是,如果需要写出到 IO设备 则需要经过以下这么几个步骤。

《Netty之ByteBuf以及编解码器》

上图所示,Netty 需要先将堆上的数据逐一拷贝到系统直接缓冲区,然后再发送出去。会造成多了一步拷贝的过程。

但是,Java 可以直接操作堆缓冲区的数据呀:

if (heapBuf.hasArray()) {
    byte[] arrays = heapBuf.array();
}

1.1.3.2 直接缓冲区

JDK 1.4 以后官方提供了直接向系统申请内存的方法,申请后的内存也不在垃圾回收器清理范围以内,所以当我们申请了直接内存缓冲区的时候,都需要进行手动释放,否则将会造成系统内存溢出。

直接缓冲区的优点刚好是堆缓冲区最不擅长的点,可以直接调用本地 IO 设备,不需要通过拷贝从而将数据传输出去。

但是另外的缺点也有:相比堆缓冲区,如果程序需要读取操作缓冲区的数据的时候,则需要跟以上第一步逆相反的步骤,将直接内存缓冲区的数据拷贝到堆缓冲区才可以进行操作。而且向系统申请和释放内存也会造成性能的降低。

1.1.3.3 复合缓冲区

jdk 完全没有的一个功能,可以提供消息体复用的优势。

比如 HTTP 协议传输消息的时候,我们知道 HTTP 头部很多时候是相似或者说相同的,那么头部就可以存储在直接内存中,使用复合缓冲区 CompositeByteBuf 来聚合直接缓冲区中的头部信息以及堆内存中的消息体,然后进行写出。

因为可能包含直接内存分配和非直接内存分配,如果只存在一个聚合ByteBuf元素,那么调用 hasArray() 将直接返回这个元素的结果,否则会返回 false

创建复合缓冲区:

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf head = ...;
ByteBuf body = ...;
compBuf.addComponents(head, body);

访问缓冲区数组:

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();
byte[] arr = new byte[length];
compBuf.getBytes(compBuf.readerIndex(), arr);// 读取数据到数组中

1.1.3.4 池化缓冲区

一般来说,我们需要池化缓冲区,达到可以复用的效果,也可以减少计算机资源的开销,所以 Netty 提供了 ByteBufAllocator 来实现缓冲区的池化效果。

我们可以通过 ByteBufAllocator.DEFAULT 来获取 ByteBufAllocator 对象,从而调用以下方法,建立我们所需要的缓冲区:

方法名称 说明
buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)
返回基于堆或者直接内存的 ByteBuf
heapBuffer()
heapBuffer(int initialCapacity)
heapBuffer(int initialCapacity, int maxCapacity)
返回基于堆缓冲区的 ByteBuf
directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)
返回基于直接内存缓冲区的 ByteBuf
compositeBuffer()
compositeBuffer(int maxNumComponents)
compositeDirectBuffer()
compositeDirectBuffer(int maxNumComponents)
compositeHeapBuffer()
compositeHeapBuffer(int maxNumComponents)
返回指定最大元素的基于堆或者直接内存的
缓冲区视图
ioBuffer() 返回基于 SocketIO 操作的 ByteBuf

运行以下代码:

List<ByteBuf> byteBufs = new ArrayList<ByteBuf>();
for (int i = 0; i < 10000000; i++) {
  ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.directBuffer(200);
  byteBuf2.writeBytes("HelloWorld".getBytes(Charset.defaultCharset().name()));
  byteBufs.add(byteBuf2);
}

《Netty之ByteBuf以及编解码器》

可以看到,内存一直不断飙升,直到系统拒绝给出内存,抛出异常,程序终止,进程被干掉才结束。

1.1.3.5 非池化缓冲区

即我第一篇中用到的 Unpooled 类,api 跟上面差不多,不再重复。

主要提供给其他不需要使用 Netty 的项目使用。

1.1.3.6 ByteBufUtil

主要提供两个方法使用:

hexdump() : 主要用于将缓冲区内容写入日志,易于调试,也可以还原成字节数组。

equals(ByteBuf, ByteBuf) : 传递两个 ByteBuf 用于比较相等性。

1.1.3.7 引用计数(TODO)

这块放在后面 ChannelPipeline 再说。

二.解码和编码

2.1 啰嗦一下什么是编解码器

本来应该不需要这个的,但是还是为了篇幅的完整性还是啰嗦一下。

我们知道在传递消息的时候,我们的 Java 对象是不能够实现网络传输的,必须将对象序列化成某种格式(Byte数组),然后网卡再编码成 10101… 传输给另外一台服务器,另外的一台服务器再从 10101... 去重新解码,解成我们所序列化后的数组,然后传递到我们程序再使用我们自己的规则去重新把对象信息还原回来(当然此时客户端的对象的元信息跟服务器端的没有半毛钱关系)

大白话说就是,通过某种规则,在客户端机器上创建一个数据一毛一样的对象。

我们网络开发常见的编解码器有哪些比较耳濡目染的,大概就是 JSON 了吧,可读性强,兼容性棒(各个语言都支持),都 9012 年了就不要来一句 XML 了吧,如果还说 XML 我立马把43码的鞋子pia到你脸上去。

然而,除了 JSON 格式,如果不需要考虑兼容性最强的话,我们也可以使用同行语言都懂的 Byte 数组进行传输,总的来说,使用 Byte 数组可以达到效率更高(编码和解码),传输容量更小,传输速度更快的目的。因为 JSON 格式毕竟都是字符串,传输容量还是属于比较大的,而且频繁操作 String 编码和解码的效率也更低。

2.2 Netty自带的编解码器

  1. ByteToMessageDecoder 和 ReplayingDecoder;
  2. MessageToMessageDecoder;

2.2.1 ByteToMessageDecoder

ByteToMessageDecoder 是一个抽象的基类,通过我们做解码的方式去拓展。

我们可以编写自己的解码类,继承 ByteToMessageDecoder 类,需要编写以下的编码方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list)
          throws Exception;

比如,我们需要读取多个 int 值,我们也知道一个 int 值的长度是 4,所以我们在读取的时候,就需要判断 ByteBuf 的可读长度是否达到了 4 ,才开始读取。

public class ToIntegerDecoderHandler extends ByteToMessageDecoder {

  protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list)
          throws Exception {
    if (byteBuf.readableBytes() > 4) {
      list.add(byteBuf.readInt());
    }
  }

}

传递的参数中,List<Object> list 是用来保存解码信息的,当我们解码一个对象的时候,信息将保存在这里,Netty 将会为我们把这个 list 传递给 ChannelPipeline 的下一个 ChannelInboundHandler 处理器中。

当然,ByteToMessageDecoder 还提供了一个 decodeLast 方法,用来当 Channel 变成非活动状态的时候,调用最后一次解码。

来个例子:

现在有个需求,要求客户端发送 RandomNum 给服务端,服务端返回 16int 类型的随机数,客户端接收并打印:

服务端Handler
package cn.liweidan.nettydemo.demo02.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;
import java.util.Random;

/**
 * 随机数处理器.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-20
 * @email toweidan@126.com
 */

/** 标记该 Channel 是线程安全的. */
@ChannelHandler.Sharable
public class IntegerHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf in = (ByteBuf) msg;
    String message = in.toString(Charset.defaultCharset());
    System.out.println(message);
    if (message.equals("RandomNumber")) {
      ByteBuf byteBuf = ctx.channel().alloc().heapBuffer();
      for (int i = 0; i < 16; i++) {
        byteBuf.writeInt(new Random().nextInt() * 10000);
      }
      ctx.writeAndFlush(byteBuf);
    }
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    /** 当客户端读取完毕的时候,关闭客户端 */
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
            .addListener(ChannelFutureListener.CLOSE);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    /** 可以覆写,实现出现异常的时候执行的逻辑 */
    cause.printStackTrace();
    ctx.close();
  }

}
服务端引导代码
package cn.liweidan.nettydemo.demo02.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服务器引导类.
 * 主要实现:
 *  1. 绑定哪个端口;
 *  2. 绑定自己写的 Handler 以便执行业务.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
public class ServerLaunch {

  public static void main(String[] args) throws InterruptedException {
    /** 创建 EventLoopGroup */
    EventLoopGroup group = new NioEventLoopGroup();
    final IntegerHandler handler = new IntegerHandler();

    try {
      ServerBootstrap sb = new ServerBootstrap();
      sb.group(group)
              /** 指定所使用的的 NIO 传输的 Channel */
              .channel(NioServerSocketChannel.class)
              /** 绑定服务器端口 */
              .localAddress(8888)
              /** 添加我们自己的业务处理 Handler 到子级的 Channel 的 ChannelPipeline中  */
              .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  /** 添加我们自己的 Handler 实现,因为线程安全的,所以只使用一个实例 */
                  socketChannel.pipeline()
                          .addLast(handler);
                }
              });
      /** 异步绑定服务器,阻塞到直到绑定完成 */
      ChannelFuture future = sb.bind().sync();
      /** 获取 Channel 的 CloseFuture 阻塞到关闭完成 */
      future.channel().closeFuture().sync();
    } finally {
      /** 关闭 EventLoopGroup 释放资源 */
      group.shutdownGracefully().sync();
    }
  }

}
客户端解码器
package cn.liweidan.nettydemo.demo02.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * .
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-20
 * @email toweidan@126.com
 */
public class ToIntegerDecoder extends ByteToMessageDecoder {
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    /** 一个 int 是 4 个字节 */
    while (in.readableBytes() >= 4) {
      out.add(in.readInt());
    }
    /** 触发下一个 handler 的读取操作 */
    ctx.pipeline().fireChannelRead(out);
  }

  @Override
  protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

  }
}
客户端处理Handler
package cn.liweidan.nettydemo.demo02.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.List;

/**
 * 客户端请求处理器.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
@ChannelHandler.Sharable
public class RequestHandler extends SimpleChannelInboundHandler<List<Object>> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    /** Channel 建立连接完成后,执行的业务,发送一个 HelloWorld 的编码并使用 ByteBuf 包装 */
    ctx.writeAndFlush(
            Unpooled.copiedBuffer(
                    "RandomNumber".getBytes(Charset.defaultCharset().name())));
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    /** 发生异常时关闭 Channel */
    cause.printStackTrace();
    ctx.close();
  }

  protected void channelRead0(ChannelHandlerContext ctx, List<Object> out) throws Exception {
    System.out.println(out);
  }

}
客户端引导代码
package cn.liweidan.nettydemo.demo02.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * 客户端启动类.
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-17
 * @email toweidan@126.com
 */
public class ClientLaunch {

  public static void main(String[] args) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {
      Bootstrap b = new Bootstrap();
      b.group(group)
              .channel(NioSocketChannel.class)
              .remoteAddress(new InetSocketAddress("127.0.0.1", 8888))
              .handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  socketChannel.pipeline()
                          .addLast(new ToIntegerDecoder(), new RequestHandler());
                }
              });
      ChannelFuture future = b.connect().sync();
      future.channel().closeFuture().sync();
    } finally {
      group.shutdownGracefully().sync();
    }
  }

}
运行效果

《Netty之ByteBuf以及编解码器》

2.2.2 ReplayingDecoder

package cn.liweidan.nettydemo.demo02.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 *
 * @author liweidan
 * @version 1.0
 * @date 2019-07-20
 * @email toweidan@126.com
 */
public class ToIntegerDecoder extends ReplayingDecoder<Void> {
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    /** 触发下一个 handler 的读取操作 */
    out.add(in.readInt());
    ctx.pipeline().fireChannelRead(out);
  }

  @Override
  protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

  }
}

ReplayingDecoder 有个泛型,表示需要处理的状态类型,使用 Void 表示没有状态需要处理。

与之前不同的是,每读取一次就会调用一次后面的处理器,因为每次解码都会发送。

需要注意 ReplayingDecoder 并不是支持所有的 ByteBuf 操作,如果调用不支持的方法,将会抛出异常。而且效率较上面的解码器比较低下。

2.2.3 MessageToMessageDecoder

这个的作用是将一种消息的格式转换为另外一种消息的格式。

不多说上代码就好了:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {// 泛型表示传入的类型
  protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out)
          throws Exception {
    out.add("String:" + String.valueOf(msg));
    /** 触发下一个 handler 的读取操作 */
    ctx.pipeline().fireChannelRead(out);
  }
}

2.2.4 TooLongFrameException

这是一个异常类,用于让我们自定义抛出异常的,主要作用是为了保护 Netty 程序的内存不至于被过大的消息体耗尽,所以我们可以定义一个 Decoder,用于判断消息体是否超出我们的需求,如果超出可以直接抛出异常,终止调用链的调用:

public class RejectTooLongDecoder extends ByteToMessageDecoder {

  private static final int ALLOW_LENGTH = 1024;

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    int readableBytes = in.readableBytes();
    if (readableBytes > ALLOW_LENGTH) {
        // 清空ByteBuf
      in.skipBytes(readableBytes);
      throw new TooLongFrameException("Bytes Too Long!");
    }
    // ...
  }

}

2.2.5 MessageToByteEncoder/MessageToMessageEncoder

其实与解码器相对应,方法参数差不多,偷懒不打算写了。

2.2.6 编解码一体

项目上,我们一般都会把编解码这种粗活交给一个 Maven 模块来做,当然这样子的话就需要在客户端服务端重复安装编解码器了。

聚合在一起的一种方式是使用 ByteToMessageCodec,通过集成他重写编码和解码两个方法达到重用。

还有另外一种方法是通过继承 CombinedChannelDuplexHandler ,在泛型中指定编解码的类来实现。

当然这两种方式看个人喜好使用,我的话偏向于后者:

public class CombinedIntegerCodec extends CombinedChannelDuplexHandler<IntegerDecoder, IntegerEncoder> {

  public CombinedIntegerCodec() {
    super(new IntegerDecoder(), new IntegerEncoder())
  }

}

未完待续……

点赞