netty 底层的工作原理

什么是 Netty

Netty 是一个高性能的网络通信框架,封装了底层复杂的 socket 编程细节,让我们可以高效快速构建自己的应用

有哪些开源框架用了 Netty 呢?grpc、dubbo、kafka、rocketmq、zookeeper、hadoop

Netty Demo

server 端启动 netty 服务器

public static void main(String[] args) throws InterruptedException {
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup(); try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 // 控制窗口值 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023)
                .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline channelPipeline = ch.pipeline();
                        channelPipeline.addLast(new JdrpcCodec());
                        channelPipeline.addLast(new JdrpcServerHandler());
                    }
                });
        ChannelFuture channelFuture = serverBootstrap.bind(JdrpcConstant.PORT).sync();

        channelFuture.addListener(future -> { if (future.isSuccess()) {
               logger.info("服务启动成功,绑定端口: {}", JdrpcConstant.PORT);
           } else {
               logger.error("服务启动失败");
           }
        });
        channelFuture.channel().closeFuture().sync();

    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}

启动 client

public static void main(String[] args) throws InterruptedException {
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class) // 连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 保持长连接心跳 .option(ChannelOption.SO_KEEPALIVE, true) // 禁用 Nagle 算法 .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline() // 设置编码解码器 .addLast(new JdrpcCodec()) // 业务逻辑处理类 .addLast(new JdrpcClientHandler());
                }
            });
    bootstrap.connect("127.0.0.1", JdrpcConstant.PORT).addListener(future -> { if (future.isSuccess()) {
            logger.info("连接成功, 启动控制台");
            waitInputMessage(((ChannelFuture) future).channel());
        } else {
            logger.error("连接失败");
        }
    });
}

通信协议

通信协议用来指定调用方和被调用方通信的规则,满足通信协议字节数组即为一个完整的数据包

我们来看看阿里的 dubbo 协议(rpc 协议)


某某公司的 rpc 协议,先要读取到一个 \n 换行符然后得到数据包长度 length 字段,然后读取 length 长度的内容

<=4 字节 1字节 不确定
length \n content

序列化协议

序列化协议用来指定通信协议中的内容部分该如何序列化传输,所有协议的设计无非是下面几个点

(1)可读性

(2)编解码压缩解压缩效率

(3)压缩后包的大小

json 序列化:将需要传输的对象序列化为 json 字符串,同时获取字符串的字节数组填充内容,jdrpc 采用的就是 json 序列化

hessian 序列化、protobuf 序列化:采用一些高效的序列化算法让内容数据包足够小,压缩效率足够快,最后直接序列化为字节数组

各种序列化协议的性能比较,序列化后数据包大小比较





缓冲区 ByteBuf

在读写 socket 数据的时候是基于缓冲区进行数据读写的,缓冲区分为 2 大类堆缓冲区堆外缓冲区

当网卡收到数据后会请求中断将内核将数据通过 DMA 拷贝到 tcp 缓冲区中,如果是边缘触发机制的话收到数据就会通知 read 事件准备就绪,此时进程可以开始读取数据

堆缓冲区:数据是存储在 Java 堆中的,通过 CPU 拷贝内核 tcp 缓冲区数据到用户态,然后 CPU 拷贝用户态数据到 Java 堆中

堆外缓冲区(也叫直接缓冲区):数据是存储在非堆的,通过 CPU 拷贝内核 tcp 缓冲区数据到用户态,相比堆缓冲区少了一次数据拷贝

需要注意的是当我们使用的是堆缓冲区,那么发送数据的时候需要将堆缓冲区先拷贝到直接缓冲区,然后基于直接缓冲区进行数据的发送,因为底层网络传输的是需要传递不可变的引用地址,而 Java 堆是会随着 gc 而改变位置的

复合缓冲区:聚合了多个 ByteBuf

什么时候用堆缓冲区?什么时候用堆外缓冲区?

netty 中的缓冲区ByteBuf 设计


该图来自于, 跟着闪电侠学netty

编码

按照通信协议的规范来组织数据,将需要传输的对象编码为字节数组进行传输,如以下的编码方式为 jdrpc 协议通信方式

(1)写入数据包长度

(2)写入分隔符

(3)采用 JSON 序列化内容,将其转换为 JSON 的字节数组

jdrpc 协议这里的设计如果数据包长度是固定 4 个字节那么就没有必要写入分隔符了,当然如果长度不是固定的如 1-4 字节那么确实有分隔符存在的必要

protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
  JSONObject object = (JSONObject) msg;
  String content = object.toJSONString(); // 获取内容的字节数组 byte[] bytes = content.getBytes(StandardCharsets.UTF_8); // 写入数据包长度 // int 为 4 字节 out.writeInt(bytes.length); // 写入分隔符 out.writeByte('\n'); // 写入字节数组 out.writeBytes(bytes);
}

解码

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { // 寻找 \n 分隔符 int i = in.forEachByte(ByteProcessor.FIND_LF); if (i < 0) { return;
    } // 找到了分隔符 // 读取数据包长度 // 记录当前读取到的 index 标记 in.markReaderIndex();
    byte[] lengthBytes = new byte[i]; in.readBytes(lengthBytes); // 获取长度,读数据数据包 int length = Util.byteToInt(lengthBytes);
    logger.info("获取到的长度: {}", length); // tcp 缓冲区的数据是否满足一个数据包长度的要求 // 满足就进行粘包,不满足则继续等待数据包 if (in.readableBytes() < length) { // 还原读取的位置 in.resetReaderIndex(); return;
    } // 跳过 \n in.skipBytes(1); // 读取数据 byte[] content = new byte[length]; in.readBytes(content); String res = new String(content, StandardCharsets.UTF_8);
    out.add(res); // 释放缓冲区已经处理的字节避免内存泄露 ReferenceCountUtil.release(msg);
}

当被调用方收到请求时候,按照通信协议的规范进行解码出来请求内容,比如 jdrpc 协议

(1)遍历 tcp 缓冲区寻找 \n 换行符,记录其偏移量

(2)读取偏移量前的字节数组将其转换为 int 得到内容的长度 length

(3)跳过 \n 换行符字节

(4)读取 length 个字节数据,将其转换为 String 即为 JSON 字符串



当我们在读取缓冲区数据进行解码的时候会出现以下几种情况

1. 遍历 tcp 缓冲区数据结果没有找到分隔符 \n

继续向 selector 注册 OP_READ 事件,当后续数据到达 tcp 缓冲区时候基于边缘触发机制,select 感知到读事件返回,继续处理 tcp 缓冲区数据直到找到了 \n



2. 找到了 \n 解析出来内容长度为 5,结果检索 tcp 缓冲区中数据只有 3字节 不满足一个数据包要求

这种处理方式跟第一种方式一样



3. 找到了 \n 解析出来内容长度为 5,结果检索 tcp 缓冲区中居然有 8 字节的数据多了3个字节

值读取 \n 后 5 字节数据形成一个完整的数据包后 length,然后跳过一个字节的分隔符,再次去缓冲区读取 length 个字节,多出来的字节不做处理,就后续 rpc 逻辑调用,剩下的数据再次走从第一步开始的逻辑



4. tcp 缓冲区的数据刚好满足一个数据包

在这种情况下 tcp 缓冲区数据不多不少刚刚满足一个数据包的大小



那么什么情况下才会出现无法收到完整数据包的情况呢? 比如 1 和 2

我们知道以太网数据链路层的 MTU 数据包默认为最大 1500 字节,我们的 TCP 数据包内购传递的最大内容 MSS = 1500 - IP 头部 - tcp 头部 = 1460 字节

编码后的单个数据包太大,大于 1500 字节

数据包大于了单词数据链路层传递的最大长度,就会分批发送

接收缓冲区几乎被打满了

TCP 具备拥塞控制功能,当服务器接收缓冲区只剩下 2 字节空间的时候,那么客户端如果一股脑的无脑发数据,数据存储到哪去?所以客户端也只会发送 2 字节,当接收缓冲区为 0 的时候,客户端就会暂停发送

网卡流量几乎被打满了

尽管当前 socket 接收缓冲区还有充足空间比如 2M,但是网卡被其它请求打满了,只有 2 字节的剩余,这个时候我们也只会收到 <= 2 字节的数据包

服务处理速度太慢

比如操作耗时较久没有及时释放空间,导致 TCP 的释放速度跟不上新增速度,这也是导致第三点的原因之一

:Netty 采用的是边缘触发模式,内核组装完成一个 TCP 数据包后就会告知用户线程可以处理了,这个时候就会拿到不完整的数据

长连接保活机制

当客户端和服务端建立好长连接后,如果一方长时间没有请求那么有可能这个链接已经断开了,这时候为了避免资源浪费是需要释放对应的资源的

系统层面的保活:时效性低,资源释放不及时

默认 linux 系统可以通过 keepalive 机制来进行存活的探测,默认 2 小时如果没有收到数据包,就发送一个心跳数据包给对方,如果返回了 ack 表明连接正常,如果超时未收到响应则一共会重试 9 次间隔 75S 最后没有成功的话就释放长连接资源

应用层面的保活:时效性高,资源释放及时

在应用层面创建定时任务每个指定时间发送一次心跳,如果心跳失败则进行重试还是关闭连接等操作

public class JdIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 15; public JdIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接");
        ctx.channel().close();
    }
}

Channel Pipline

服务端接收到请求后,当做一个 inbound 接入的流处理,按照添加的顺序依次调用 ChannelInboundHandler 进行处理

服务端响应请求的时候,当做一个 outbound 流处理,按照添加的顺序,从尾部到头部执行 ChannelOutboundHandler



在 server 端 handler pipline 又分为了 2 个,一个作为 server 处理建立长连接的 pipline,另外一个作为 socket 读写请求的 pipline,添加方式如下

serverBootstrap.group(boss, worker)
  .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023) // 添加处理建立连接的 pipline .handler(new ChannelInitializer<NioServerSocketChannel>() { @Override protected void initChannel(NioServerSocketChannel ch) throws Exception {
      ch.pipeline().addLast(new ServerAcceptHandler());
    }
  }) // 添加处理读写事件的 pipline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {
      ChannelPipeline channelPipeline = ch.pipeline();
      channelPipeline.addLast(new JdrpcCodec());
      channelPipeline.addLast(new JdrpcServerHandler());
    }
  });

这里都是我们自定义的 pipline,除此之外系统还会向其中添加默认的 handler,做一些系统默认的操作,最后 pipline 链条如下

ServerBootstrapAcceptor 就是负责选择一个 NioEventLoop 然后将 NioSocketChannel 绑定到其中,后续 NioEventLoop 线程将 NioSocketChannel 注册到对应的 Selector 中就能感知到读写事件了



池化缓冲区

缓冲区的创建和释放都是有一定成本的,如果能进行池化复用将会提升性能

通过 ByteBufAllocator 来创建和使用池化的 ByteBuf

通过 Unpooled 来申请未此话的 ByteBuf

缓冲区的另外一块意义在于减少用户态到内核态切换以及数据拷贝的次数

如下通过创建 1亿次 1MB 空间使用后释放内存,观察他们的GC耗时

注:该案例和测试来自于 Netty 进阶之路-李林锋

(1)内存池模式 gc 32 次,耗时 37.6ms

(2)非内存池模式 gc 3038 次,耗时 2.35S





图解工作原理

![netty工作原理 (1)]



默认一个 NioEventLoop 线程关联一个 selector,服务端创建 ServerSocketChannel 将其注册到 Selector 中,关注 OP_ACCEPT 事件

Server 的事件由其对应的 handler 来处理,例如我们想要处理在建立连接时候的行为,Server 在启动的时候还会默认为 Server 的 Handler Pipline 添加一个 ServerBootstrapAcceptor handler,这样当底层三次握手完成后会先调用 AcceptServerHandler 然后再调用 ServerBootstrapAcceptor

serverBootstrap.group(boss, worker)
  .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023)
  .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new AcceptServerHandler());
    }
  })

ServerBootstrapAcceptor 的作用取出与客户端建立好连接 NioSocketChannel 然后选择一个 NioEventLoop 将其与之绑定,并且注册到其中的 Selector 中,关注读写事件

当读事件就绪后会交由 NioByteUnsafe 进行处理,读取到数据后交由编解码 handler 进行处理,形成一个完整的数据包后交给 JdrpcServerHandler 业务进行处理,这是有按照 Pipline 添加的 inbound 顺序依次调用

当写数据的时候,会将数据压入队列,flush 的时候统一将数据发送给客户端

IO 模型

linux 下的网络模型

阻塞、非阻塞、多路复用、信号驱动、异步

netty 如何指定网络模型

非阻塞 IO:案列中我们默认使用的是 NioEventLoop 和 NioSocketChannel 这 2 种模式是 netty 进行优化后能够运行在个平台上的,底层是基于非阻塞 IO 实现的

多路复用 IO:替换使用 EpollEventLoop 和 EpollSocketChannel 即可切换为 epoll 模式,但是 epoll 模式只在 linux 下面执行,当能够确定 netty 是运行在 linux 中时候采用 epoll 能够获得更高的性能

#Java##Netty##程序员#
全部评论
长知识了,已赞
点赞 回复 分享
发布于 2022-09-06 16:40 江苏

相关推荐

05-14 22:18
已编辑
西南石油大学 前端工程师
菜菜鼠鼠劳动节之后在ssob投简历妄想找一个六月份的实习(学校六月底开始懂得都懂的实训),迫切需要一份像样的offer躲过去,顺带积攒一段实习经历为下半年的实习早做准备,投了百余份简历出去约面的一只手都数的过来经历了两个小厂的水面之后终于接到了一个比较正式的面试通知,于是鼠鼠把八股看过去看过来,把手撕多写了几遍,今下午提前五分钟进入会议等着被拷打,面试官是温柔小姐姐,一问问题直摧鼠鼠内心,感觉是针对简历项目问的问题,再顺便拓展问八股的,有一些压根没去了解过……还好面试官姐姐好,没压力我让我挺了过来面试时长大概35分钟1、自我介绍2、AI助手如何实现的(项目中实现了AI问答功能,最好会做打字流)3、节流和防抖的区别和具体使用场景(项目中是setTimeout手搓的)5、如何实现图文混合上传,怎么实现的多种类型报告对应下载(项目中的)6、如果需要批量上传一组很多图片或者数据该如何设计7、上传了100组数据,上传成功n组,要怎么实现断点记录哪些上传成功了,让用户下一次只需要上传后100-n组8、如果因为某些问题上传一半失败了,用户离开了页面,怎么储存上传成功返回的东西(这里答成粑粑了,感觉是考简历写的Blob分片之类的东西)9、axios写了什么请求拦截,怎么实现的请求去重,为什么想用map结构替换对象实现拦截器的去重(自己给自己挖坑了)10、路由懒加载怎么实现的,图片懒加载怎么实现,自定义指令的IntersectionObserver不兼容怎么办,requestIdleCallback预加载不兼容怎么办,有什么降级方案11、Vue2&nbsp;/&nbsp;Vue3区别,响应式,组合式API,TS支持(感觉还是了解底层原因的好一点)12、webpack和Vite的区别和为什么用Vite13、npm和pnpm的区别和pnpm的优势,为什么pnpm比npm更快14、微前端应用场景15、项目中的TS用在了哪里,泛型和接口的运用,泛型怎么用和约束之类的16、TS的extends,TS的方法等(这里也是答成粑粑了,对TS不是很熟悉)反问总结:面成了粑粑,比较紧张,也是感受到了不足,很多原理没去了解很深,还是很感谢给我这次面试机会了,面试官姐姐也很好,给了一些建议。这些问题其实也很简单了,只能说自己还是太菜了20分钟之后通知一面过了约二面!!!(马萨卡,我也运气来了吗,得去好好熟悉项目和简历了,好好准备一下呜呜呜,AAA沉淀~~)
查看15道真题和解析
点赞 评论 收藏
分享
05-20 22:38
已编辑
南京理工大学 Java
45min提问1.&nbsp;讲一下你实习时候这个项目的大概业务流程2.&nbsp;二级缓存怎么设计的,像二维码的同时核销的并发,在你这个缓存里效果是怎么样的(感觉他没问清楚..)3.&nbsp;提到Jmeter,怎么设计线程组,用了多少个线程组,使用多少个并发请求4.&nbsp;Jmeter是在本地还是服务器部署,讲一下你使用的过程5.&nbsp;Jmeter本地跑一万个线程能跑起来吗(问Jmeter的瓶颈)6.&nbsp;布隆过滤器怎么实现的,原理,效果7.&nbsp;Redis是单机部署还是分布式部署,为什么不用分布式(因为实习的公司只要单机阿..)8.&nbsp;Redis常用数据结构,List和Set的区别,存商品id用哪个9.&nbsp;缓存的过期机制如何设计的,设计随机过期机制的作用10.&nbsp;缓存读写策略,多个服务节点对Redis操作怎么办11.&nbsp;大key或者热点key会导致什么问题,如何解决12.&nbsp;Redis哨兵机制13.&nbsp;数据库锁的类别14.&nbsp;数据库事务的隔离级别,幻读的现象15.&nbsp;优惠券秒杀,如何测试(界面,功能,兼容,安全等..)16.&nbsp;Netty核心组件了解吗,作用分别是什么17.&nbsp;NIO和BIO区别18.&nbsp;Bootstrap/ServerBootstrap的区别19.&nbsp;Netty的长连接和心跳监听20.&nbsp;TCP沾包问题的解决21.&nbsp;拿到一个慢sql,如何分析定位问题22.&nbsp;索引失效的情况,数据的类型和大小影响索引的功能吗23.&nbsp;回表查询24.&nbsp;MySQL日志了解哪些,他们的功能25.&nbsp;介绍bin&nbsp;log26.&nbsp;为什么主从数据同步不用redo&nbsp;log&nbsp;要用bin&nbsp;log手撕:链表相交的结点(10min)用常规的指针遍历(到尾节点就跳到另一条链表)让我回去思考一下再优化时间复杂度反问..感觉一面面了别人两面的量,应该凉了
查看26道真题和解析 面试问题记录
点赞 评论 收藏
分享
评论
1
6
分享

创作者周榜

更多
牛客网
牛客企业服务