张坤的个人博客

  • 首页
  • 分类
  • 标签
  • 日志

  • 搜索
Lombok Dubbo Jenkins RabbitMQ Zookeeper IDEA Logstash Kibana ELK NIO Netty Spring Cloud Golang DataX Elasticsearch React Native Mysql H2 Socket Spring Boot Kafka Mybatis Sqlmap Vue Postgresql Docker Vert.x Flutter Flink Redis

Netty快速入门

发表于 2020-06-25 | 分类于 Netty | 0 | 阅读次数 73

Netty中核心组件

  • EventLoopGroup:事件回环组(线程池)。BossGroup负责接收IO请求但不处理,分发给WorkerGroup(又叫ChildGroup)处理。
  • EventLoop:事件回环队列,单线程
  • ServerBootstrap:服务端启动类
  • Bootstrap:客户端启动类
  • ChannelPipeline:管道(双向链表实现,可以对IO事件添加具体处理的Handler)
  • ChannelHandler:IO事件处理器,分别有专门处理读请求和写请求两handler接口。ChannelInboundHandler(读),ChannelOutboundHandler(写)。他们都继承自ChannelHandler接口。

创建EventLoopGroup

首先需要创建两个EventLoopGroup,一个是BossGroup是专门接收IO请求的。一个是WorkerGroup(又叫ChildGroup),专门处理IO请求的。
EventLoopGroup是一个线程池,里面包含了一组EventLoop。EventLoop是单线程的,叫事件回环队列,可以同时监听多个管道(Channel),如果有事件发生,BossGroup就会把事件交给WorkerGroup,WorkerGroup再开个单独的线程去处理。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

如果没有指定线程数的话,默认是 cpu核数*2 个线程
EventLoop实际类型是NioEventLoop,看源码:EventLoop启动时会执行run方法,然后用个死循环不断的去监听和处理IO事件

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        ...
    }
}

配置服务的线程模型

然后需要一个启动类来启动这个Netty服务,如果是服务端就用ServerBootstrap,客户端就用Bootstrap。

// 服务端启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 客户端启动类
Bootstrap bootstrap = new Bootstrap();

将 BossGroup和WorkerGroup注册到ServerBootstrap上。Netty中的线程模型有三种:单Reactor单线程模型、单Reactor多线程模型、多Reactor多线程模型。这里用的是最后一种模型,这种线程模型效率最高。
一个EventLoopGroup就是一个Reactor,这里有两个就是多Reactor。因为EventLoopGroup包含了一组EventLoop,每个EventLoop是一个单独的线程,所以EventLoopGroup是多线程的。

serverBootstrap.group(bossGroup, workerGroup);

这段代码就是配置了当前服务用的是多Reactor多线程模型。

配置IO模型

服务端默认用NioServerSocketChannel就行了,如果时客户端启动类,就用NioSocketChannel

// 服务端
serverBootstrap.channel(NioServerSocketChannel.class);
// 客户端
bootstrap.channel(NioSocketChannel.class);

配置事件处理器

handler方法是给BossGroup配置的,childHandler是给WorkerGroup配置的。WorkerGroup才是负责处理IO操作的,所以一般只需要关心childHandler方法就行了。

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        // 官方DISCARD请求例子,只打印请求,不做任何响应
        ch.pipeline().addLast(new DiscardServerHandler());
    }
})

initChannel方法有个SocketChannel类型的参数,这个参数是和客户端连接的通道。可以通过它获取管道(ChannelPipeline),然后向管道中添加事件处理器(ChannelHandler)。
DiscardServerHandler是我自定义的一个事件处理器,它不做任何响应,只是单纯打印客户端发送的消息。

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 注册时触发
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    /**
     * 激活后触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    /**
     * 每当从客户端接收到数据时候,调用channelRead方法
     * @param ctx
     * @param msg 消息体,ByteBuf类型
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        String str = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println(str);
        // 相当于 ((ByteBuf) msg).release(),必须手动释放内存
        ReferenceCountUtil.release(msg);
    }

    /**
     * 当IO错误引发异常时,调用exceptionCaught方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

自定义事件处理器继承ChannelInboundHandlerAdapter,在发生IO事件时就会执行。ChannelInboundHandlerAdapter是处理读IO的事件处理器,如果在服务端添加,客户端向服务端发消息时会执行。还有一个ChannelOutboundHandlerAdapter是专门处理写IO的事件处理器,在服务端定义,那么服务端向客户端发消息时会执行。

  1. 客户端第一次请求服务端时(第一次IO请求),会触发服务端childHandler中的initChannel方法,用来初始化服务端与客户端之间的IO通道。随后channelRegistered和channelActive会被先后执行。
  2. 在通道未关闭时,客户端每次给服务端发消息都会触发channelRead方法,在channelRead方法中处理消息读取的业务。

channelRead中的消息时Object类型的。Netty中传递的消息都是二进制存在的,用Netty自定义的类型来说就是ByteBuf。所以直接将消息强转成ByteBuf就行了。
如果不想每次都这么麻烦,可以直接继承SimpleChannelInboundHandler接口来自定义事件处理器,可以指定一个泛型,这个泛型就是消息的类型。注意这里重写的是channelRead0方法,在SimpleChannelInboundHandler中channelRead方法会调用channelRead0方法,并且调用结束后自动释放消息占用的内存。
如果类型是自定义对象的话,需要自定义编解码器。在发消息时将对象转成二进制,在读取时再将二进制转成对象。

ChannelPipeline和ChannelHandler关系

ChannelPipeline是一个双向链表。ChannelHandler可以是Netty中已经实现的,也可以是自定义的。ChannelHandler是处理IO的主要业务逻辑。可以对数据进行读写,编码和解码等。

在ChannelPipeline中,处理器是流式的,上一个处理器的结果会转接到下一个处理器,这也是吧ChannelPipeline叫做管道的原因。

ChannelHandler分为输入处理器和输出处理器,看继承结构:

image20200624173258980.png

ChannelInboundHandler是处理读取操作的,ChannelOutboundHandler是处理写入操作

ChannelPipeline处理ChannelInboundHandler是顺序的,处理ChannelOutboundHandler是倒序的,所以往ChannelPipeline中添加ChannelHandler时要注意添加顺序和添加方法。否则会导致有些ChannelHandler不会执行。

ChannelPipeline添加ChannelHandler的方法:

ChannelPipeline pipeline = ch.pipeline();    
pipeline.addLast();
pipeline.addFirst();
pipeline.addBefore();
pipeline.addAfter();

ChannelHandler处理IO事件

自定义一个处理读操作的ChannelHandler,继承ChannelInboundHandlerAdapter。
在channel第一次收到IO请求时服务端和客户端会建立通道,这时会触发channelRegistered然后channelActive。
在有数据进来时,会触发channelRead方法,类型是二进制的,所以之间强转成ByteBuf就行了

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
    String str = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
    ReferenceCountUtil.release(msg);
}

发生异常时会调用exceptionCaught方法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
    cause.printStackTrace();
    ctx.close();
}

也可以直接用SimpleChannelInboundHandler,可以指定泛型,这样就不用强转了。对基本类型还好说,如果是自定义对象的话,需要自己实现编解码器。需要将对象编码成二进制传输,然后将二进制解码成对象后处理。
可以通过ChannelHandlerContext的channel()方法获取通道,通过通道的writeAndFlush方法向客户端发送数据。

常用配置项

  • option:配置BossGroup
  • childOption:配置WorkerGroup
    可以发现一个套路,以child*开头的都是给WorkerGroup添加的配置。
// 允许客户端最大连接数,如果超过128,服务端拒绝连接
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
    // 保持HTTP连接不马上断开,多路复用(长连接)
    .childOption(ChannelOption.SO_KEEPALIVE, true);

启动服务和关闭

绑定8080端口,已同步的方式启动服务,并以同步的方式关闭。(套路基本固定)

serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();

关闭EventLoopGroup,基本上也是固定代码

workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

启动服务,用浏览器访问,可以看到控制台打印了HTTP请求报文。

channelRegistered
channelActive
channelRegistered
channelActive
GET / HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: _ga=GA1.1.175695201.1590237021; Hm_lvt_8b02a318fde5831da10426656a43d03c=1590237021,1590302503; Idea-cd929520=04f29cff-4389-472f-a52d-67ae6df5b8fc

源码:https://codox.coding.net/public/netty-learning/netty-learning/git/files

# Lombok # Dubbo # Jenkins # RabbitMQ # Zookeeper # IDEA # Logstash # Kibana # ELK # NIO # Netty # Spring Cloud # Golang # DataX # Elasticsearch # React Native # Mysql # H2 # Socket # Spring Boot # Kafka # Mybatis # Sqlmap # Vue # Postgresql # Docker # Vert.x # Flutter # Flink # Redis
IO基础回顾
ChannelHandler执行顺序
  • 文章目录
  • 站点概览
会Coding的猴子

会Coding的猴子

58 日志
20 分类
30 标签
RSS
Github
Creative Commons
© 2021 会Coding的猴子
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4

湘ICP备18011740号