爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      netty系列之:kequeue传输协议详解

      首页 知识中心 其他 文章详情页

      netty系列之:kequeue传输协议详解

      2023-05-12 07:20:42 阅读次数:466

      简介

      在前面的章节中,我们介绍了在netty中可以使用kequeue或者epoll来实现更为高效的native传输方式。那么kequeue和epoll和NIO传输协议有什么不同呢?

      本章将会以kequeue为例进行深入探讨。

      在上面我们介绍的native的例子中,关于kqueue的类有这样几个,分别是KQueueEventLoopGroup,KQueueServerSocketChannel和KQueueSocketChannel,通过简单的替换和添加对应的依赖包,我们可以轻松的将普通的NIO netty服务替换成为native的Kqueue服务。

      是时候揭开Kqueue的秘密了。

      KQueueEventLoopGroup

      eventLoop和eventLoopGroup是用来接受event和事件处理的。先来看下KQueueEventLoopGroup的定义:

      public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup
      

      作为一个MultithreadEventLoopGroup,必须实现一个newChild方法,用来创建child EventLoop。在KQueueEventLoopGroup中,除了构造函数之外,额外需要实现的方法就是newChild:

          protected EventLoop newChild(Executor executor, Object... args) throws Exception {
              Integer maxEvents = (Integer) args[0];
              SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
              RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
              EventLoopTaskQueueFactory taskQueueFactory = null;
              EventLoopTaskQueueFactory tailTaskQueueFactory = null;
      
              int argsLength = args.length;
              if (argsLength > 3) {
                  taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
              }
              if (argsLength > 4) {
                  tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
              }
              return new KQueueEventLoop(this, executor, maxEvents,
                      selectStrategyFactory.newSelectStrategy(),
                      rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
          }
      

      newChild中的所有参数都是从KQueueEventLoopGroup的构造函数中传入的。除了maxEvents,selectStrategyFactory和rejectedExecutionHandler之外,还可以接收taskQueueFactory和tailTaskQueueFactory两个参数,最后把这些参数都传到KQueueEventLoop的构造函数中去,最终返回一个KQueueEventLoop对象。

      另外在使用KQueueEventLoopGroup之前我们还需要确保Kqueue在系统中是可用的,这个判断是通过调用KQueue.ensureAvailability();来实现的。

      KQueue.ensureAvailability首先判断是否定义了系统属性io.netty.transport.noNative,如果定了,说明native transport被禁用了,后续也就没有必要再进行判断了。

      如果io.netty.transport.noNative没有被定义,那么会调用Native.newKQueue()来尝试从native中获取一个kqueue的FileDescriptor,如果上述的获取过程中没有任何异常,则说明kqueue在native方法中存在,我们可以继续使用了。

      以下是判断kqueue是否可用的代码:

          static {
              Throwable cause = null;
              if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
                  cause = new UnsupportedOperationException(
                          "Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
              } else {
                  FileDescriptor kqueueFd = null;
                  try {
                      kqueueFd = Native.newKQueue();
                  } catch (Throwable t) {
                      cause = t;
                  } finally {
                      if (kqueueFd != null) {
                          try {
                              kqueueFd.close();
                          } catch (Exception ignore) {
                              // ignore
                          }
                      }
                  }
              }
              UNAVAILABILITY_CAUSE = cause;
          }
      

      KQueueEventLoop

      KQueueEventLoop是从KQueueEventLoopGroup中创建出来的,用来执行具体的IO任务。

      先来看一下KQueueEventLoop的定义:

      final class KQueueEventLoop extends SingleThreadEventLoop 
      

      不管是NIO还是KQueue或者是Epoll,因为使用了更加高级的IO技术,所以他们使用的EventLoop都是SingleThreadEventLoop,也就是说使用单线程就够了。

      和KQueueEventLoopGroup一样,KQueueEventLoop也需要判断当前的系统环境是否支持kqueue:

          static {
              KQueue.ensureAvailability();
          }
      

      上一节讲到了,KQueueEventLoopGroup会调用KQueueEventLoop的构造函数来返回一个eventLoop对象, 我们先来看下KQueueEventLoop的构造函数:

          KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
                          SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                          EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
              super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                      rejectedExecutionHandler);
              this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
              this.kqueueFd = Native.newKQueue();
              if (maxEvents == 0) {
                  allowGrowing = true;
                  maxEvents = 4096;
              } else {
                  allowGrowing = false;
              }
              this.changeList = new KQueueEventArray(maxEvents);
              this.eventList = new KQueueEventArray(maxEvents);
              int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
              if (result < 0) {
                  cleanup();
                  throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
              }
          }
      

      传入的maxEvents表示的是这个KQueueEventLoop能够接受的最大的event个数。如果maxEvents=0,则表示KQueueEventLoop的event容量可以动态扩展,并且最大值是4096。否则的话,KQueueEventLoop的event容量不能扩展。

      maxEvents是作为数组的大小用来构建changeList和eventList。

      KQueueEventLoop中还定义了一个map叫做channels,用来保存注册的channels:

      private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
      

      来看一下channel的add和remote方法:

          void add(AbstractKQueueChannel ch) {
              assert inEventLoop();
              AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
              assert old == null || !old.isOpen();
          }
      
          void remove(AbstractKQueueChannel ch) throws Exception {
              assert inEventLoop();
              int fd = ch.fd().intValue();
              AbstractKQueueChannel old = channels.remove(fd);
              if (old != null && old != ch) {
                  channels.put(fd, old);
                  assert !ch.isOpen();
              } else if (ch.isOpen()) {
                  ch.unregisterFilters();
              }
          }
      

      可以看到添加和删除的都是AbstractKQueueChannel,后面的章节中我们会详细讲解KQueueChannel,这里我们只需要知道channel map中的key是kequeue中特有的FileDescriptor的int值。

      再来看一下EventLoop中最重要的run方法:

         protected void run() {
              for (;;) {
                  try {
                      int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                      switch (strategy) {
                          case SelectStrategy.CONTINUE:
                              continue;
      
                          case SelectStrategy.BUSY_WAIT:
                
                          case SelectStrategy.SELECT:
                              strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
                              if (wakenUp == 1) {
                                  wakeup();
                              }
                          default:
                      }
      
                      final int ioRatio = this.ioRatio;
                      if (ioRatio == 100) {
                          try {
                              if (strategy > 0) {
                                  processReady(strategy);
                              }
                          } finally {
                              runAllTasks();
                          }
                      } else {
                          final long ioStartTime = System.nanoTime();
      
                          try {
                              if (strategy > 0) {
                                  processReady(strategy);
                              }
                          } finally {
                              final long ioTime = System.nanoTime() - ioStartTime;
                              runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                          }
      

      它的逻辑是先使用selectStrategy.calculateStrategy获取当前的select strategy,然后根据strategy的值来判断是否需要执行processReady方法,最后执行runAllTasks,从task queue中拿到要执行的任务去执行。

      selectStrategy.calculateStrategy用来判断当前的select状态,默认情况下有三个状态,分别是:SELECT,CONTINUE,BUSY_WAIT。 这三个状态都是负数:

          int SELECT = -1;
      
          int CONTINUE = -2;
      
          int BUSY_WAIT = -3;
      

      分别表示当前的IO在slect的block状态,或者跳过当前IO的状态,和正在IO loop pull的状态。BUSY_WAIT是一个非阻塞的IO PULL,kqueue并不支持,所以会fallback到SELECT。

      除了这三个状态之外,calculateStrategy还会返回一个正值,表示当前要执行的任务的个数。

      在run方法中,如果strategy的结果是SELECT,那么最终会调用Native.keventWait方法返回当前ready的events个数,并且将ready的event放到KQueueEventArray的eventList中去。

      如果ready的event个数大于零,则会调用processReady方法对这些event进行状态回调处理。

      怎么处理的呢?下面是处理的核心逻辑:

                  AbstractKQueueChannel channel = channels.get(fd);
      
                  AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
      
                  if (filter == Native.EVFILT_WRITE) {
                      unsafe.writeReady();
                  } else if (filter == Native.EVFILT_READ) {
                      unsafe.readReady(eventList.data(i));
                  } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
                      unsafe.readEOF();
                  }
      

      这里的fd是从eventList中读取到的:

      final int fd = eventList.fd(i);
      

      根据eventList的fd,我们可以从channels中拿到对应的KQueueChannel,然后根据event的filter状态来决定KQueueChannel的具体操作,是writeReady,readReady或者readEOF。

      最后就是执行runAllTasks方法了,runAllTasks的逻辑很简单,就是从taskQueue中读取任务然后执行。

      KQueueServerSocketChannel和KQueueSocketChannel

      KQueueServerSocketChannel是用在server端的channel:

      public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {
      

      KQueueServerSocketChannel继承自AbstractKQueueServerChannel,除了构造函数之外,最重要的一个方法就是newChildChannel:

          @Override
          protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
              return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));
          }
      

      这个方法用来创建一个新的child channel。从上面的代码中,我们可以看到生成的child channel是一个KQueueSocketChannel的实例。

      它的构造函数接受三个参数,分别是parent channel,BsdSocket和InetSocketAddress。

          KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
              super(parent, fd, remoteAddress);
              config = new KQueueSocketChannelConfig(this);
          }
      

      这里的fd是socket accept acceptedAddress的结果:

      int acceptFd = socket.accept(acceptedAddress);
      

      下面是KQueueSocketChannel的定义:

      public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
      

      KQueueSocketChannel和KQueueServerSocketChannel的关系是父子的关系,在KQueueSocketChannel中有一个parent方法,用来返回ServerSocketChannel对象,这也是前面提到的newChildChannel方法中传入KQueueSocketChannel构造函数中的serverChannel:

      public ServerSocketChannel parent() {
              return (ServerSocketChannel) super.parent();
          }
      

      KQueueSocketChannel还有一个特性就是支持tcp fastopen,它的本质是调用BsdSocket的connectx方法,在建立连接的同时传递数据:

      int bytesSent = socket.connectx(
                                      (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
      
      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.51cto.com/flydean/5318853,作者:程序那些事,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:深入理解Linux启动过程

      下一篇:Linux常用命令总结

      相关文章

      2024-09-25 10:14:09

      netty系列之:channelHandlerContext详解

      我们知道ChannelHandler有两个非常重要的子接口,分别是ChannelOutboundHandler和ChannelInboundHandler,基本上这两个handler接口定义了所有channel inbound和outbound的处理逻辑。

      2024-09-25 10:14:09
      java
      2023-04-24 11:28:21

      netty系列之:自定义编码解码器

      netty系列之:自定义编码解码器

      2023-04-24 11:28:21
      2023-04-18 14:13:36

      #yyds干货盘点#还有这种好事!netty自带http2的编码解码器framecodec

      简介netty为我们提供了很多http2的封装,让我们可以轻松的搭建出一个支持http2的服务器。其中唯一需要我们自定义的就是http2 handler。在之前的文章中,我们介绍了自定义http2handler继承自Http2Connect

      2023-04-18 14:13:36
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5239542

      查看更多

      最新文章

      netty系列之:channelHandlerContext详解

      2024-09-25 10:14:09

      netty系列之:自定义编码解码器

      2023-04-24 11:28:21

      #yyds干货盘点#还有这种好事!netty自带http2的编码解码器framecodec

      2023-04-18 14:13:36

      查看更多

      热门文章

      #yyds干货盘点#还有这种好事!netty自带http2的编码解码器framecodec

      2023-04-18 14:13:36

      netty系列之:自定义编码解码器

      2023-04-24 11:28:21

      netty系列之:channelHandlerContext详解

      2024-09-25 10:14:09

      查看更多

      热门标签

      linux java python javascript 数组 前端 docker Linux vue 函数 shell git 节点 容器 示例
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      netty系列之:channelHandlerContext详解

      netty系列之:自定义编码解码器

      #yyds干货盘点#还有这种好事!netty自带http2的编码解码器framecodec

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号