首先需要在ChannelInitializer的initChannel方法中获取ChannelPipeline对象,这个ChannelPipeline是一个双向链表。需要向ChannelPipeline中添加ChannelHandler。需要注意添加顺序,因为这影响到ChannelHandler的执行顺序,甚至可能导致ChannelHandler没有被执行。
Netty服务端启动类
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 写IO事件倒序执行,从下网上,因为是在MyChannelServerHandler2中向客户端发消息的。
// 写IO事件时从下往上执行的,所以要定义在读IO事件处理器的前面,否则写IO事件处理器不会被调用
pipeline.addLast(new MyChannelServerOutHandler());
pipeline.addLast(new MyChannelServerOutHandler2());
// 读IO事件顺序执行,从上往下
pipeline.addLast(new MyChannelServerHandler());
pipeline.addLast(new MyChannelServerHandler2());
// 上面那种写法不人性话,所以netty为我们提供了addFirst方法,可以将写IO的事件添加到管道头部。
// 这就不需要ChannelOutboundHandlerAdapter一定要定义在ChannelInboundHandlerAdapter的前面
// pipeline.addFirst(new MyChannelServerOutHandler());
// pipeline.addFirst(new MyChannelServerOutHandler2());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
// 退出线程
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
服务端ChannelHandler
MyChannelServerHandler.java
public class MyChannelServerHandler extends ChannelInboundHandlerAdapter {
/**
* 在channelRead中调用了这个方法,并会自动释放msg占用的内存
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("MyChannelServerHandler:" + byteBuf.toString(CharsetUtil.UTF_8));
// 跳到下一个ChannelHandler,否则不调用下一个ChannelHandler
// 如果这里实现的是SimpleChannelInboundHandler那么在传递给下一个ChannelHandler时会抛异常
// 因为SimpleChannelInboundHandler中的channelRead方法调用完channelRead0会自动销毁消息对象
ctx.fireChannelRead(msg);
}
}
可以通过fireChannelRead方法传递给ChannelPipeline中的下一个ChannelInboundHandlerAdapter实现的ChannelHandler
MyChannelServerHandler2.java
public class MyChannelServerHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
ReferenceCountUtil.release(msg);
// 像客户端发送消息,如果有ChannelOutboundHandlerAdapter的IO处理器,就转给下一个ChannelHandler处理
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端响应结果", CharsetUtil.UTF_8));
}
}
在MyChannelServerHandler2处理中MyChannelServerHandler传递过来的消息,并向客户端发送消息。writeAndFlush方法会传递给ChannelPipeline中的下一个ChannelOutboundHandlerAdapter实现的ChannelHandler。如果没有的话,就直接返回给客户端。
MyChannelServerOutHandler.java
public class MyChannelServerOutHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
String s = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println("MyChannelServerOutHandler1:" + s);
// 向管道写入数据到下一个ChannelHandler,如果没有下一个ChannelHandler,就直接返回给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer(s + "->MyChannelServerOutHandler", CharsetUtil.UTF_8), promise);
}
}
继续返回给下一个ChannelOutboundHandlerAdapter实现的ChannelHandler
MyChannelServerOutHandler2.java
public class MyChannelServerOutHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
String s = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println("MyChannelServerOutHandler2:" + s);
// 向管道写入数据到下一个ChannelHandler,如果没有下一个ChannelHandler,就直接返回给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer(s + "->MyChannelServerOutHandler2", CharsetUtil.UTF_8), promise);
}
}
因为ChannelPipeline中已经没有ChannelOutboundHandlerAdapter实现的ChannelHandler了,所以直接返回给客户端。
服务端在接收客户消息时,会给客户端也发送消息。
其中MyChannelServerHandler和MyChannelServerHandler2是处理读IO事件的,继承自ChannelInboundHandlerAdapter。在服务端给客户端发送消息时,会触发写事件,所以会执行MyChannelServerOutHandler和MyChannelServerOutHandler2,继承自ChannelOutboundHandlerAdapter。
ChannelHandler的执行顺序是MyChannelServerHandler -> MyChannelServerHandler2 -> MyChannelServerOutHandler2 -> MyChannelServerOutHandler
。
客户端启动类
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientHandler());
Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
channel.writeAndFlush(Unpooled.copiedBuffer("大哥哥哥", CharsetUtil.UTF_8));
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端ChannelHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println(msg.toString(CharsetUtil.UTF_8));
}
}
测试结果
运行服务端,再运行客户端之后,客户端打印消息
服务端响应结果->MyChannelServerOutHandler2->MyChannelServerOutHandler
服务端打印消息
服务启动成功..
MyChannelServerHandler:大哥哥哥
大哥哥哥
MyChannelServerOutHandler2:服务端响应结果
MyChannelServerOutHandler1:服务端响应结果->MyChannelServerOutHandler2
所以刚才的推断是正确的,得出结论:
ChannelInboundHandlerAdapter是读IO事件处理器,是从上到下执行的。
ChannelOutboundHandlerAdapter是写IO事件处理器,是从下到上执行的。
源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files