爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      《KAFKA官方文档》设计与实现

      首页 知识中心 服务器 文章详情页

      《KAFKA官方文档》设计与实现

      2023-03-16 07:51:15 阅读次数:163

      5.设计与实现(IMPLEMENTATION)

      5.1 API 设计

      生产者 APIS

      生产者API包含2个producers-kafka.producer.SyncProducer和

      kafka.producer.async.AsyncProducer。示例代码如下:

      class Producer {
      
      /* Sends the data, partitioned by key to the topic using either the */
      /* synchronous or the asynchronous producer */
      /*使用同步或者异步的生产者,发送单条消息至由key所对应的topic的分区*/
      public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
      
      /* Sends a list of data, partitioned by key to the topic using either */
      /* the synchronous or the asynchronous producer */
      /*使用同步或者异步的生产者,发送一系列数据至由key所对应的topic的分区*/
      public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
      
      /* Closes the producer and cleans up */
      /*关闭生产者并做相应清理*/
      public void close();
      
      }
      

      其目的就是通过一个单一的API向客户端暴露所有的生产者功能。kafka生产者

      • 可以处理多个生产者的排队以及缓冲请求以及异步地分发批量的数据:

        kafka.producer.Producer对于多个生产者的请求数据(producer.type=async),在序列化和分发它们至相应的kafka节点分区之前,其有能力对它们进行批量处理。而批量处理的大小可由少量的配置参数完成。当数据进入至队列,它们将被缓冲在队列里面,直到 queue.time超时或者达到了配置(batch.size)的批量处理的最大值.后台的异步线程(kafka.producer.async.ProducerSendThread)负责将队列里的数据批量取出并让 kafka.producer.EventHandler进行序列化工作,且将数据发送至kafka相应的节点分区。通过设置 event.handler配置参数,即可实现一个自定义的事件处理器( event handler )。不论对于植入自定义日志/跟踪代码,还是自定义监控逻辑,能在生产者队列管道的不同阶段注入回调函数是极其有帮助的。一种可能的方案是通过实现 kafka.producer.async.CallbackHandler接口并且对该类设置 callback.handler配置参数。

      • 通过用户自定义的 Encoder实现对数据的序列化操作:

      interface Encoder<T> {
      
      public Message toMessage(T data);
      
      }
      

      默认的 Encoder``是kafka.serializer.DefaultEncoder“`

      • 通过用户设置(可选)的 Partitioner提供基于软件层面的负载均衡(slb):

        kafka.producer.Partitioner会影响到数据传输时的路由策略。

      interface Partitioner<T> {
      
      int partition(T key, int numPartitions);
      
      }
      

      分区API使用key以及可用节点分区来返回一个分区id。这个id通常用作有序 broker_ids的索引,同时节点分区( partitions )将会用这个id挑选出一个分区去处理生产者的请求。默认的分区策略是是对key进行hash,并对分区数目取余,即 hash(key)%numPartitions。如果key为null,那么将会挑选出一个随机的节点。如果想要实现自定义的分区策略,也可以通过设置 partitioner.class配置参数实现。

      消费者 APIS

      kafka提供两种级别的消费者APIS。对于普通、简单的消费者API,其仅包含对单个节点的连接,且可关闭发送给server网络请求。这个API是完全无状态的,每个网络请求将携带偏移量,用户可以根据自己的选择是否保留这些元数据。

      高级的消费者API不仅隐藏了kafka集群的细节,而且可以消费集群中的任意一台机器而不用关心其背后的网络拓扑。同时,它也保留了消息是否被消费的状态。另外,高级别的消费者API还支持对依据过滤表达式来对订阅的topic进行过滤(譬如白名单或者黑名单等类似的正则表达式)

      普通的 API

      class SimpleConsumer {
      
      /*向节点发出拉取消息的请求,并返回消息的数据集*/
      public ByteBufferMessageSet fetch(FetchRequest request);
      
      /*发送批量拉取消息的请求,返回响应数据集*/
      public MultiFetchResponse multifetch(List<FetchRequest> fetches);
      
      /**
      *在给定时间前返回有效的偏移量(分区容量最大值)数据集,且为降序排列
      *
      * @param time: 毫秒,
      * 如果设置了 OffsetRequest$.MODULE$.LATEST_TIME(),则可以从最新的偏移量获取消息
      * 如果设置了 OffsetRequest$.MODULE$.EARLIEST_TIME(), 则可以从最早的偏移获取消息.
      */
      public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
      }
      

      普通消费者API通常用于实现高级API,以及用于一些离线消费者,这些消费者对于保持状态有特殊的要求

      高级 API

      /*创建一个对kafka集群的连接*/
      ConsumerConnector connector = Consumer.create(consumerConfig);
      
      interface ConsumerConnector {
      
      /**
      * 此方法用于获取KafkaStreams的集合,这个集合是
      * MessageAndMetadata对象的迭代器,通过这个对象你可以获取到与元数据(目前仅指topic)相关联的消息
      * Input: a map of <topic, #streams>
      * Output: a map of <topic, list of message streams>
      */
      public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
      
      /**
      * 你可以获取KafkaStreams的集合, 对符合TopicFilter过滤后topic消息进行迭代(TopicFilter是用标准Java正则表达式封装的topic白名单或黑名单)
      */
      public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);
      
      /* 提交截止至目前所有的已消费的消息的偏移量 */
      public commitOffsets()
      
      /* 关闭连接 */
      public shutdown()
      }
      

      这个API围绕迭代器并由KafkaStream类实现。每个kafkastream表示从一个或多个服务器的一个或多个分区的信息流。每个流用于单线程处理,所以客户端可以在创建调用中提供所需的流数。因此,流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只会流向一个流。

      createMessageStreams方法调用已在某个topic注册的consumer,这将导致消费者/kafka节点分配的再平衡。API鼓励在一个调用中创建多个主题流,以最小化这种重新平衡。createMessageStreamsByFilter方法调用(额外的)注册的watcher去发现新的符合被过滤的topic。注意通过createMessageStreamsByFilter方法返回的每个流可能会迭代多个topic的消息(譬如,过滤器中允许多个topic)

      5.2 网络层

      Kafka网络层是一个相当简单的NIO服务器,这个将不会进行详细的阐述。sendfile的实现是由 MessageSet接口和 writeTo方法完成。这使得备份文件的信息集合,使用更有效的 transferTo实现而不是中间缓冲写。线程模型是一个单线程和用来处理每个固定连接数的N个处理器线程组成。这种设计已经在其他地方进行了充分的测试,并且被公认为是简单和快速的实现。该协议保持相当简洁的形式,以便将来更多其他类型语言的客户端实现。

      5.3 消息

      消息由固定大小的head、可变长度的不透明密钥键字节数组和可变长度的不透明值字节数组组成.消息头包含如下的一些字段:

      – CRC32 用以检测消息的截取和损坏

      – 格式版本

      – 鉴别器的一个属性

      – 时间戳

      使键和值保持不透明是一个正确的确定:现在序列化包有很大的进展,任何特定的选择都不适合所有的使用.更不用说,一个特定的应用程序使用卡夫卡可能会指定一个特定的序列化类型作为其使用的一部分。MessageSet接口仅仅只是一个迭代器,用于迭代方法产生的消息,这个方法对NiO通道进行批量读取和写入。

      5.4 消息格式

      /**
      * 1. 消息的4字节CRC32
      * 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1
      * 3. 一个字节的 identifier属性,允许消息的注释与版本无关
      * 位 0 ~ 2 : 压缩编解码
      * 0 : 无压缩
      * 1 : gzip
      * 2 : snappy
      * 3 : lz4
      * bit 3 : 时间戳类型
      * 0 : 创建时间
      * 1 : 日志追加时间
      * bit 4 ~ 7 : 保留位
      * 4. (可选的) 8字节时间戳只有当“magic”标识符大于0
      * 5. 4字节密钥长度,包含长度k
      * 6. K 字节的 key
      * 7. 4字节有效负载长度,含长度v
      * 8. V字节的有效负载
      */
      

      5.5 日志

      topic名字为”my_topic”的日志有两个分区,并且包含两个目录(也就是 my_topic_0和 my_topic_1),目录里面包含的是该topic的消息数据文件。日志文件的格式是“日志条目”序列;每个日志条目是一个4字节整数n存储的消息长度,其次是N消息字节.每个消息唯一标识是一个64位整数的偏移量,该偏移量由发送到该主题的该分区的所有消息流中的消息的开始的字节位置给出。每个消息的磁盘格式如下。每个日志文件以其包含的第一条消息的偏移量命名。所以第一个创建的文件将是00000000000.kafka,每个附加文件将有一个大致S字节整数的名字,S是指之前在配置中设置的日志文件最大值

      确切的二进制消息格式是版本化的并且是以标准的接口进行维护,因此,当进行容错时,消息集可以在生产者,kafka节点,以及客户端之间进行相互的任意转换,而不用复制和调节。改格式如下:

      磁盘上消息的格式
      
      offset : 8 bytes
      message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
      crc : 4 bytes
      magic value : 1 byte
      attributes : 1 byte
      timestamp : 8 bytes (Only exists when magic value is greater than zero)
      key length : 4 bytes
      key : K bytes
      value length : 4 bytes
      value : V bytes
      

      使用消息偏移作为消息ID是极其少见的。我们最初的想法是使用一个由生产者自带的GUID自增长实现,并且保存从GUID到每一个节点上偏移量的映射。但由于每个消费者必须持有每个server的ID,因此GUID的全局唯一性将无法提供。更重要的是持有随机id至分区偏移量

      的复杂度需要很重的索引结构,而且必须要与磁盘同步,这在本质上就需要一种完全的持久化的随机存取数据结构。因此,为了简化查找结构,我们决定使用一个简单的针对每个分区的原子计数器,它可以用分区ID和节点ID唯一标识消息;这使得查找结构更简单,尽管多每个消费者的多个请求仍然是可能的。然而,一旦我们解决了计数器的问题,跳转至直接使用偏移量似乎变得顺理成章了,毕竟对于任意一个分区而言,计数器是单调唯一增长的。由于偏移量的实现对于消费者是不可见的,因此最终可以采取一种更为高效的方式具体实现( 此处句子读得不太明白 )。

      《KAFKA官方文档》设计与实现

      写(Write)

      日志允许串行追加,通常是追加到最后一个文件,当达到配置的最大值时,该文件会在另一个新的文件上继续追加。日志文件有两个配置参数:m是操作系统将文件刷新到磁盘之前,可写入的消息的数目,s是指强制多少秒执行一次刷新操作。这给出了一个持久性保障,在系统崩溃时,至多丢失M条消息或S秒的数据。

      读(Read)

      消息的读取是依据消息在分区中的64位逻辑偏移量以及S字节的最大读取值完成。这个将返回在S字节缓冲区中的消息迭代器。S值的配置应比任何消息的长度要大,但当消息的长度异常地大时,消息的读取将被重试多次,每一次重试都会扩大缓冲区一倍,直到消息被成功读取。消息的最大值缓冲区的最大值均可设置,如此服务器便可以拒绝处理比它们要大的消息,并提供客户端上要读取完整消息所需的最大值的绑定。已分区消息为结尾去读取缓冲区是有可能的,大小的分割将会被很容易地检测到。

      从偏移量读取消息的实际过程需要首先定位存储数据的日志段文件,从全局偏移量计算文件特定的偏移量,然后从该文件偏移量读取。这是一个简单的对每个文件保持在内存范围内变化的二分查找

      该日志提供了获取最近写入消息的能力,允许客户端“实时”订阅。这在消费者未能消费指定天数消息的场景下尤为有用。在这种情况下,消费者试图去消费一个由OutOfRangeException引起的不存在的偏移量,要么采取重置自身的方式,要么消费失败并作为消费失败的案例(有些许问题)

      如下是发送给消费者的结果的格式

      MessageSetSend (fetch result)
      
      total length : 4 bytes
      error code : 2 bytes
      message 1 : x bytes
      ...
      message n : x bytes
      
      MultiMessageSetSend (multiFetch result)
      
      total length : 4 bytes
      error code : 2 bytes
      messageSetSend 1
      ...
      messageSetSend n
      

      删除(Delete)

      数据删除是一次删除一个日志段。日志管理器允许可插入的删除策略来选择哪些文件适合删除。当前的策略是删除任何超过 N天前修改时间的日志,虽然保留最后N GB的政策也可以是有用的。为了避免读取时锁定,而仍然允许修改段列表的删除,我们使用一个 copy-on-write样式分段列表实现,它提供一致的视图,允许以二分查找的方式读取日志段的不变的静态快照视图,且删除操作也同步进行。

      保证(Guarantees)

      kafka日志提供了一个可配置的参数 M ,该参数可以控制在消息被强制写入到磁盘前消息的最大数量。在kafka服务启动时,一个日志恢复线程会伴随着启动,它迭代所有最新分段日志的消息,并且校验所有消息项的有效性。如果消息的大小和偏移量的总和不超过该文件的长度并且消息有效负载的CRC32匹配消息存储有CRC信息,那么该消息项就是有效的。在检测到崩溃事件时,日志将被截断为最后有效的偏移量。

      请注意,两种类型的崩溃必须处理:由于崩溃导致的写块丢失损坏,以及无意义区块被追加到文件的损坏。这样做的原因是,在一般的操作系统不保证在文件的节点之间的和实际的数据块的写入顺序,因此除了失去已写入的数据,假如节点正在更新值,但在数据写入之前发生崩溃了,那么该文件获取到的都是无意义的数据。CRC就是检测这样的边缘案例,并防止它损坏日志(当然,未写入的消息会丢失)

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:http://ifeve.com/kafka-implementation/,作者:并发编程网,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇: Shell常用服务器日志分析命令总结

      下一篇:《KAFKA官方文档》使用场景

      相关文章

      2023-03-16 08:15:35

      《KAFKA官方文档》使用场景

      消息Kafka被当作传统消息中间件的替代品。消息中间件的使用原因有多种(从数据生产者解耦处理,缓存未处理的消息等)。与大多数消息系统

      2023-03-16 08:15:35
      Apache
      查看更多
      推荐标签

      作者介绍

      separate_out
      天翼云用户

      文章

      16

      阅读量

      5426

      查看更多

      最新文章

      《KAFKA官方文档》使用场景

      2023-03-16 08:15:35

      查看更多

      热门文章

      《KAFKA官方文档》使用场景

      2023-03-16 08:15:35

      查看更多

      热门标签

      服务器 linux 虚拟机 Linux 数据库 运维 网络 日志 数据恢复 java python 配置 nginx centos mysql
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      《KAFKA官方文档》使用场景

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号