张坤的个人博客

  • 首页
  • 分类
  • 标签
  • 日志

  • 搜索
Jenkins RabbitMQ Zookeeper IDEA Logstash Kibana ELK NIO Netty Spring Cloud Golang DataX Elasticsearch React Native Mysql H2 Socket Spring Boot Kafka Mybatis Sqlmap Vue Postgresql Docker Vert.x Flutter Flink Redis

ChannelHandler执行顺序

发表于 2020-06-25 | 分类于 Netty | 0 | 阅读次数 27

首先需要在ChannelInitializer的initChannel方法中获取ChannelPipeline对象,这个ChannelPipeline是一个双向链表。需要向ChannelPipeline中添加ChannelHandler。需要注意添加顺序,因为这影响到ChannelHandler的执行顺序,甚至可能导致ChannelHandler没有被执行。

Netty服务端启动类

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                // 写IO事件倒序执行,从下网上,因为是在MyChannelServerHandler2中向客户端发消息的。
                // 写IO事件时从下往上执行的,所以要定义在读IO事件处理器的前面,否则写IO事件处理器不会被调用
                pipeline.addLast(new MyChannelServerOutHandler());
                pipeline.addLast(new MyChannelServerOutHandler2());

                // 读IO事件顺序执行,从上往下
                pipeline.addLast(new MyChannelServerHandler());
                pipeline.addLast(new MyChannelServerHandler2());

                // 上面那种写法不人性话,所以netty为我们提供了addFirst方法,可以将写IO的事件添加到管道头部。
                // 这就不需要ChannelOutboundHandlerAdapter一定要定义在ChannelInboundHandlerAdapter的前面
                //                            pipeline.addFirst(new MyChannelServerOutHandler());
                //                            pipeline.addFirst(new MyChannelServerOutHandler2());
            }
        })
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

    ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
    channelFuture.channel().closeFuture().sync();
} finally {
    // 退出线程
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

服务端ChannelHandler

MyChannelServerHandler.java

public class MyChannelServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 在channelRead中调用了这个方法,并会自动释放msg占用的内存
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("MyChannelServerHandler:" + byteBuf.toString(CharsetUtil.UTF_8));

        // 跳到下一个ChannelHandler,否则不调用下一个ChannelHandler
        // 如果这里实现的是SimpleChannelInboundHandler那么在传递给下一个ChannelHandler时会抛异常
        // 因为SimpleChannelInboundHandler中的channelRead方法调用完channelRead0会自动销毁消息对象
        ctx.fireChannelRead(msg);
    }
}

可以通过fireChannelRead方法传递给ChannelPipeline中的下一个ChannelInboundHandlerAdapter实现的ChannelHandler

MyChannelServerHandler2.java

public class MyChannelServerHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
        ReferenceCountUtil.release(msg);

        // 像客户端发送消息,如果有ChannelOutboundHandlerAdapter的IO处理器,就转给下一个ChannelHandler处理
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端响应结果", CharsetUtil.UTF_8));
    }
}

在MyChannelServerHandler2处理中MyChannelServerHandler传递过来的消息,并向客户端发送消息。writeAndFlush方法会传递给ChannelPipeline中的下一个ChannelOutboundHandlerAdapter实现的ChannelHandler。如果没有的话,就直接返回给客户端。

MyChannelServerOutHandler.java

public class MyChannelServerOutHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        String s = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("MyChannelServerOutHandler1:" + s);

        // 向管道写入数据到下一个ChannelHandler,如果没有下一个ChannelHandler,就直接返回给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer(s + "->MyChannelServerOutHandler", CharsetUtil.UTF_8), promise);
    }
}

继续返回给下一个ChannelOutboundHandlerAdapter实现的ChannelHandler

MyChannelServerOutHandler2.java

public class MyChannelServerOutHandler2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        String s = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("MyChannelServerOutHandler2:" + s);

        // 向管道写入数据到下一个ChannelHandler,如果没有下一个ChannelHandler,就直接返回给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer(s + "->MyChannelServerOutHandler2", CharsetUtil.UTF_8), promise);
    }
}

因为ChannelPipeline中已经没有ChannelOutboundHandlerAdapter实现的ChannelHandler了,所以直接返回给客户端。

服务端在接收客户消息时,会给客户端也发送消息。
其中MyChannelServerHandler和MyChannelServerHandler2是处理读IO事件的,继承自ChannelInboundHandlerAdapter。在服务端给客户端发送消息时,会触发写事件,所以会执行MyChannelServerOutHandler和MyChannelServerOutHandler2,继承自ChannelOutboundHandlerAdapter。

ChannelHandler的执行顺序是MyChannelServerHandler -> MyChannelServerHandler2 -> MyChannelServerOutHandler2 -> MyChannelServerOutHandler。

客户端启动类

public class Client {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientHandler());

            Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();


            channel.writeAndFlush(Unpooled.copiedBuffer("大哥哥哥", CharsetUtil.UTF_8));

            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端ChannelHandler

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }
}

测试结果

运行服务端,再运行客户端之后,客户端打印消息

服务端响应结果->MyChannelServerOutHandler2->MyChannelServerOutHandler

服务端打印消息

服务启动成功..
MyChannelServerHandler:大哥哥哥
大哥哥哥
MyChannelServerOutHandler2:服务端响应结果
MyChannelServerOutHandler1:服务端响应结果->MyChannelServerOutHandler2

所以刚才的推断是正确的,得出结论:
ChannelInboundHandlerAdapter是读IO事件处理器,是从上到下执行的。
ChannelOutboundHandlerAdapter是写IO事件处理器,是从下到上执行的。


源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files

# Jenkins # RabbitMQ # Zookeeper # IDEA # Logstash # Kibana # ELK # NIO # Netty # Spring Cloud # Golang # DataX # Elasticsearch # React Native # Mysql # H2 # Socket # Spring Boot # Kafka # Mybatis # Sqlmap # Vue # Postgresql # Docker # Vert.x # Flutter # Flink # Redis
Netty快速入门
编解码器使用
  • 文章目录
  • 站点概览
会Coding的猴子

会Coding的猴子

57 日志
19 分类
28 标签
RSS
Github
Creative Commons
© 2021 会Coding的猴子
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4

湘ICP备18011740号