张坤的个人博客

  • 首页
  • 分类
  • 标签
  • 日志

  • 搜索
Jenkins RabbitMQ Zookeeper IDEA Logstash Kibana ELK NIO Netty Spring Cloud Golang DataX Elasticsearch React Native Mysql H2 Socket Spring Boot Kafka Mybatis Sqlmap Vue Postgresql Docker Vert.x Flutter Flink Redis

编解码器使用

发表于 2020-06-26 | 分类于 Netty | 0 | 阅读次数 28

传输自定义对象

netty中所有数据都是二进制传输的,要想传输自定义对象,需要自定义传输规则,也就是自定义编解码器。
让发消息端知道怎么去将自定义对象编码成什么样的二进制格式,消息接收端知道以什么方式解析二进制成对象。

客户端编码器实现

客户端在写入string类型属性时,在末尾加上一个空字符'\0',解码器在读取的时候以'\0'判断字符串是否结束。

public class MessageEncoder extends MessageToByteEncoder<MessageObj> {

    /**
     * 将对象编码成二进制
     */
    protected void encode(ChannelHandlerContext ctx, MessageObj msg, ByteBuf out) throws Exception {
        // 写入userId
        out.writeInt(msg.getUserId());

        // 在编码器加上'\0'来标记字符串结束
        // 写入nickname
        out.writeCharSequence(msg.getNickname(), CharsetUtil.UTF_8);
        out.writeByte('\0');

        // 写入sex
        out.writeCharSequence(msg.getSex(), CharsetUtil.UTF_8);
        out.writeByte('\0');
    }
}

MessageObj对象

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageObj implements Serializable {
    private Integer userId;
    private String nickname;
    private String sex;
}

服务端解码器实现

public class MessageDecoder extends ByteToMessageDecoder {

    /**
     * 将二进制解析成对象
     * 如果是字符串,长度是不确定的,但是可以通过'\0'这个空字符的位置来判断字符串的长度
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        MessageObj obj = new MessageObj();

        // 在解码器中进行数据解码时,需要判断缓冲区数据是否足够
        if (in.readableBytes() >= 4) {
            // 读取userId
            obj.setUserId(in.readInt());
        }

        if (in.readableBytes() >= 1) {
            // 读取nickname
            obj.setNickname(readStr(in));
        }

        if (in.readableBytes() >= 1) {
            // 读取sex
            obj.setSex(readStr(in));
        }

        out.add(obj);
    }

    /**
     * 专门读取字符串的方法
     */
    private String readStr(ByteBuf in) {
        // 字符串长度
        int index = 0;

        // 标记当前位置
        int i = in.readerIndex();

        // 通过找空字符位置来获取字符串的长度
        byte[] array = new byte[in.readableBytes()];
        in.readBytes(array);
        for (byte b : array) {
            if ((char) b == '\0') {
                break;
            }
            ++index;
        }
        // in.readBytes()影响了读取的位置,重置到上次标记的位置
        in.readerIndex(i);
        String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);

        // 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
        in.readByte();

        return s;
    }
}

服务端读取之前需要判断是否有数据可读,没有数据读取的话会报错。
因为int类型是固定长度的,直接用readInt就行了。
而String类型不一样,无法确定长度,所以在编码器编码的时候就在每个字符串属性的结尾加上了一个'\0'作标记。

ByteBuf的readerIndex()会记录读取之前的位置,在读取之后再readerIndex(index)设置回去。不然readerIndex就到末尾了,导致下一个属性无法正常读取。

在读取完一个字符串时需要把'\0'这个无效字符给读取一下,让readerIndex +1,不然下一个也是String类型的话,就会从'\0'开始读取,也会无法正常读取数据。另一个原因是,需要将ByteBuf中所有的数据都读完才能正常处理结果,否则抛出异常,提示还有未读完的数据。

编码器写入属性的顺序一定要和解码器读取的顺序一样

测试

客户端发送MessageObj对象到服务端

MessageObj obj = new MessageObj(1, "大哥哥", "变态");
ctx.writeAndFlush(obj);

服务端接收并打印
image.png

解决粘包和拆包问题

问题的由来:客户端在向服务端发送消息时,会用将多个数据包打包成一个包发送到服务端,以此来提高传输效率。这就是著名的Nagle算法。

而这种优化却带来了一些问题:
服务端很难分辨出完整的数据包,因为TCP传输是面向字节流的,实际上没有包的概念。
客户端发送了3个包,可能会把1.5个包发送一次,这样服务端就会接收到一个不完整的数据包,解析的时候可能会丢数据。也就是客户端发送了三个包,服务端却只做了2次解析。如果想让服务端正常解析数据,也需要用到编解码器。

看看不做任何处理的情况,客户端连续向服务端发送10个消息

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; ++i) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("client:" + i, CharsetUtil.UTF_8));
        }
    }
}

服务端的接收情况
image.png
这10个消息被服务端一次性处理了

自定义编解码器解决粘包和拆包问题

和上面的代码差不多,把MessageObj换成String类型

编码器

public class MessageEncoder extends MessageToByteEncoder<String> {

    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        out.writeCharSequence(msg, CharsetUtil.UTF_8);
        out.writeByte('\0');
    }
}

解码器

public class MessageDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 1) {
            // 获取字符串并设置到对象中
            out.add(readStr(in));
        }
    }

    /**
     * 专门读取字符串的方法
     */
    private String readStr(ByteBuf in) {
        // 字符串长度
        int index = 0;

        // 标记当前位置
        int i = in.readerIndex();

        // 通过找空字符位置来获取字符串的长度
        byte[] array = new byte[in.readableBytes()];
        in.readBytes(array);
        for (byte b : array) {
            if ((char) b == '\0') {
                break;
            }
            ++index;
        }
        // in.readBytes()影响了读取的位置,重置到上次标记的位置
        in.readerIndex(i);

        String s = (String) in.readCharSequence(index, CharsetUtil.UTF_8);

        // 把最后的'\0'给读取了,不然下一次读取字符串会从'\0'位置开始读取,引发异常
        in.readByte();

        return s;
    }
}

服务单已经能按顺序读取客户端数据
image.png

编解码器是Netty数据接收和发送的关键,消息发送端可以按编码器的规则发送二进制消息,消息接收端可以按解码器的解析顺序读取消息。


源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files

# Jenkins # RabbitMQ # Zookeeper # IDEA # Logstash # Kibana # ELK # NIO # Netty # Spring Cloud # Golang # DataX # Elasticsearch # React Native # Mysql # H2 # Socket # Spring Boot # Kafka # Mybatis # Sqlmap # Vue # Postgresql # Docker # Vert.x # Flutter # Flink # Redis
ChannelHandler执行顺序
ELK深入浅出
  • 文章目录
  • 站点概览
会Coding的猴子

会Coding的猴子

57 日志
19 分类
28 标签
RSS
Github
Creative Commons
© 2021 会Coding的猴子
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4

湘ICP备18011740号