爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      3、kafka重要概念介紹及示例

      首页 知识中心 云计算 文章详情页

      3、kafka重要概念介紹及示例

      2023-06-30 08:27:13 阅读次数:434

      java,kafka,分布式

      本文前提是kafka环境可用。
      本文分为五部分,即概念、幂等与事务、分区的leader和follower、消息可靠机制和限速机制。

      一、概念

      1、Kafka重要概念

      1)、broker

      3、kafka重要概念介紹及示例

      • 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
      • broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
      • 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
      2)、zookeeper

      3、kafka重要概念介紹及示例

       

      ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

      ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

      3)、 producer(生产者)

      生产者负责将数据推送给broker的topic

      4)、consumer(消费者)

      消费者负责从broker的topic中拉取数据,并自己进行处理

      5)、consumer group(消费者组)

      consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

      3、kafka重要概念介紹及示例

      • consumer group是kafka提供的可扩展且具有容错性的消费者机制
      • 一个消费者组可以包含多个消费者
      • 一个消费者组有一个唯一的ID(group Id)
      • 组内的消费者一起消费主题的所有分区数据
      6)、分区(Partitions)

      3、kafka重要概念介紹及示例

       

      在Kafka集群中,主题被分为多个分区

      7)、副本(Replicas)

      3、kafka重要概念介紹及示例

       

      副本可以确保某个服务器出现故障时,确保数据依然可用

      在Kafka中,一般都会设计副本的个数>1

      8)、主题(Topic)

      3、kafka重要概念介紹及示例

      • 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
      • Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
      • 在主题中的消息是有结构的,一般一个主题包含某一类消息
      • 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
      9)、偏移量(offset)

      3、kafka重要概念介紹及示例

      • offset记录着下一条将要发送给Consumer的消息的序号
      • 默认Kafka将offset存储在ZooKeeper中
      • 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
      • 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

      2、消费者组验证

      Kafka支持有多个消费者同时消费一个主题中的数据。

      1)、创建topic

      由于每个分区数据只能是一个消费者进行消费,故该topic至少需要2个分区才能验证消费者组。

      kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_consumerGroup --partitions 2 --replication-factor 1
      2)、生产者代码
      // 生产40条数据
      
      import java.util.Properties;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      
      
      public class KafkaProducerTest {
          // 只有主题分区超过一个以上的,才会由不同的消费者进行消费,一个分区的数据只能由组内的一个消费者进行消费,本示例的topic有2个分区
          public final static String TOPIC_NAME = "t_consumerGroup";
      
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "server1:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
              boolean flag = true;
              int count = 0;
              while (flag) {
                  count++;
      
                  for (int i = 0; i < 10; ++i) {
                      try {
                          // 获取返回值Future,该对象封装了返回值
                          Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, i + "", i + ""));
                          // 调用一个Future.get()方法等待响应
                          future.get();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (ExecutionException e) {
                          e.printStackTrace();
                      }
                  }
      
                  if (count > 3)
                      flag = false;
              }
              // 5. 关闭生产者
              producer.close();
          }
      
      }
      3)、消费者代码
      // 设置消费者组的属性
      props.setProperty("group.id", TOPIC_NAME);
      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      
      /**
       * @author alanchan
       *
       */
      public class KafkaConsumerTest {
          public final static String TOPIC_NAME = "t_consumerGroup";
      
          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
              // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
              // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
              props.setProperty("group.id", TOPIC_NAME);
              // 自动提交offset
              props.setProperty("enable.auto.commit", "true");
              // 自动提交offset的时间间隔
              props.setProperty("auto.commit.interval.ms", "1000");
              // 拉取的key、value数据的
              props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
              KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
              kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME));
              while (true) {
                  // Kafka的消费者一次拉取一批的数据
                  ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
                  // 5.将将记录(record)的offset、key、value都打印出来
                  for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                      // 主题
                      String topic = consumerRecord.topic();
                      // offset:这条消息处于Kafka分区中的哪个位置
                      long offset = consumerRecord.offset();
                      // key\value
                      String key = consumerRecord.key();
                      String value = consumerRecord.value();
                      int partition = consumerRecord.partition();
                      System.out.println("topic: " + topic + "  partition:" + partition + " offset:" + offset + " key:" + key + " value:" + value);
                  }
                  Thread.sleep(1000);
              }
          }
      
      }
      4)、验证

      1、启动2个消费者程序

      2、启动生产者程序

      3、查看日志和kafka数据存储情况

      3、kafka重要概念介紹及示例

       

      3、kafka重要概念介紹及示例

       

      消费者程序1运行输出

      topic: t_consumerGroup partition:0 offset:0 key:0 value:0 topic:
      t_consumerGroup partition:0 offset:1 key:2 value:2 topic:
      t_consumerGroup partition:0 offset:2 key:5 value:5 topic:
      t_consumerGroup partition:0 offset:3 key:6 value:6 topic:
      t_consumerGroup partition:0 offset:4 key:0 value:0 topic:
      t_consumerGroup partition:0 offset:5 key:2 value:2 topic:
      t_consumerGroup partition:0 offset:6 key:5 value:5 topic:
      t_consumerGroup partition:0 offset:7 key:6 value:6 topic:
      t_consumerGroup partition:0 offset:8 key:0 value:0 topic:
      t_consumerGroup partition:0 offset:9 key:2 value:2 topic:
      t_consumerGroup partition:0 offset:10 key:5 value:5 topic:
      t_consumerGroup partition:0 offset:11 key:6 value:6 topic:
      t_consumerGroup partition:0 offset:12 key:0 value:0 topic:
      t_consumerGroup partition:0 offset:13 key:2 value:2 topic:
      t_consumerGroup partition:0 offset:14 key:5 value:5 topic:
      t_consumerGroup partition:0 offset:15 key:6 value:6

      消费者程序2运行输出

      topic: t_consumerGroup partition:1 offset:0 key:1 value:1 topic:
      t_consumerGroup partition:1 offset:1 key:3 value:3 topic:
      t_consumerGroup partition:1 offset:2 key:4 value:4 topic:
      t_consumerGroup partition:1 offset:3 key:7 value:7 topic:
      t_consumerGroup partition:1 offset:4 key:8 value:8 topic:
      t_consumerGroup partition:1 offset:5 key:9 value:9 topic:
      t_consumerGroup partition:1 offset:6 key:1 value:1 topic:
      t_consumerGroup partition:1 offset:7 key:3 value:3 topic:
      t_consumerGroup partition:1 offset:8 key:4 value:4 topic:
      t_consumerGroup partition:1 offset:9 key:7 value:7 topic:
      t_consumerGroup partition:1 offset:10 key:8 value:8 topic:
      t_consumerGroup partition:1 offset:11 key:9 value:9 topic:
      t_consumerGroup partition:1 offset:12 key:1 value:1 topic:
      t_consumerGroup partition:1 offset:13 key:3 value:3 topic:
      t_consumerGroup partition:1 offset:14 key:4 value:4 topic:
      t_consumerGroup partition:1 offset:15 key:7 value:7 topic:
      t_consumerGroup partition:1 offset:16 key:8 value:8 topic:
      t_consumerGroup partition:1 offset:17 key:9 value:9 topic:
      t_consumerGroup partition:1 offset:18 key:1 value:1 topic:
      t_consumerGroup partition:1 offset:19 key:3 value:3 topic:
      t_consumerGroup partition:1 offset:20 key:4 value:4 topic:
      t_consumerGroup partition:1 offset:21 key:7 value:7 topic:
      t_consumerGroup partition:1 offset:22 key:8 value:8 topic:
      t_consumerGroup partition:1 offset:23 key:9 value:9

      经过以上数据核对,验证结果符合预期。

      5)、验证多于2个消费者情况

      1、启动3个消费者程序

      2、启动生产者程序

      3、查看日志和kafka数据存储情况

      3、kafka重要概念介紹及示例

       

      3、kafka重要概念介紹及示例

       

      消费者1程序运行结果

      3、kafka重要概念介紹及示例

       

      消费者2程序运行结果

      3、kafka重要概念介紹及示例

       

      消费者3程序运行结果

      3、kafka重要概念介紹及示例

       

      经过以上数据核对,验证结果可以得出结论是每个数据分区只会分配给一个消费者,即便有多个消费者。

      二、Kafka生产者幂等性与事务

      1、Kafka生产者幂等性

      3、kafka重要概念介紹及示例

      1)、配置幂等性
      props.put("enable.idempotence",true);
      2)、幂等性原理

      为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和Sequence Number的概念。

      • PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
      • Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。

      2、Kafka事务

      1)、简介

      Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。

      3、kafka重要概念介紹及示例

      2)、事务操作API

      Producer接口中定义了以下5个事务相关方法:

      • initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
      • beginTransaction(开始事务):启动一个Kafka事务
      • sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
      • commitTransaction(提交事务):提交事务
      • abortTransaction(放弃事务):取消事务
      //producer提供的事务方法
      /**
      * 初始化事务。需要注意的有:
      * 1、前提
      *   需要保证transation.id属性被配置。
      * 2、这个方法执行逻辑是:
      *   (1)Ensures any transactions initiated by previous instances of the producer with the same
       transactional.id are completed. If the previous instance had failed with a transaction in
      *      progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
      *    (2)Gets the internal producer id and epoch, used in all future transactional messages issued by the producer.
      
      */
          public void initTransactions();
       
      /**
       * 开启事务
       */
          public void beginTransaction() throws ProducerFencedException ;
       
      /**
       * 为消费者提供的在事务内提交偏移量的操作
       */
          public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException ;
       
      /**
       * 提交事务
       */
          public void commitTransaction() throws ProducerFencedException;
       
      /**
       * 放弃事务,类似回滚事务的操作
       */
          public void abortTransaction() throws ProducerFencedException ;

      3)、Kafka事务编程

      在一个原子操作中,根据包含的操作类型,可以分为三种情况:

      • 只有Producer生产消息;
      • 消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式
      • 只有consumer消费消息

      前两种情况是事务引入的场景,最后一种情况没有使用价值(跟使用手动提交效果一样)。

      4)、属性配置说明

      使用kafka的事务api时的一些注意事项

      • 需要消费者的自动模式设置为false,并且不能再手动的执行consumer#commitSync或者consumer#commitAsyc
      • 生产者配置transaction.id属性
      • 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
      •  
      • 消费者需要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。
      • 3、kafka重要概念介紹及示例

      •  
      • 3、kafka重要概念介紹及示例

      •  

      5)、事务相关属性配置

      1、生产者
      // 配置事务的id,开启了事务会默认开启幂等性
      props.put("transactional.id", "first-transactional");
      2、消费者
      // 1. 消费者需要设置隔离级别
      props.put("isolation.level","read_committed");
      //  2. 关闭自动提交
       props.put("enable.auto.commit", "false");

      3、Kafka事务编程

      1)、需求

      在Kafka的topic ods_user 中有一些用户数据,数据格式如下:

      姓名,性别,出生日期
      张三,1,1980-10-09
      李四,0,1985-11-01
      将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic dwd_user 中。
      要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

      2)、启动生产者控制台程序模拟数据

      # 创建名为ods_user和dwd_user的主题
      kafka-topics.sh --create --bootstrap-server server1:9092 --topic ods_user --partitions 1 --replication-factor 1
      kafka-topics.sh --create --bootstrap-server server1:9092 --topic dwd_user --partitions 1 --replication-factor 1
      # 生产数据到 ods_user
      kafka-console-producer.sh --broker-list server1:9092 --topic ods_user
      # 从dwd_user消费数据
      kafka-console-consumer.sh --bootstrap-server server1:9092 --topic dwd_user --from-beginning  --isolation-level  read_committed

      3)、编写创建消费者代码

      编写一个方法 createConsumer,该方法中返回一个消费者,订阅 ods_user 主题。
      需要配置事务隔离级别、关闭自动提交。
      实现步骤:
      1、创建Kafka消费者配置
      2、创建消费者,并订阅 ods_user 主题
      3、代码

      // 一、创建一个消费者来消费ods_user中的数据
      public Consumer buildConsumer() {
          Properties props = new Properties();
          // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
          props.put("bootstrap.servers", KAFKA_SERVERS);
          // 消费者群组
          props.put("group.id", TOPIC_ODS_USER);
          // 设置隔离级别
          // 1、关闭自动提交 enable.auto.commit
          // 2、isolation.level为read_committed
          props.put("isolation.level", "read_committed");
          // 关闭自动提交
          // 在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
          props.put("enable.auto.commit", "false");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
          consumer.subscribe(Arrays.asList(TOPIC_ODS_USER));
          return consumer;
      }

      4)、编写创建生产者代码

      编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
      1、创建生产者配置
      2、创建生产者对象
      3、代码

      // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
          // 1、设置transactional.id
          // 2、设置enable.idempotence
          private Producer buildProducer() {
              // create instance for properties to access producer configs
              Properties props = new Properties();
              // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
              props.put("bootstrap.servers", KAFKA_SERVERS);
              // 设置事务id
              props.put("transactional.id", TOPIC_DWD_USER);
              // 设置幂等性
              // 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
              props.put("enable.idempotence", true);
              // Set acknowledgements for producer requests.
              props.put("acks", "all");
              // If the request fails, the producer can automatically retry,
              props.put("retries", 1);
              // Specify buffer size in
              // config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
              // props.put("batch.size", 16384);
              // Reduce the no of requests less than 0
              props.put("linger.ms", 1);
              // The buffer.memory controls the total amount of memory available to the
              // producer for buffering.
              props.put("buffer.memory", 33554432);
              // Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              Producer<String, String> producer = new KafkaProducer<String, String>(props);
              return producer;
          }

      5)、编写代码消费并生产数据

      实现步骤:
      1、调用之前实现的方法,创建消费者、生产者对象
      2、生产者调用initTransactions初始化事务
      3、编写一个while死循环,在while循环中不断拉取数据,将 TOPIC_ODS_USER 队列中user信息中的0/1转换成女/男,并写入 TOPIC_DWD_USER 队列
      (1) 生产者开启事务
      (2) 消费者拉取消息
      (3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
      (4) 生产消息到dwd_user topic中
      (5) 提交偏移量到事务中
      (6) 提交事务
      (7) 捕获异常,如果出现异常,则取消事务

      6)、完整代码

      import java.time.Duration;
      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Properties;
      import java.util.concurrent.Future;
      
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.TopicPartition;
      
      /**
       * 功能: 
       1、消费 ods_user 队列中的数据 
       2、在消费ods_user 队列中的数据同时,完成转换,同时将数据写入 dwd_user 队列中 
       验证:
       1、通过客户端向ods_user中写入业务数据 
       2、查看dwd_user中的数据是否按照要求进行转换
       * 
       * @author alanchan
       *
       */
      public class ConsumeTransferProduceTransactionTest {
          private final static String TOPIC_ODS_USER = "ods_user";
          private final static String TOPIC_DWD_USER = "dwd_user";
          private final static String KAFKA_SERVERS = "server1:9092";
      
          // 一、创建一个消费者来消费ods_user中的数据
          public Consumer buildConsumer() {
              Properties props = new Properties();
              // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
              props.put("bootstrap.servers", KAFKA_SERVERS);
              // 消费者群组
              props.put("group.id", TOPIC_ODS_USER);
              // 设置隔离级别
              // 1、关闭自动提交 enable.auto.commit
              // 2、isolation.level为read_committed
              props.put("isolation.level", "read_committed");
              // 关闭自动提交
              // 在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
              props.put("enable.auto.commit", "false");
              props.put("session.timeout.ms", "30000");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
              consumer.subscribe(Arrays.asList(TOPIC_ODS_USER));
              return consumer;
          }
      
          // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
          // 1、设置transactional.id
          // 2、设置enable.idempotence
          private Producer buildProducer() {
              // create instance for properties to access producer configs
              Properties props = new Properties();
              // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
              props.put("bootstrap.servers", KAFKA_SERVERS);
              // 设置事务id
              props.put("transactional.id", TOPIC_DWD_USER);
              // 设置幂等性
              // 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
              props.put("enable.idempotence", true);
              // Set acknowledgements for producer requests.
              props.put("acks", "all");
              // If the request fails, the producer can automatically retry,
              props.put("retries", 1);
              // Specify buffer size in
              // config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
              // props.put("batch.size", 16384);
              // Reduce the no of requests less than 0
              props.put("linger.ms", 1);
              // The buffer.memory controls the total amount of memory available to the
              // producer for buffering.
              props.put("buffer.memory", 33554432);
              // Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              Producer<String, String> producer = new KafkaProducer<String, String>(props);
              return producer;
          }
      
          // 三、将 TOPIC_ODS_USER 队列中user信息中的0/1转换成女/男,并写入 TOPIC_DWD_USER 队列
          //在一个事务内,即有生产消息又有消费消息,即常说的Consume-tansform-produce模式
          public void consumeTransferProduce() {
              // 1.构建生产者
              Producer producer = buildProducer();
              // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
              producer.initTransactions();
              // 3.构建消费者和订阅主题
              Consumer consumer = buildConsumer();
      
              while (true) {
                  // 4.开启事务
                  producer.beginTransaction();
                  // 5.1 接受消息
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                  try {
                      // 5.2 do业务逻辑;
                      Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
      
                      for (ConsumerRecord<String, String> record : records) {
                          // 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
                          System.out.printf("接收消息信息:offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                          String msg = trans(record.value());
                          System.out.println("转换后的消息信息:" + msg);
      
                          // 5.2.2 记录提交的偏移量
                          // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset
                          // 必须要+1,解决服务重启后,消息消费不重复,即消费下一条消息
                          commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
      
                          // 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
                          producer.send(new ProducerRecord<String, String>(TOPIC_DWD_USER, msg));
                          
                      }
      
                      // 7.提交偏移量
                      producer.sendOffsetsToTransaction(commits, TOPIC_ODS_USER);
      //                int i = 1 / 0;
                      // 8.事务提交
                      producer.commitTransaction();
      
                  } catch (Exception e) {
                      e.printStackTrace();
                      // 7.放弃事务
                      producer.abortTransaction();
                  }
              }
          }
      
          public static void main(String[] args) {
              ConsumeTransferProduceTransactionTest ctp = new ConsumeTransferProduceTransactionTest();
              ctp.consumeTransferProduce();
          }
      
          private String trans(String message) {
              // 将1转换为男,0转换为女
              String[] fieldArray = message.split(",");
      
              // 将字段进行替换
              if (fieldArray != null && fieldArray.length > 2) {
                  String sexField = fieldArray[1];
                  if (sexField.equals("1")) {
                      fieldArray[1] = "男";
                  } else if (sexField.equals("0")) {
                      fieldArray[1] = "女";
                  }
              }
      
              // 重新拼接字段
              return fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];
          }
      }

      7)、测试功能正常与否

      测试步骤
      1)、创建ods_user、dw_user topic
      2)、启动ConsumeTransferProduceTransactionTest.java
      3)、在kafka客户端创建ods_user消息命令行,并生产消息

      [alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic ods_user
      >张三,1,1980-10-09
      >

      4)、在kafka客户端消费dwd_user消息命令,并查看其转换结果

      [alanchan@server3 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic dwd_user --from-beginning  --isolation-level  read_committed
      张三,男,1980-10-09

      5)、同时查看ConsumeTransferProduceTransactionTest.java运行结果

      接收消息信息:offset = 4, key = null, value = 李四,0,1985-11-01

      转换后的消息信息:张三,男,1980-10-09

      8)、模拟异常测试事务

      测试步骤与上面的基本上一致,只是需要在提交事务前增加个认为异常,进行验证

       

       

      // 7.提交偏移量 producer.sendOffsetsToTransaction(commits, TOPIC_ODS_USER); // int i = 1 / 0; // 8.事务提交 producer.commitTransaction();

       

       

      通过上述验证可以发现,如果该程序运行出现异常,ods_user消息队列的数据不会消费,也不会写入dw_user队列中,一旦该程序运行正常,则ods_user的数据可以正常消费、且会写入dw_user队列中。需要注意的是程序运行异常期间,ods_user队列中未消费数据,在程序运行正常后也会继续消费掉。

      三、分区的leader与follower

      1、Leader和Follower

      在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。

      • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
      • 如果leader出现故障,其他follower会被重新选举为leader
      •  
      • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中
      • 3、kafka重要概念介紹及示例

      •  

      2、查看某个partition的leader

      使用Kafka-eagle查看某个topic的partition的leader在哪个服务器中。为了方便观察,我们创建一个名为test_query_partition的3个分区、2个副本的topic。

      3、kafka重要概念介紹及示例

      3、AR、ISR、OSR

      Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR。
      分区的所有副本称为AR(Assigned Replicas——已分配的副本)
      所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas——在同步中的副本)
      由于follower副本同步滞后过多的副本(不包括 leader 副本)组成 OSR(Out-of-Sync Replias)
      AR = ISR + OSR
      正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。

      1、查看分区的ISR

      使用Kafka Eagle查看某个Topic的partition的ISR有哪几个节点。

      3、kafka重要概念介紹及示例

       

      2、尝试关闭id为0的broker(杀掉该broker的进程),参看topic的ISR情况。

      3、kafka重要概念介紹及示例

      4、Leader选举

      leader对于消息的写入以及读取是非常关键的,此时有两个疑问:
      1、Kafka如何确定哪个partition是leader、哪个partition是follower呢?
      2、某个leader崩溃了,如何快速确定另外一个leader呢?因为Kafka的吞吐量很高、延迟很低,所以选举leader必须非常快

      1)、如果leader崩溃,Kafka会如何?

      使用Kafka Eagle找到某个partition的leader,再找到leader所在的broker。在Linux中强制杀掉该Kafka的进程,然后观察leader的情况。

      3、kafka重要概念介紹及示例

       

      通过观察,我们发现,leader在崩溃后,Kafka又从其他的follower中快速选举出来了leader。

      2)、Controller介绍

      Kafka启动时,会在所有的broker中选择一个controller

      • 前面leader和follower是针对partition,而controller是针对broker的
      • 创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的
      • Kafka分区leader的选举,也是由controller决定的
      1、Controller的选举

      在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)
      但只有一个竞争成功,其他的broker会注册该节点的监视器
      一点该临时节点状态发生变化,就可以进行相应的处理
      Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller

      2、找到当前Kafka集群的controller

      点击Kafka Tools的Tools菜单,找到ZooKeeper Brower… 点击左侧树形结构的controller节点,就可以查看到哪个broker是controller了。

      3、kafka重要概念介紹及示例

      3、测试controller选举

      通过kafka tools找到controller所在的broker对应的kafka进程,杀掉该进程,重新打开ZooKeeper brower,观察kafka是否能够选举出来新的Controller。

      3、kafka重要概念介紹及示例

      4、Controller选举partition leader
      • 所有Partition的leader选举都由controller决定
      • controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker
      • controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader否则,则任意选这个一个Replica作为leader
      • 如果该partition的所有Replica都已经宕机,则新的leader为-1

      为什么不能通过ZK的方式来选举partition的leader?
      Kafka集群如果业务很多的情况下,会有很多的partition
      假设某个broker宕机,就会出现很多的partiton都需要重新选举leader
      如果使用zookeeper选举leader,会给zookeeper带来巨大的压力。所以,kafka中leader的选举不能使用ZK来实现

      5、leader负载均衡

      1)、Preferred Replica

      Kafka中引入了一个叫做preferred-replica的概念,意思就是:优先的Replica
      在ISR列表中,第一个replica就是preferred-replica
      第一个分区存放的broker,就是preferred-replica
      执行以下脚本可以将preferred-replica设置为leader,均匀分配每个分区的leader。

      kafka-leader-election.sh
      --bootstrap-server server1:9092 
      --topic 主题 
      --partition=1
      --election-type preferred

      2)、确保leader在broker中负载均衡

      杀掉test主题的某个broker,这样kafka会重新分配leader。等到Kafka重新分配leader之后,再次启动kafka进程。观察test主题各个分区leader的分配情况。

      3、kafka重要概念介紹及示例

       

      此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:

      kafka-leader-election.sh
      --bootstrap-server server1:9092 
      --topic test 
      --partition=2
      --election-type preferred
      
      # --partition:指定需要重新分配leader的partition编号

      3、kafka重要概念介紹及示例

      四、消息不丢失机制

      1、broker数据不丢失

      生产者通过分区的leader写入数据后,所有在ISR中follower都会从leader中复制数据,这样可以确保即使leader崩溃了,其他的follower的数据仍然是可用的

      2、生产者数据不丢失

      1、生产者连接leader写入数据时,可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置

      • 配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接收到数据)
      • 配置ACK响应要求为 1 时 —— 表示leader收到数据
      • 配置ACK影响要求为 0 时 —— 生产者只负责发送数据,不关心数据是否丢失(这种情况可能会产生数据丢失,但性能是最好的)

      2、生产者可以采用同步和异步两种方式发送数据

      • 同步:发送一批数据给kafka后,等待kafka返回结果
      • 异步:发送一批数据给kafka,只是提供一个回调函数。
        如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。

      3、消费者数据不丢失
      在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。

      五、Kafka配额限速机制(Quotas)

      生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。通过配额(Quotas)设置可以避免这些问题。
      Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务过载。

      1、限制producer端速率

      为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,
      命令如下:

      kafka-configs.sh
      --bootstrap-server server1:9092
      --alter 
      --add-config 'producer_byte_rate=1048576' 
      --entity-type clients 
      --entity-default

      运行基准测试,观察生产消息的速率

      kafka-producer-perf-test.sh 
      --topic test
      --num-records 500000 
      --throughput -1 
      --record-size 1000 
      --producer-props
      bootstrap.servers=server1:9092,server2:9092,server3:9092
      acks=1

      结果:50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

      2、限制consumer端速率

      对consumer限速与producer类似,只不过参数名不一样。
      为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。
      命令如下:

      kafka-configs.sh
      --bootstrap-server server1:9092 
      --alter 
      --add-config 'consumer_byte_rate=1048576' 
      --entity-type clients 
      --entity-default

      运行基准测试

      kafka-consumer-perf-test.sh 
      --broker-list server1:9092,server2:9092,server3:9092
      --topic test
      --fetch-size 1048576 
      --messages 500000

      结果为:MB.sec:1.0743

      3、取消Kafka的Quota配置

      使用以下命令,删除Kafka的Quota配置

      kafka-configs.sh 
      --bootstrap-server server1:9092  
      --alter 
      --delete-config 'producer_byte_rate' 
      --entity-type  clients 
      --entity-default
      
      kafka-configs.sh 
      --bootstrap-server server1:9092  
      --alter 
      --delete-config 'consumer_byte_rate' 
      --entity-type  clients 
      --entity-default

      以上,完成了kafka相关重要概念的介绍、示例等内容。

       

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.51cto.com/alanchan2win/6280296,作者:一瓢一瓢的饮,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:云原生入门之K8S工作节点部署

      下一篇:NiFi FileFlow示例和NIFI模板示例

      相关文章

      2025-05-14 10:02:58

      java项目多端数据同步解决方案

      多端数据同步是指在多个设备(例如桌面应用、移动应用、Web应用)之间保持数据的一致性。

      2025-05-14 10:02:58
      java , Spring , WebSocket , 同步 , 数据 , 版本号
      2025-05-14 10:02:58

      java休眠到指定时间怎么写

      java休眠到指定时间怎么写

      2025-05-14 10:02:58
      java , sleep , Thread , util , 方法
      2025-05-13 09:49:12

      Java学习(动态代理的思想详细分析与案例准备)(1)

      Java学习(动态代理的思想详细分析与案例准备)(1)

      2025-05-13 09:49:12
      java , 代理 , 代码 , 对象 , 接口 , 方法 , 需要
      2025-05-09 08:20:32

      基于IDEA的Maven简单工程创建及结构分析

      通过一个 mvn 命令直接让我们创建一个 Maven 的脚手架。

      2025-05-09 08:20:32
      java , Maven , xml , 创建 , 文件 , 文件夹 , 项目
      2025-05-08 09:03:57

      前K个高频元素java

      给定一个非空的整数数组,返回其中出现频率前 前K个高频元素java 高的元素。

      2025-05-08 09:03:57
      java , 元素 , 样例 , 给定
      2025-05-08 09:03:21

      基于java Swing开发的学生成绩管理系统【项目源码+数据库脚本】

      基于java Swing开发的学生成绩管理系统【项目源码+数据库脚本】

      2025-05-08 09:03:21
      java , Swing , 学生 , 源码
      2025-05-08 09:03:21

      java Swing学生成绩管理系统【项目源码+数据库脚本】

      本项目是一套基于java Swing开发的学生成绩管理系统,主要针对计算机相关专业的正在做bishe的学生和需要项目实战练习的Java学习者。

      2025-05-08 09:03:21
      java , 学生 , 成绩 , 数据库 , 源码
      2025-05-07 09:08:08

      java Swing学生选课管理系统【源码+数据库+报告】

      本项目是一套基于java Swing学生选课管理系统,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。

      2025-05-07 09:08:08
      java , 学生 , 截图 , 源码
      2025-05-07 09:08:08

      java swing人机对战五子棋(含背景音乐)

      本项目是一套基于java swing的人机对战五子棋系统,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。

      2025-05-07 09:08:08
      java , 源码
      2025-05-06 09:18:49

      【Linux 从基础到进阶】Ceph分布式存储系统搭建

      随着数据量的爆炸式增长,传统的存储解决方案逐渐暴露出扩展性差、成本高、管理复杂等问题。Ceph是一种高性能、可扩展的开源分布式存储系统,能够为对象存储、块存储和文件系统提供统一的存储平台。

      2025-05-06 09:18:49
      分布式 , 存储 , 高可用性
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5230994

      查看更多

      最新文章

      【Linux 从基础到进阶】Ceph分布式存储系统搭建

      2025-05-06 09:18:49

      LDAP基础理论

      2025-04-18 07:10:53

      分布式事务大揭秘:使用MQ实现最终一致性

      2025-04-15 09:19:55

      Java实战之亲戚关系计算器(swing版)(3)——界面设计

      2025-04-09 09:15:47

      Pow(x, n)。实现 pow(x, n) ,即计算 x 的 n 次幂函数(即,x**n)。

      2025-04-01 10:29:12

      分布式存储技术

      2025-03-28 07:42:50

      查看更多

      热门文章

      Docker部署sentinel Mac Docker 部署 sentinel

      2023-05-05 10:12:49

      java159-两个线程共同完成1到100计算

      2023-03-13 09:29:37

      【Java Web】 Tomcat 的 使用、部署

      2023-05-04 09:17:10

      Android移动设备远程接入ZooKeeper分布式集群

      2023-04-18 14:14:56

      用zookeeper实现分布式session

      2023-02-16 08:59:22

      java学习第二天笔记-java基础概念06-计算公交车司机的人数24

      2023-03-16 06:47:13

      查看更多

      热门标签

      系统 测试 用户 分布式 Java java 计算机 docker 代码 数据 服务器 数据库 源码 管理 python
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      用zookeeper实现分布式session

      SpringCloud子项目简介

      ts重点学习-分布式条件类型笔记

      设计与优化淘客返利系统中的分布式缓存架构

      分布式架构下,Session共享有什么方案--------->分布式事务解决方案

      使用springSession完成分布式session

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号