searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Hudi Clustering 功能介绍

2023-07-24 05:44:58
7
0

一、功能实现

 

1、整体功能介绍

不会影响现有的数据读写流程,在数据通过INSERT\BULK等方式写入后整理写入的文件,可以即时、也可以制定定时策略进行数据整理,将小文件合并成较大文件,同时会根据指定的列进行数据排序后再存储,这样让数据更加聚合,增加读取性能。

 

2、COW Table实现流程

(1)基于TimeLine在t5时刻,表中的分区具有5个文件组f0、f1、f2、f3、f4。假设每个文件组都是100MB。所以分区中的总数据是500MB。

(2)在t6时刻请求聚类操作。与compaction的流程,使用“ClusteringPlan”在元数据中创建一个“t6.clustering.requested”文件,该文件包括所有分区中集群操作所涉及的所有文件组。

示例内容:{ partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”]  }

(3)假设集群后的最大文件大小配置为250MB。集群将把分区中的所有数据重新分配到两个文件组中:c1,c2。这些文件组暂时是“幻影”,在t8集群完成之前查询不可见。另外,一个文件组中的记录可以拆分为多个文件组。在本例中,f4文件组中的一些记录同时进入两个新的文件组c1、c2。

(4)当集群正在进行时(t6到t8),任何涉及相关文件组的写入操作都是被拒绝的。

(5)在写入新的数据文件c1-t6.parquet和c2-t6.parque之后,如果配置了全局索引,我们将在记录级别索引中为具有新位置的所有键添加条目。新的索引项对于其他写入将不可见,因为还没有关联的提交。

(6)最后,我们创建一个提交元数据文件“t6.commit”,其中包括通过该提交修改的文件组(f0、f1、f2、f3、f4)。

(7)文件组(f0到f4)不会立即从磁盘中删除。Cleaner会在存档t6.commit之前清理这些文件及更新了所有视图,以忽略所有提交元数据文件中提到的所有文件组。所以不会看到重复的数据。

 

2、MOR Table实现流程

整个流程和COW table类似,对于MOR table, 数据写入会影响了parquet数据文件和日志文件. Clustering影响的是parquet数据文件。

 

二、制定Clustering计划

1、确定符合Clustering条件的文件

分区过滤出指定的分区

根据targetFileSize 排除不需要Clustering的文件组

排除已经计划在clustering或者compact的文件组

排除包含日志文件的文件组

 

2、对文件进行重新分组

根据特定条件对符合群集条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数根据记录关键字范围对文件进行分组。键值可以被存储在parquet文件的footer中,利于一些特定的查询与更新。

分组都是基于统一的commit time

排序基于的规则可以指定key的线性排序,或者是选择多维度更好的算法如z-order与hilbert

 

3、保存计划

clustering 计划被保存在TimeLine上,元数据结构

hudi/hudi-common/src/main/avro/HoodieClusteringPlan.avsc at master · apache/hudi · GitHub

 

三、执行计划案例

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

 

val tableName = "hudi_trips_cow"

val basePath = "/tmp/hudi_trips_cow"

 

val dataGen = new DataGenerator(Array("2020/03/11"))

val updates = convertToStringList(dataGen.generateInserts(10))

val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));

df.write.format("org.apache.hudi").

        options(getQuickstartWriteConfigs).

          option(PRECOMBINE_FIELD_OPT_KEY, "ts").

          option(RECORDKEY_FIELD_OPT_KEY, "uuid").

          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

         option(TABLE_NAME, tableName).

          option("hoodie.parquet.small.file.limit", "0").

          option("hoodie.clustering.inline", "true").

         option("hoodie.clustering.inline.max.commits", "4").

          option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").

          option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").

          option("hoodie.clustering.plan.strategy.sort.columns", ""). //optional, if sorting is needed as part of rewriting data

          mode(Append).

           save(basePath);

 

0条评论
0 / 1000
x****m
2文章数
0粉丝数
x****m
2 文章 | 0 粉丝