编解码器使用

传输自定义对象

netty中所有数据都是二进制传输的,要想传输自定义对象,需要自定义传输规则,也就是自定义编解码器。
让发消息端知道怎么去将自定义对象编码成什么样的二进制格式,消息接收端知道以什么方式解析二进制成对象。

客户端编码器实现

客户端在写入string类型属性时,在末尾加上一个空字符'\0',解码器在读取的时候以'\0'判断字符串是否结束。

public class MessageEncoder extends MessageToByteEncoder<MessageObj> {

    /**
     * 将对象编码成二进制
     */
    protected void encode(ChannelHandlerContext ctx, MessageObj msg, ByteBuf out) throws Exception {
        // 写入userId
        out.writeInt(msg.getUserId());

        // 在编码器加上'\0'来标记字符串结束
        // 写入nickname
        out.writeCharSequence(msg.getNickname(), CharsetUtil.UTF_8);
        out.writeByte('\0');

        // 写入sex
        out.writeCharSequence(msg.getSex(), CharsetUtil.UTF_8);
        out.writeByte('\0');
    }
}

MessageObj对象

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageObj implements Serializable {
    private Integer userId;
    private String nickname;
    private String sex;
}

服务端解码器实现

public class MessageDecoder extends ByteToMessageDecoder {

    /**
     * 将二进制解析成对象
     * 如果是字符串,长度是不确定的,但是可以通过'\0'这个空字符的位置来判断字符串的长度
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        MessageObj obj = new MessageObj();

        // 在解码器中进行数据解码时,需要判断缓冲区数据是否足够
        if (in.readableBytes() >= 4) {
            // 读取userId
            obj.setUserId(in.readInt());
        }

        if (in.readableBytes() >= 1) {
            // 读取nickname
            obj.setNickname(readStr(in));
        }

        if (in.readableBytes() >= 1) {
            // 读取sex
            obj.setSex(readStr(in));
        }

        out.add(obj);
    }

    /**
     * 专门读取字符串的方法
     */
    private String readStr(ByteBuf in) {
        // 字符串长度
        int index = 0;

        // 标记当前位置
        int i = in.readerIndex();

        // 通过找空字符位置来获取字符串的长度
        byte[] array = new byte[in.readableBytes()];
        in.readBytes(array);
        for (byte b : array) {
            if ((char) b == '\0') {
                break;
            }
            ++index;
        }
        // in.readBytes()影响了读取的位置,重置到上次标记的位置
        in.readerIndex(i);
        String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);

        // 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
        in.readByte();

        return s;
    }
}

服务端读取之前需要判断是否有数据可读,没有数据读取的话会报错。
因为int类型是固定长度的,直接用readInt就行了。
而String类型不一样,无法确定长度,所以在编码器编码的时候就在每个字符串属性的结尾加上了一个'\0'作标记。

ByteBuf的readerIndex()会记录读取之前的位置,在读取之后再readerIndex(index)设置回去。不然readerIndex就到末尾了,导致下一个属性无法正常读取。

在读取完一个字符串时需要把'\0'这个无效字符给读取一下,让readerIndex +1,不然下一个也是String类型的话,就会从'\0'开始读取,也会无法正常读取数据。另一个原因是,需要将ByteBuf中所有的数据都读完才能正常处理结果,否则抛出异常,提示还有未读完的数据。

编码器写入属性的顺序一定要和解码器读取的顺序一样

测试

客户端发送MessageObj对象到服务端

MessageObj obj = new MessageObj(1, "大哥哥", "变态");
ctx.writeAndFlush(obj);

服务端接收并打印
image.png

解决粘包和拆包问题

问题的由来:客户端在向服务端发送消息时,会用将多个数据包打包成一个包发送到服务端,以此来提高传输效率。这就是著名的Nagle算法。

而这种优化却带来了一些问题:
服务端很难分辨出完整的数据包,因为TCP传输是面向字节流的,实际上没有包的概念。
客户端发送了3个包,可能会把1.5个包发送一次,这样服务端就会接收到一个不完整的数据包,解析的时候可能会丢数据。也就是客户端发送了三个包,服务端却只做了2次解析。如果想让服务端正常解析数据,也需要用到编解码器。

看看不做任何处理的情况,客户端连续向服务端发送10个消息

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; ++i) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("client:" + i, CharsetUtil.UTF_8));
        }
    }
}

服务端的接收情况
image.png
这10个消息被服务端一次性处理了

自定义编解码器解决粘包和拆包问题

和上面的代码差不多,把MessageObj换成String类型

编码器

public class MessageEncoder extends MessageToByteEncoder<String> {

    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        out.writeCharSequence(msg, CharsetUtil.UTF_8);
        out.writeByte('\0');
    }
}

解码器

public class MessageDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 1) {
            // 获取字符串并设置到对象中
            out.add(readStr(in));
        }
    }

    /**
     * 专门读取字符串的方法
     */
    private String readStr(ByteBuf in) {
        // 字符串长度
        int index = 0;

        // 标记当前位置
        int i = in.readerIndex();

        // 通过找空字符位置来获取字符串的长度
        byte[] array = new byte[in.readableBytes()];
        in.readBytes(array);
        for (byte b : array) {
            if ((char) b == '\0') {
                break;
            }
            ++index;
        }
        // in.readBytes()影响了读取的位置,重置到上次标记的位置
        in.readerIndex(i);

        String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);

        // 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
        in.readByte();

        return s;
    }
}

服务单已经能按顺序读取客户端数据
image.png

编解码器是Netty数据接收和发送的关键,消息发送端可以按编码器的规则发送二进制消息,消息接收端可以按解码器的解析顺序读取消息。


源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files

# Netty 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×