searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

基于FlinkCDC的实时数据同步

2024-05-31 05:34:40
41
0

Flink CDC简介

Flink CDC 是一种实时数据集成框架,底层引擎主要使用Flink,基于数据库日志 CDC(Change Data Capture)技术,Flink CDC 可以高效实现海量数据的实时集成。用户可以通过编写简单的yaml文件提交任务,其本身也支持自定义实现各种 source 和 sink来满足不同的业务需求。相比于其他的同步工具,flinkcdc具有完整的数据同步,它可以全量读取结束自动同步增量数据,并且上游表结构变更自动应用到下游。其部署模式也非常多样,它可以以standalone的模式单独起集群,也可以与主流的yarn进行结合。接下来将展示如何基于 Flink CDC 快速构建 MySQL 到 Doris 的 Streaming ELT 作业。

Mysql 到 Doris的实时数据同步

环境准备

版本:Flink:1.16.3 , Flink-cdc:3.0.0,Hadoop:3.3.3

运行方式:Flink on Yarn(yarn-session方式)

步骤

1. 在flink所安装的服务器上启动yarn-session

cd $FLINK_HOME/bin
./yarn-session.sh -d

2. 在mysql数据库中创建数据库以及需要同步的表

CREATE DATACBASE test;
CREATE TABLE `test`.`test_db` (
  `id` int(10) NOT NULL,
  `Column1` varchar(100) DEFAULT NULL,
  `Column2` varchar(100) DEFAULT NULL,
  `Column3` float DEFAULT NULL,
  `Column4` double DEFAULT NULL,
  `Column5` text,
  `Column6` timestamp NULL DEFAULT NULL,
  `Column7` datetime DEFAULT NULL,
  `Column8` bigint(20) DEFAULT NULL,
  `Column9` json DEFAULT NULL,
  `Column10` blob,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 使用flinkcdc进行mysql to doris数据同步时,会自动建对应的 sink表,无需提前建表,只需要在doris中建好同名库

3. 编写pipeline文件

cd $FLINKCDC_HOME/conf
vim mysql_to_doris.yaml
source:
  type: mysql
  hostname: 192.168.***.***
  port: 50001
  username: root
  password: ******
  server-id: 5400-5404
  tables: test.test_db
sink:
  type: doris
  fenodes: 192.168.***.***:9033
  username: root
  password: ******
  table.create.properties.replication_num: 1
  
pipeline:
  name: mysql-sync-doris
  parallelism: 1

4.  通过pipeline方式提交mysql cdc to doris的任务

切换到flinkcdc的目录:

cd $FLINKCDC_HOME

执行提交pipeline的脚本:

./bin/flink-cdc.sh conf/mysql_to_doris.yaml

5. 查看任务运行状态

打开yarn的web UI,可以看到任务的运行状态

打开flink的web UI,可以查看任务运行的每个步骤

6. 数据同步结果

存量数据同步结果同步

mysql source表数据

doris sink表数据

增量数据同步结果

对source端mysql数据源进行增量插入

doris sink表数据

0条评论
0 / 1000
tongrj
2文章数
0粉丝数
tongrj
2 文章 | 0 粉丝
tongrj
2文章数
0粉丝数
tongrj
2 文章 | 0 粉丝
原创

基于FlinkCDC的实时数据同步

2024-05-31 05:34:40
41
0

Flink CDC简介

Flink CDC 是一种实时数据集成框架,底层引擎主要使用Flink,基于数据库日志 CDC(Change Data Capture)技术,Flink CDC 可以高效实现海量数据的实时集成。用户可以通过编写简单的yaml文件提交任务,其本身也支持自定义实现各种 source 和 sink来满足不同的业务需求。相比于其他的同步工具,flinkcdc具有完整的数据同步,它可以全量读取结束自动同步增量数据,并且上游表结构变更自动应用到下游。其部署模式也非常多样,它可以以standalone的模式单独起集群,也可以与主流的yarn进行结合。接下来将展示如何基于 Flink CDC 快速构建 MySQL 到 Doris 的 Streaming ELT 作业。

Mysql 到 Doris的实时数据同步

环境准备

版本:Flink:1.16.3 , Flink-cdc:3.0.0,Hadoop:3.3.3

运行方式:Flink on Yarn(yarn-session方式)

步骤

1. 在flink所安装的服务器上启动yarn-session

cd $FLINK_HOME/bin
./yarn-session.sh -d

2. 在mysql数据库中创建数据库以及需要同步的表

CREATE DATACBASE test;
CREATE TABLE `test`.`test_db` (
  `id` int(10) NOT NULL,
  `Column1` varchar(100) DEFAULT NULL,
  `Column2` varchar(100) DEFAULT NULL,
  `Column3` float DEFAULT NULL,
  `Column4` double DEFAULT NULL,
  `Column5` text,
  `Column6` timestamp NULL DEFAULT NULL,
  `Column7` datetime DEFAULT NULL,
  `Column8` bigint(20) DEFAULT NULL,
  `Column9` json DEFAULT NULL,
  `Column10` blob,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 使用flinkcdc进行mysql to doris数据同步时,会自动建对应的 sink表,无需提前建表,只需要在doris中建好同名库

3. 编写pipeline文件

cd $FLINKCDC_HOME/conf
vim mysql_to_doris.yaml
source:
  type: mysql
  hostname: 192.168.***.***
  port: 50001
  username: root
  password: ******
  server-id: 5400-5404
  tables: test.test_db
sink:
  type: doris
  fenodes: 192.168.***.***:9033
  username: root
  password: ******
  table.create.properties.replication_num: 1
  
pipeline:
  name: mysql-sync-doris
  parallelism: 1

4.  通过pipeline方式提交mysql cdc to doris的任务

切换到flinkcdc的目录:

cd $FLINKCDC_HOME

执行提交pipeline的脚本:

./bin/flink-cdc.sh conf/mysql_to_doris.yaml

5. 查看任务运行状态

打开yarn的web UI,可以看到任务的运行状态

打开flink的web UI,可以查看任务运行的每个步骤

6. 数据同步结果

存量数据同步结果同步

mysql source表数据

doris sink表数据

增量数据同步结果

对source端mysql数据源进行增量插入

doris sink表数据

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0