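Introduction to SeaTunnel
SeaTunnel is an easy-to-use data integration framework with a rich set of connectors. It can synchronize data with its own engine, or run on Spark or Flink as the engine, so it handles distributed synchronization of very large data volumes well. SeaTunnel also supports multi-table and whole-database synchronization, which avoids opening an excessive number of JDBC connections. In addition, it supports reading and parsing logs for multiple tables or a whole database, which avoids reading and parsing the same log repeatedly in CDC multi-table synchronization scenarios.
A SeaTunnel synchronization job consists of a Source, a Sink, and any number of Transforms, which together cover a wide range of data processing needs. The Source reads data from a data source, and the Sink writes data to the target. Between the Source and the Sink, multiple Transforms can clean, convert, and aggregate the data.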
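As a rough mental model of the paragraph above, the following Python sketch chains a source, a list of transforms, and a sink. It is a toy illustration only: `run_pipeline`, `trim_strings`, and the sample rows are hypothetical names and data, not part of SeaTunnel's API, and SeaTunnel itself is configured declaratively rather than coded this way.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]


def run_pipeline(
    source: Iterable[Row],
    transforms: List[Callable[[Row], Row]],
    sink: Callable[[Row], None],
) -> int:
    """Pull rows from the source, apply each transform in order, push to the sink."""
    written = 0
    for row in source:
        for transform in transforms:
            row = transform(row)
        sink(row)
        written += 1
    return written


def trim_strings(row: Row) -> Row:
    """Example cleaning transform: strip whitespace from string values."""
    return {k: v.strip() if isinstance(v, str) else v for k, v in row.items()}


# Hypothetical in-memory source and sink standing in for MySQL and Hive.
sample_source = [{"id": 1, "column1": "  a "}, {"id": 2, "column1": "b"}]
collected: List[Row] = []
rows_written = run_pipeline(sample_source, [trim_strings], collected.append)
```

In the real job further below, the same three roles are filled by the JDBC source, the FieldMapper transform, and the Hive sink.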
Synchronizing data from MySQL to Hive
Environment preparation
Versions: Flink 1.16.3, SeaTunnel 2.3.4, Hadoop 3.3.3
Run mode: Flink on YARN
Steps
1. Create the table to be synchronized in the MySQL database
CREATE TABLE `test_to_hive` (
  `id` int(10) NOT NULL,
  `column1` varchar(100) DEFAULT NULL,
  `column2` varchar(100) DEFAULT NULL,
  `column3` float DEFAULT NULL,
  `column4` double DEFAULT NULL,
  `column5` text NULL,
  `column6` timestamp NULL,
  `column7` date DEFAULT NULL,
  `column8` bigint(20) DEFAULT NULL,
  `column9` json DEFAULT NULL,
  `column10` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2. Create the target table in the Hive database
CREATE TABLE test_db_sink (
  id int,
  column1 string,
  column2 string,
  column3 float,
  column4 double,
  column5 string,
  column6 timestamp,
  column7 date,
  column8 bigint,
  column9 string,
  column10 string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
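The two DDL statements above imply a column type correspondence between MySQL and Hive. The sketch below records that correspondence and regenerates the Hive column list from the MySQL column types. The mapping is read off this article's two tables, not taken from SeaTunnel's own type converter, and `hive_type` and `hive_columns` are hypothetical helper names.

```python
from typing import List, Tuple

# Correspondence used by the two tables in this article (MySQL base type -> Hive type).
MYSQL_TO_HIVE = {
    "int": "int",
    "varchar": "string",
    "float": "float",
    "double": "double",
    "text": "string",
    "timestamp": "timestamp",
    "date": "date",
    "bigint": "bigint",
    "json": "string",
}

# (name, MySQL type) pairs copied from the test_to_hive DDL.
MYSQL_COLUMNS: List[Tuple[str, str]] = [
    ("id", "int(10)"),
    ("column1", "varchar(100)"),
    ("column2", "varchar(100)"),
    ("column3", "float"),
    ("column4", "double"),
    ("column5", "text"),
    ("column6", "timestamp"),
    ("column7", "date"),
    ("column8", "bigint(20)"),
    ("column9", "json"),
    ("column10", "varchar(100)"),
]


def hive_type(mysql_type: str) -> str:
    """Drop any length suffix such as (100) and look up the Hive type."""
    base = mysql_type.split("(", 1)[0].strip().lower()
    return MYSQL_TO_HIVE[base]


def hive_columns(columns: List[Tuple[str, str]]) -> str:
    """Render the column list for a Hive CREATE TABLE statement."""
    return ",\n".join(f"{name} {hive_type(t)}" for name, t in columns)


print(hive_columns(MYSQL_COLUMNS))
```

Because the Hive table is a comma-delimited TEXTFILE, values in `column5` (text) or `column9` (JSON) that contain commas may be split into extra fields when Hive reads them back. Choosing a different field delimiter, or a columnar format such as ORC or Parquet, is one way to avoid that.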
3. Create the job configuration file for the MySQL to Hive synchronization
cd $SEATUNNEL_HOME/config
vim mysql_to_hive.conf
{
  # env configuration
  env {
    execution.parallelism = 1
    job.mode = "BATCH"
    job.name = "mysql_to_hive"
  }
  # source configuration
  source {
    JDBC {
      url = "jdbc:mysql://192.168.***.***:****?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
      # With MySQL Connector/J 8.x, the driver class name is "com.mysql.cj.jdbc.Driver"
      driver = "com.mysql.jdbc.Driver"
      user = "root"
      password = "*********"
      query = "select * from test.test_to_hive"
      split.size = 5000
      fetch_size = 2000
      parallelism = 1
      # name under which downstream stages refer to this source's output
      result_table_name = "source1"
    }
  }
  # transform configuration
  transform {
    FieldMapper {
      source_table_name = "source1"
      result_table_name = "source2"
      # source field = target field
      field_mapper = {
        id = id
        column1 = column1
        column2 = column2
        column3 = column3
        column4 = column4
        column5 = column5
        column6 = column6
        column7 = column7
        column8 = column8
        column9 = column9
        column10 = column10
      }
    }
  }
  # sink configuration
  sink {
    Hive {
      table_name = "wholedb.test_db_sink"
      metastore_uri = "thrift://dw-dev17:****"
      hdfs_site_path = "./lib/hive/hdfs-site.xml"
      hive_site_path = "./lib/hive/hive-site.xml"
      krb5_path = "./lib/hive/krb5.conf"
      kerberos_principal = "hive/dw-dev14@datawings.dev.cn"
      kerberos_keytab_path = "./lib/hive/hive.keytab"
      parallelism = "1"
      source_table_name = "source2"
    }
  }
}
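The configuration above links its stages by name: each `result_table_name` is consumed by a later `source_table_name`. A mismatch in that chain is an easy mistake to make, so a small check like the following can help before submitting. It is a sketch that works on a hand-written summary of the job (the `JOB` list), not on the HOCON file itself, and `check_table_wiring` is a hypothetical helper, not a SeaTunnel utility.

```python
from typing import Dict, List


def check_table_wiring(stages: List[Dict[str, str]]) -> List[str]:
    """Return a list of problems; empty if every consumed table was produced earlier."""
    produced = set()
    problems = []
    for stage in stages:
        consumed = stage.get("source_table_name")
        if consumed is not None and consumed not in produced:
            problems.append(f"{stage['plugin']} reads unknown table '{consumed}'")
        result = stage.get("result_table_name")
        if result is not None:
            produced.add(result)
    return problems


# Table names copied by hand from mysql_to_hive.conf, in pipeline order.
JOB = [
    {"plugin": "JDBC", "result_table_name": "source1"},
    {"plugin": "FieldMapper", "source_table_name": "source1", "result_table_name": "source2"},
    {"plugin": "Hive", "source_table_name": "source2"},
]

print(check_table_wiring(JOB))
```

For this job the chain is JDBC to `source1`, FieldMapper from `source1` to `source2`, and Hive from `source2`, so the check reports no problems.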
4. Submit the MySQL to Hive job
cd $SEATUNNEL_HOME
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/mysql_to_hive.conf
5. Data synchronization results
MySQL data
Hive data
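One simple way to confirm the result is to compare primary keys on both sides. The sketch below assumes the rows have already been fetched (for example with `select id from test.test_to_hive` in MySQL and the equivalent query against the Hive table) and are passed in as lists of dicts. `diff_by_key` is a hypothetical helper, and the sample rows are placeholders for illustration, not output from the run described here.

```python
from typing import Dict, List


def diff_by_key(
    mysql_rows: List[Dict[str, object]],
    hive_rows: List[Dict[str, object]],
    key: str = "id",
) -> Dict[str, list]:
    """Report keys present on one side but not the other."""
    mysql_keys = {row[key] for row in mysql_rows}
    hive_keys = {row[key] for row in hive_rows}
    return {
        "missing_in_hive": sorted(mysql_keys - hive_keys),
        "unexpected_in_hive": sorted(hive_keys - mysql_keys),
    }


# Placeholder rows for illustration only.
example_mysql = [{"id": 1}, {"id": 2}, {"id": 3}]
example_hive = [{"id": 1}, {"id": 2}]
report = diff_by_key(example_mysql, example_hive)
print(report)
```

An empty list under both keys means the two sides hold the same set of primary keys. It does not prove that the other column values match, which would need a per-column comparison.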