Netty快速入门

Netty中核心组件

  • EventLoopGroup:事件回环组(线程池)。BossGroup负责接收IO请求但不处理,分发给WorkerGroup(又叫ChildGroup)处理。
  • EventLoop:事件回环队列,单线程
  • ServerBootstrap:服务端启动类
  • Bootstrap:客户端启动类
  • ChannelPipeline:管道(双向链表实现,可以对IO事件添加具体处理的Handler)
  • ChannelHandler:IO事件处理器,分别有专门处理读请求和写请求两handler接口。ChannelInboundHandler(读),ChannelOutboundHandler(写)。他们都继承自ChannelHandler接口。

创建EventLoopGroup

首先需要创建两个EventLoopGroup,一个是BossGroup是专门接收IO请求的。一个是WorkerGroup(又叫ChildGroup),专门处理IO请求的。
EventLoopGroup是一个线程池,里面包含了一组EventLoop。EventLoop是单线程的,叫事件回环队列,可以同时监听多个管道(Channel),如果有事件发生,BossGroup就会把事件交给WorkerGroup,WorkerGroup再开个单独的线程去处理。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

如果没有指定线程数的话,默认是 cpu核数*2 个线程
EventLoop实际类型是NioEventLoop,看源码:EventLoop启动时会执行run方法,然后用个死循环不断的去监听和处理IO事件

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        ...
    }
}

配置服务的线程模型

然后需要一个启动类来启动这个Netty服务,如果是服务端就用ServerBootstrap,客户端就用Bootstrap。

// 服务端启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 客户端启动类
Bootstrap bootstrap = new Bootstrap();

将 BossGroup和WorkerGroup注册到ServerBootstrap上。Netty中的线程模型有三种:单Reactor单线程模型、单Reactor多线程模型、多Reactor多线程模型。这里用的是最后一种模型,这种线程模型效率最高。
一个EventLoopGroup就是一个Reactor,这里有两个就是多Reactor。因为EventLoopGroup包含了一组EventLoop,每个EventLoop是一个单独的线程,所以EventLoopGroup是多线程的。

serverBootstrap.group(bossGroup, workerGroup);

这段代码就是配置了当前服务用的是多Reactor多线程模型。

配置IO模型

服务端默认用NioServerSocketChannel就行了,如果时客户端启动类,就用NioSocketChannel

// 服务端
serverBootstrap.channel(NioServerSocketChannel.class);
// 客户端
bootstrap.channel(NioSocketChannel.class);

配置事件处理器

handler方法是给BossGroup配置的,childHandler是给WorkerGroup配置的。WorkerGroup才是负责处理IO操作的,所以一般只需要关心childHandler方法就行了。

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        // 官方DISCARD请求例子,只打印请求,不做任何响应
        ch.pipeline().addLast(new DiscardServerHandler());
    }
})

initChannel方法有个SocketChannel类型的参数,这个参数是和客户端连接的通道。可以通过它获取管道(ChannelPipeline),然后向管道中添加事件处理器(ChannelHandler)。
DiscardServerHandler是我自定义的一个事件处理器,它不做任何响应,只是单纯打印客户端发送的消息。

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 注册时触发
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    /**
     * 激活后触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    /**
     * 每当从客户端接收到数据时候,调用channelRead方法
     * @param ctx
     * @param msg 消息体,ByteBuf类型
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        String str = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println(str);
        // 相当于 ((ByteBuf) msg).release(),必须手动释放内存
        ReferenceCountUtil.release(msg);
    }

    /**
     * 当IO错误引发异常时,调用exceptionCaught方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

自定义事件处理器继承ChannelInboundHandlerAdapter,在发生IO事件时就会执行。ChannelInboundHandlerAdapter是处理读IO的事件处理器,如果在服务端添加,客户端向服务端发消息时会执行。还有一个ChannelOutboundHandlerAdapter是专门处理写IO的事件处理器,在服务端定义,那么服务端向客户端发消息时会执行。

  1. 客户端第一次请求服务端时(第一次IO请求),会触发服务端childHandler中的initChannel方法,用来初始化服务端与客户端之间的IO通道。随后channelRegistered和channelActive会被先后执行。
  2. 在通道未关闭时,客户端每次给服务端发消息都会触发channelRead方法,在channelRead方法中处理消息读取的业务。

channelRead中的消息时Object类型的。Netty中传递的消息都是二进制存在的,用Netty自定义的类型来说就是ByteBuf。所以直接将消息强转成ByteBuf就行了。
如果不想每次都这么麻烦,可以直接继承SimpleChannelInboundHandler接口来自定义事件处理器,可以指定一个泛型,这个泛型就是消息的类型。注意这里重写的是channelRead0方法,在SimpleChannelInboundHandler中channelRead方法会调用channelRead0方法,并且调用结束后自动释放消息占用的内存。
如果类型是自定义对象的话,需要自定义编解码器。在发消息时将对象转成二进制,在读取时再将二进制转成对象。

ChannelPipeline和ChannelHandler关系

ChannelPipeline是一个双向链表。ChannelHandler可以是Netty中已经实现的,也可以是自定义的。ChannelHandler是处理IO的主要业务逻辑。可以对数据进行读写,编码和解码等。

在ChannelPipeline中,处理器是流式的,上一个处理器的结果会转接到下一个处理器,这也是吧ChannelPipeline叫做管道的原因。

ChannelHandler分为输入处理器和输出处理器,看继承结构:

image20200624173258980.png

ChannelInboundHandler是处理读取操作的,ChannelOutboundHandler是处理写入操作

ChannelPipeline处理ChannelInboundHandler是顺序的,处理ChannelOutboundHandler是倒序的,所以往ChannelPipeline中添加ChannelHandler时要注意添加顺序和添加方法。否则会导致有些ChannelHandler不会执行。

ChannelPipeline添加ChannelHandler的方法:

ChannelPipeline pipeline = ch.pipeline();    
pipeline.addLast();
pipeline.addFirst();
pipeline.addBefore();
pipeline.addAfter();

ChannelHandler处理IO事件

自定义一个处理读操作的ChannelHandler,继承ChannelInboundHandlerAdapter。
在channel第一次收到IO请求时服务端和客户端会建立通道,这时会触发channelRegistered然后channelActive。
在有数据进来时,会触发channelRead方法,类型是二进制的,所以之间强转成ByteBuf就行了

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
    String str = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
    ReferenceCountUtil.release(msg);
}

发生异常时会调用exceptionCaught方法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
    cause.printStackTrace();
    ctx.close();
}

也可以直接用SimpleChannelInboundHandler,可以指定泛型,这样就不用强转了。对基本类型还好说,如果是自定义对象的话,需要自己实现编解码器。需要将对象编码成二进制传输,然后将二进制解码成对象后处理。
可以通过ChannelHandlerContext的channel()方法获取通道,通过通道的writeAndFlush方法向客户端发送数据。

常用配置项

  • option:配置BossGroup
  • childOption:配置WorkerGroup
    可以发现一个套路,以child*开头的都是给WorkerGroup添加的配置。
// 允许客户端最大连接数,如果超过128,服务端拒绝连接
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
    // 保持HTTP连接不马上断开,多路复用(长连接)
    .childOption(ChannelOption.SO_KEEPALIVE, true);

启动服务和关闭

绑定8080端口,已同步的方式启动服务,并以同步的方式关闭。(套路基本固定)

serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();

关闭EventLoopGroup,基本上也是固定代码

workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

启动服务,用浏览器访问,可以看到控制台打印了HTTP请求报文。

channelRegistered
channelActive
channelRegistered
channelActive
GET / HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: _ga=GA1.1.175695201.1590237021; Hm_lvt_8b02a318fde5831da10426656a43d03c=1590237021,1590302503; Idea-cd929520=04f29cff-4389-472f-a52d-67ae6df5b8fc

源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files

# Netty 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×