传输自定义对象
netty中所有数据都是二进制传输的,要想传输自定义对象,需要自定义传输规则,也就是自定义编解码器。
让发消息端知道怎么去将自定义对象编码成什么样的二进制格式,消息接收端知道以什么方式解析二进制成对象。
客户端编码器实现
客户端在写入string类型属性时,在末尾加上一个空字符'\0',解码器在读取的时候以'\0'判断字符串是否结束。
public class MessageEncoder extends MessageToByteEncoder<MessageObj> {
/**
* 将对象编码成二进制
*/
protected void encode(ChannelHandlerContext ctx, MessageObj msg, ByteBuf out) throws Exception {
// 写入userId
out.writeInt(msg.getUserId());
// 在编码器加上'\0'来标记字符串结束
// 写入nickname
out.writeCharSequence(msg.getNickname(), CharsetUtil.UTF_8);
out.writeByte('\0');
// 写入sex
out.writeCharSequence(msg.getSex(), CharsetUtil.UTF_8);
out.writeByte('\0');
}
}
MessageObj对象
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageObj implements Serializable {
private Integer userId;
private String nickname;
private String sex;
}
服务端解码器实现
public class MessageDecoder extends ByteToMessageDecoder {
/**
* 将二进制解析成对象
* 如果是字符串,长度是不确定的,但是可以通过'\0'这个空字符的位置来判断字符串的长度
*/
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
MessageObj obj = new MessageObj();
// 在解码器中进行数据解码时,需要判断缓冲区数据是否足够
if (in.readableBytes() >= 4) {
// 读取userId
obj.setUserId(in.readInt());
}
if (in.readableBytes() >= 1) {
// 读取nickname
obj.setNickname(readStr(in));
}
if (in.readableBytes() >= 1) {
// 读取sex
obj.setSex(readStr(in));
}
out.add(obj);
}
/**
* 专门读取字符串的方法
*/
private String readStr(ByteBuf in) {
// 字符串长度
int index = 0;
// 标记当前位置
int i = in.readerIndex();
// 通过找空字符位置来获取字符串的长度
byte[] array = new byte[in.readableBytes()];
in.readBytes(array);
for (byte b : array) {
if ((char) b == '\0') {
break;
}
++index;
}
// in.readBytes()影响了读取的位置,重置到上次标记的位置
in.readerIndex(i);
String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);
// 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
in.readByte();
return s;
}
}
服务端读取之前需要判断是否有数据可读,没有数据读取的话会报错。
因为int类型是固定长度的,直接用readInt就行了。
而String类型不一样,无法确定长度,所以在编码器编码的时候就在每个字符串属性的结尾加上了一个'\0'作标记。
ByteBuf的readerIndex()会记录读取之前的位置,在读取之后再readerIndex(index)设置回去。不然readerIndex就到末尾了,导致下一个属性无法正常读取。
在读取完一个字符串时需要把'\0'这个无效字符给读取一下,让readerIndex +1,不然下一个也是String类型的话,就会从'\0'开始读取,也会无法正常读取数据。另一个原因是,需要将ByteBuf中所有的数据都读完才能正常处理结果,否则抛出异常,提示还有未读完的数据。
编码器写入属性的顺序一定要和解码器读取的顺序一样
测试
客户端发送MessageObj对象到服务端
MessageObj obj = new MessageObj(1, "大哥哥", "变态");
ctx.writeAndFlush(obj);
服务端接收并打印
解决粘包和拆包问题
问题的由来:客户端在向服务端发送消息时,会用将多个数据包打包成一个包发送到服务端,以此来提高传输效率。这就是著名的Nagle算法。
而这种优化却带来了一些问题:
服务端很难分辨出完整的数据包,因为TCP传输是面向字节流的,实际上没有包的概念。
客户端发送了3个包,可能会把1.5个包发送一次,这样服务端就会接收到一个不完整的数据包,解析的时候可能会丢数据。也就是客户端发送了三个包,服务端却只做了2次解析。如果想让服务端正常解析数据,也需要用到编解码器。
看看不做任何处理的情况,客户端连续向服务端发送10个消息
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; ++i) {
ctx.writeAndFlush(Unpooled.copiedBuffer("client:" + i, CharsetUtil.UTF_8));
}
}
}
服务端的接收情况
这10个消息被服务端一次性处理了
自定义编解码器解决粘包和拆包问题
和上面的代码差不多,把MessageObj换成String类型
编码器
public class MessageEncoder extends MessageToByteEncoder<String> {
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
out.writeCharSequence(msg, CharsetUtil.UTF_8);
out.writeByte('\0');
}
}
解码器
public class MessageDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 1) {
// 获取字符串并设置到对象中
out.add(readStr(in));
}
}
/**
* 专门读取字符串的方法
*/
private String readStr(ByteBuf in) {
// 字符串长度
int index = 0;
// 标记当前位置
int i = in.readerIndex();
// 通过找空字符位置来获取字符串的长度
byte[] array = new byte[in.readableBytes()];
in.readBytes(array);
for (byte b : array) {
if ((char) b == '\0') {
break;
}
++index;
}
// in.readBytes()影响了读取的位置,重置到上次标记的位置
in.readerIndex(i);
String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);
// 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
in.readByte();
return s;
}
}
服务单已经能按顺序读取客户端数据
编解码器是Netty数据接收和发送的关键,消息发送端可以按编码器的规则发送二进制消息,消息接收端可以按解码器的解析顺序读取消息。
源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files