Flink CDC简介
Flink CDC 是一种实时数据集成框架,底层引擎主要使用Flink,基于数据库日志 CDC(Change Data Capture)技术,Flink CDC 可以高效实现海量数据的实时集成。用户可以通过编写简单的yaml文件提交任务,其本身也支持自定义实现各种 source 和 sink来满足不同的业务需求。相比于其他的同步工具,flinkcdc具有完整的数据同步,它可以全量读取结束自动同步增量数据,并且上游表结构变更自动应用到下游。其部署模式也非常多样,它可以以standalone的模式单独起集群,也可以与主流的yarn进行结合。接下来将展示如何基于 Flink CDC 快速构建 MySQL 到 Doris 的 Streaming ELT 作业。
Mysql 到 Doris的实时数据同步
环境准备
版本:Flink:1.16.3 , Flink-cdc:3.0.0,Hadoop:3.3.3
运行方式:Flink on Yarn(yarn-session方式)
步骤
1. 在flink所安装的服务器上启动yarn-session
cd $FLINK_HOME/bin
./yarn-session.sh -d
2. 在mysql数据库中创建数据库以及需要同步的表
CREATE DATACBASE test;
CREATE TABLE `test`.`test_db` (
`id` int(10) NOT NULL,
`Column1` varchar(100) DEFAULT NULL,
`Column2` varchar(100) DEFAULT NULL,
`Column3` float DEFAULT NULL,
`Column4` double DEFAULT NULL,
`Column5` text,
`Column6` timestamp NULL DEFAULT NULL,
`Column7` datetime DEFAULT NULL,
`Column8` bigint(20) DEFAULT NULL,
`Column9` json DEFAULT NULL,
`Column10` blob,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
使用flinkcdc进行mysql to doris数据同步时,会自动建对应的 sink表,无需提前建表,只需要在doris中建好同名库
3. 编写pipeline文件
cd $FLINKCDC_HOME/conf
vim mysql_to_doris.yaml
source:
type: mysql
hostname: 192.168.***.***
port: 50001
username: root
password: ******
server-id: 5400-5404
tables: test.test_db
sink:
type: doris
fenodes: 192.168.***.***:9033
username: root
password: ******
table.create.properties.replication_num: 1
pipeline:
name: mysql-sync-doris
parallelism: 1
4. 通过pipeline方式提交mysql cdc to doris的任务
切换到flinkcdc的目录:
cd $FLINKCDC_HOME
执行提交pipeline的脚本:
./bin/flink-cdc.sh conf/mysql_to_doris.yaml
5. 查看任务运行状态
打开yarn的web UI,可以看到任务的运行状态
打开flink的web UI,可以查看任务运行的每个步骤
6. 数据同步结果
存量数据同步结果同步
mysql source表数据
doris sink表数据
增量数据同步结果
对source端mysql数据源进行增量插入
doris sink表数据