一.数据传输的容器ByteBuf
1.1. 简单理解一下
OK,上一篇文章我们大致了解了 Netty
在运行过程中所需要的一些组件。接下来需要慢慢的深入了解这些容器了。 为了方便回忆,我先贴一段上一篇的代码:
1 2 3 4
| ... ByteBuf in = (ByteBuf) msg; String message = in.toString(Charset.defaultCharset()); ...
|
我们可以看到,在服务端代码中,使用了 ByteBuf
来读取客户端所传递的消息,然后实现逻辑,再使用 ByteBuf out = Unpooled.copiedBuffer(newMsg.getBytes(Charset.defaultCharset().name()));
来将处理后的数据重新封装成字节,从而写出去,传递给客户端。 所以大概猜一下,ByteBuf
是一个装载着数据字节的容器,在 Netty
中通过网络进行传输。客户端又重新解码,读取出服务端返回的数据。 其实,jdk
自己的 NIO
也有个类似的类 ByteBuffer
,但是这个类,他不太灵活,所以 Netty
才决定重写这个类,从而达到一些比较灵活的目的:
- 可以被自定义缓冲区类型拓展;
- 通过内置的符合缓冲区类型实现透明的零拷贝;
- 容量可以自增;
- 读写模式不需要来回切换(得益于读写指针);
- 支持链式调用、引用计数以及池化计数。
1.2. 深入读写指针
上一节说了,读写模式不需要来回切换,是因为 ByteBuf
内部提供了两个索引 readIndex
以及 writeIndex
。当我们从 ByteBuf
读取数据的时候,readIndex
会慢慢的递增已经被读取的字节数,而写入时 writeIndex
同样也会进行移动。
如上图所示,一个16字节的 ByteBuf
,刚开始什么都没有的时候,读索引和写索引同在第 0
位上,随着数据慢慢写入,写索引会向右进行移动。这时候没有读取的发生,所以读索引还停留在第 0
位上,而随着我们业务的需求,会读取消息,所以读指针慢慢向后移动,但是这里有个需要注意的地方是,读索引不能超过写索引的位数(即使超过了读后面的消息也没什么意义),如果强行超过,Netty
会给你来一个 IndexOutOfBoundsException
。 ByteBuf
自带有一些方法,通过调用 read
和 write
开头的方法,将会推进这两个相对应的索引位置,而如果说我们不想要推动索引而是想直接读取,则可以通过调用 get
set
开头的方法,便可以直接操作 ByteBuf
中相对应位置的数据。 可以指定 ByteBuf
的最大容量,如果不指定默认是 Integer.MAX_VALUE
。
1.3. 不同内存下的 ByteBuf
1.1.3.1 堆缓冲区的ByteBuf
最常用的模式下是堆缓冲区的 ByteBuf
,顾名思义堆缓冲区 ByteBuf
是用于存储在 JVM
堆内存中的缓冲区。这种模式称为 支撑数组
。他可以在 Java
程序中快速的创建以及被垃圾回收器回收,但是,如果需要写出到 IO设备
则需要经过以下这么几个步骤。
上图所示,Netty
需要先将堆上的数据逐一拷贝到系统直接缓冲区,然后再发送出去。会造成多了一步拷贝的过程。 但是,Java
可以直接操作堆缓冲区的数据呀:
1 2 3
| if (heapBuf.hasArray()) { byte[] arrays = heapBuf.array(); }
|
1.1.3.2 直接缓冲区
JDK 1.4
以后官方提供了直接向系统申请内存的方法,申请后的内存也不在垃圾回收器清理范围以内,所以当我们申请了直接内存缓冲区的时候,都需要进行手动释放,否则将会造成系统内存溢出。 直接缓冲区的优点刚好是堆缓冲区最不擅长的点,可以直接调用本地 IO
设备,不需要通过拷贝从而将数据传输出去。 但是另外的缺点也有:相比堆缓冲区,如果程序需要读取操作缓冲区的数据的时候,则需要跟以上第一步逆相反的步骤,将直接内存缓冲区的数据拷贝到堆缓冲区才可以进行操作。而且向系统申请和释放内存也会造成性能的降低。
1.1.3.3 复合缓冲区
jdk
完全没有的一个功能,可以提供消息体复用的优势。 比如 HTTP
协议传输消息的时候,我们知道 HTTP
头部很多时候是相似或者说相同的,那么头部就可以存储在直接内存中,使用复合缓冲区 CompositeByteBuf
来聚合直接缓冲区中的头部信息以及堆内存中的消息体,然后进行写出。
因为可能包含直接内存分配和非直接内存分配,如果只存在一个聚合ByteBuf元素,那么调用 hasArray()
将直接返回这个元素的结果,否则会返回 false
创建复合缓冲区:
1 2 3 4
| CompositeByteBuf compBuf = Unpooled.compositeBuffer(); ByteBuf head = ...; ByteBuf body = ...; compBuf.addComponents(head, body);
|
访问缓冲区数组:
1 2 3 4
| CompositeByteBuf compBuf = Unpooled.compositeBuffer(); int length = compBuf.readableBytes(); byte[] arr = new byte[length]; compBuf.getBytes(compBuf.readerIndex(), arr);
|
1.1.3.4 池化缓冲区
一般来说,我们需要池化缓冲区,达到可以复用的效果,也可以减少计算机资源的开销,所以 Netty
提供了 ByteBufAllocator
来实现缓冲区的池化效果。 我们可以通过 ByteBufAllocator.DEFAULT
来获取 ByteBufAllocator
对象,从而调用以下方法,建立我们所需要的缓冲区:
方法名称
说明
buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)
返回基于堆或者直接内存的 ByteBuf
heapBuffer()
heapBuffer(int initialCapacity)
heapBuffer(int initialCapacity, int maxCapacity)
返回基于堆缓冲区的 ByteBuf
directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)
返回基于直接内存缓冲区的 ByteBuf
compositeBuffer()
compositeBuffer(int maxNumComponents)
compositeDirectBuffer()
compositeDirectBuffer(int maxNumComponents)
compositeHeapBuffer()
compositeHeapBuffer(int maxNumComponents)
返回指定最大元素的基于堆或者直接内存的
缓冲区视图
ioBuffer()
返回基于 Socket
的 IO
操作的 ByteBuf
运行以下代码:
1 2 3 4 5 6
| List<ByteBuf> byteBufs = new ArrayList<ByteBuf>(); for (int i = 0; i < 10000000; i++) { ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.directBuffer(200); byteBuf2.writeBytes("HelloWorld".getBytes(Charset.defaultCharset().name())); byteBufs.add(byteBuf2); }
|
可以看到,内存一直不断飙升,直到系统拒绝给出内存,抛出异常,程序终止,进程被干掉才结束。
1.1.3.5 非池化缓冲区
即我第一篇中用到的 Unpooled
类,api
跟上面差不多,不再重复。 主要提供给其他不需要使用 Netty
的项目使用。
1.1.3.6 ByteBufUtil
主要提供两个方法使用: hexdump()
: 主要用于将缓冲区内容写入日志,易于调试,也可以还原成字节数组。 equals(ByteBuf, ByteBuf)
: 传递两个 ByteBuf
用于比较相等性。
1.1.3.7 引用计数(TODO)
这块放在后面 ChannelPipeline
再说。
二.解码和编码
2.1 啰嗦一下什么是编解码器
本来应该不需要这个的,但是还是为了篇幅的完整性还是啰嗦一下。 我们知道在传递消息的时候,我们的 Java
对象是不能够实现网络传输的,必须将对象序列化成某种格式(Byte数组),然后网卡再编码成 10101…
传输给另外一台服务器,另外的一台服务器再从 10101...
去重新解码,解成我们所序列化后的数组,然后传递到我们程序再使用我们自己的规则去重新把对象信息还原回来(当然此时客户端的对象的元信息跟服务器端的没有半毛钱关系) 大白话说就是,通过某种规则,在客户端机器上创建一个数据一毛一样的对象。 我们网络开发常见的编解码器有哪些比较耳濡目染的,大概就是 JSON
了吧,可读性强,兼容性棒(各个语言都支持),都 9012
年了就不要来一句 XML
了吧,如果还说 XML
我立马把43码的鞋子pia到你脸上去。 然而,除了 JSON
格式,如果不需要考虑兼容性最强的话,我们也可以使用同行语言都懂的 Byte
数组进行传输,总的来说,使用 Byte
数组可以达到效率更高(编码和解码),传输容量更小,传输速度更快的目的。因为 JSON
格式毕竟都是字符串,传输容量还是属于比较大的,而且频繁操作 String
编码和解码的效率也更低。
2.2 Netty自带的编解码器
- ByteToMessageDecoder 和 ReplayingDecoder;
- MessageToMessageDecoder;
2.2.1 ByteToMessageDecoder
ByteToMessageDecoder
是一个抽象的基类,通过我们做解码的方式去拓展。 我们可以编写自己的解码类,继承 ByteToMessageDecoder
类,需要编写以下的编码方法:
1 2
| protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception;
|
比如,我们需要读取多个 int
值,我们也知道一个 int
值的长度是 4
,所以我们在读取的时候,就需要判断 ByteBuf
的可读长度是否达到了 4
,才开始读取。
1 2 3 4 5 6 7 8 9 10
| public class ToIntegerDecoderHandler extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() > 4) { list.add(byteBuf.readInt()); } }
}
|
传递的参数中,List<Object> list
是用来保存解码信息的,当我们解码一个对象的时候,信息将保存在这里,Netty
将会为我们把这个 list
传递给 ChannelPipeline
的下一个 ChannelInboundHandler
处理器中。 当然,ByteToMessageDecoder
还提供了一个 decodeLast
方法,用来当 Channel
变成非活动状态的时候,调用最后一次解码。 来个例子: 现在有个需求,要求客户端发送 RandomNum
给服务端,服务端返回 16
个 int
类型的随机数,客户端接收并打印:
服务端Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package cn.liweidan.nettydemo.demo02.server;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset; import java.util.Random;
@ChannelHandler.Sharable public class IntegerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; String message = in.toString(Charset.defaultCharset()); System.out.println(message); if (message.equals("RandomNumber")) { ByteBuf byteBuf = ctx.channel().alloc().heapBuffer(); for (int i = 0; i < 16; i++) { byteBuf.writeInt(new Random().nextInt() * 10000); } ctx.writeAndFlush(byteBuf); } }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
}
|
服务端引导代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package cn.liweidan.nettydemo.demo02.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class ServerLaunch {
public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); final IntegerHandler handler = new IntegerHandler();
try { ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(NioServerSocketChannel.class) .localAddress(8888) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(handler); } }); ChannelFuture future = sb.bind().sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
}
|
客户端解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package cn.liweidan.nettydemo.demo02.client;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class ToIntegerDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= 4) { out.add(in.readInt()); } ctx.pipeline().fireChannelRead(out); }
@Override protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
} }
|
客户端处理Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package cn.liweidan.nettydemo.demo02.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset; import java.util.List;
@ChannelHandler.Sharable public class RequestHandler extends SimpleChannelInboundHandler<List<Object>> {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush( Unpooled.copiedBuffer( "RandomNumber".getBytes(Charset.defaultCharset().name()))); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
protected void channelRead0(ChannelHandlerContext ctx, List<Object> out) throws Exception { System.out.println(out); }
}
|
客户端引导代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package cn.liweidan.nettydemo.demo02.client;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class ClientLaunch {
public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("127.0.0.1", 8888)) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ToIntegerDecoder(), new RequestHandler()); } }); ChannelFuture future = b.connect().sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
}
|
运行效果
2.2.2 ReplayingDecoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package cn.liweidan.nettydemo.demo02.client;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class ToIntegerDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); ctx.pipeline().fireChannelRead(out); }
@Override protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
} }
|
ReplayingDecoder
有个泛型,表示需要处理的状态类型,使用 Void
表示没有状态需要处理。 与之前不同的是,每读取一次就会调用一次后面的处理器,因为每次解码都会发送。 需要注意 ReplayingDecoder
并不是支持所有的 ByteBuf
操作,如果调用不支持的方法,将会抛出异常。而且效率较上面的解码器比较低下。
2.2.3 MessageToMessageDecoder
这个的作用是将一种消息的格式转换为另外一种消息的格式。 不多说上代码就好了:
1 2 3 4 5 6 7 8
| public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> { protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add("String:" + String.valueOf(msg)); ctx.pipeline().fireChannelRead(out); } }
|
这是一个异常类,用于让我们自定义抛出异常的,主要作用是为了保护 Netty
程序的内存不至于被过大的消息体耗尽,所以我们可以定义一个 Decoder
,用于判断消息体是否超出我们的需求,如果超出可以直接抛出异常,终止调用链的调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class RejectTooLongDecoder extends ByteToMessageDecoder {
private static final int ALLOW_LENGTH = 1024;
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readableBytes = in.readableBytes(); if (readableBytes > ALLOW_LENGTH) { in.skipBytes(readableBytes); throw new TooLongFrameException("Bytes Too Long!"); } }
}
|
2.2.5 MessageToByteEncoder/MessageToMessageEncoder
其实与解码器相对应,方法参数差不多,偷懒不打算写了。
2.2.6 编解码一体
项目上,我们一般都会把编解码这种粗活交给一个 Maven
模块来做,当然这样子的话就需要在客户端服务端重复安装编解码器了。 聚合在一起的一种方式是使用 ByteToMessageCodec
,通过集成他重写编码和解码两个方法达到重用。 还有另外一种方法是通过继承 CombinedChannelDuplexHandler
,在泛型中指定编解码的类来实现。 当然这两种方式看个人喜好使用,我的话偏向于后者:
1 2 3 4 5 6 7
| public class CombinedIntegerCodec extends CombinedChannelDuplexHandler<IntegerDecoder, IntegerEncoder> {
public CombinedIntegerCodec() { super(new IntegerDecoder(), new IntegerEncoder()) }
}
|
未完待续……