searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

简单上手Seatunnel

2024-07-29 09:58:54
81
0

Seatunnel 简介

SeaTunnel 是一个简单上手的数据集成框架,具有丰富的连接器。它既可以使用自身引擎进行数据同步,也可以用 Spark 和 Flink 为引擎,所以能很好地进行分布式的海量数据同步。同时Seatunnel支持多表或全库同步,解决了过度JDBC连接的问题; 支持多表或全库日志读取解析,解决了CDC多表同步场景下需要处理日志重复读取解析的问题。

SeaTunnel的同步由Source、Sink以及多个Transform构成,可以满足多种数据处理需求。Source主要从各种数据源获取数据,Sink则负责将数据写入目标数据源。在Source和Sink之间,可以通过多个Transform对数据进行清洗、转换和聚合等操作。

Mysql 到 Hive 的数据同步

环境准备

版本:Flink:1.16.3 , Seatunnel:2.3.4,Hadoop:3.3.3

运行方式:Flink on Yarn

步骤

1. 在mysql数据库中创建需要同步的表
CREATE TABLE `test_to_hive` (
  `id` int(10) NOT NULL,
  `column1` varchar(100) DEFAULT NULL,
  `column2` varchar(100) DEFAULT NULL,
  `column3` float DEFAULT NULL,
  `column4` double DEFAULT NULL,
  `column5` text NULL,
  `column6` timestamp NULL,
  `column7` date DEFAULT NULL,
  `column8` bigint(20) DEFAULT NULL,
  `column9` json DEFAULT NULL,
  `column10` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
​
2. 在hive数据库中创建同步的表
CREATE TABLE test_db_sink ( 
    id int,
    column1 string,
    column2 string,
    column3 float,
    column4 double,
    column5 string,
    column6 timestamp,
    column7 date,
    column8 bigint,
    column9 string,
    column10 string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
3. 通过pipeline方式提交mysql cdc to doris的任务
cd $SEATUNNEL_HOME/conf
vim mysql_to_hive.conf
{
  # env配置
  env {
    execution.parallelism = 1
    job.mode = "BATCH"
    job.name = "mysql_to_hive"
  }
​
  # source配置
  source {
    JDBC {
      url = "jdbc:mysql://192.168.***.***:****?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
      driver = "com.mysql.jdbc.Driver"
      user = "root"
      password = "*********"
      query = "select * from test.test_to_hive"
      split.size = 5000
      fetch_size = 2000
      parallelism = 1
      result_table_name = "source1"
    }
  }
​
  # transform配置
  transform {
    FieldMapper {
      source_table_name = "source1"
      result_table_name = "source2"
    field_mapper = {
        id = id  
        column1 = column1
        column2 = column2
        column3 = column3
        column4 = column4
        column5 = column5
        column6 = column6
        column7 = column7
        column8 = column8
        column9 = column9
        column10 = column10
      }
    }
  }
​
  # sink配置
  sink {
     Hive {
      table_name = "wholedb.test_db_sink"
      metastore_uri = "thrift://dw-dev17:****"
      hdfs_site_path = "./lib/hive/hdfs-site.xml"
      hive_site_path = "./lib/hive/hive-site.xml"
      krb5_path = "./lib/hive/krb5.conf"
      kerberos_principal = "hive/dw-dev14@datawings.dev.cn"
      kerberos_keytab_path = "./lib/hive/hive.keytab"
      parallelism = "1"
      source_table_name = "source2"
   }  
}
}
4. 通过pipeline方式提交mysql to hive的任务
cd $SEATUNNEL_HOME
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/mysql_to_hive.conf
5. 数据同步结果

mysql数据
1.jpg
hive数据
2.png

0条评论
0 / 1000
tongrj
2文章数
0粉丝数
tongrj
2 文章 | 0 粉丝
tongrj
2文章数
0粉丝数
tongrj
2 文章 | 0 粉丝
原创

简单上手Seatunnel

2024-07-29 09:58:54
81
0

Seatunnel 简介

SeaTunnel 是一个简单上手的数据集成框架,具有丰富的连接器。它既可以使用自身引擎进行数据同步,也可以用 Spark 和 Flink 为引擎,所以能很好地进行分布式的海量数据同步。同时Seatunnel支持多表或全库同步,解决了过度JDBC连接的问题; 支持多表或全库日志读取解析,解决了CDC多表同步场景下需要处理日志重复读取解析的问题。

SeaTunnel的同步由Source、Sink以及多个Transform构成,可以满足多种数据处理需求。Source主要从各种数据源获取数据,Sink则负责将数据写入目标数据源。在Source和Sink之间,可以通过多个Transform对数据进行清洗、转换和聚合等操作。

Mysql 到 Hive 的数据同步

环境准备

版本:Flink:1.16.3 , Seatunnel:2.3.4,Hadoop:3.3.3

运行方式:Flink on Yarn

步骤

1. 在mysql数据库中创建需要同步的表
CREATE TABLE `test_to_hive` (
  `id` int(10) NOT NULL,
  `column1` varchar(100) DEFAULT NULL,
  `column2` varchar(100) DEFAULT NULL,
  `column3` float DEFAULT NULL,
  `column4` double DEFAULT NULL,
  `column5` text NULL,
  `column6` timestamp NULL,
  `column7` date DEFAULT NULL,
  `column8` bigint(20) DEFAULT NULL,
  `column9` json DEFAULT NULL,
  `column10` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
​
2. 在hive数据库中创建同步的表
CREATE TABLE test_db_sink ( 
    id int,
    column1 string,
    column2 string,
    column3 float,
    column4 double,
    column5 string,
    column6 timestamp,
    column7 date,
    column8 bigint,
    column9 string,
    column10 string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
3. 通过pipeline方式提交mysql cdc to doris的任务
cd $SEATUNNEL_HOME/conf
vim mysql_to_hive.conf
{
  # env配置
  env {
    execution.parallelism = 1
    job.mode = "BATCH"
    job.name = "mysql_to_hive"
  }
​
  # source配置
  source {
    JDBC {
      url = "jdbc:mysql://192.168.***.***:****?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
      driver = "com.mysql.jdbc.Driver"
      user = "root"
      password = "*********"
      query = "select * from test.test_to_hive"
      split.size = 5000
      fetch_size = 2000
      parallelism = 1
      result_table_name = "source1"
    }
  }
​
  # transform配置
  transform {
    FieldMapper {
      source_table_name = "source1"
      result_table_name = "source2"
    field_mapper = {
        id = id  
        column1 = column1
        column2 = column2
        column3 = column3
        column4 = column4
        column5 = column5
        column6 = column6
        column7 = column7
        column8 = column8
        column9 = column9
        column10 = column10
      }
    }
  }
​
  # sink配置
  sink {
     Hive {
      table_name = "wholedb.test_db_sink"
      metastore_uri = "thrift://dw-dev17:****"
      hdfs_site_path = "./lib/hive/hdfs-site.xml"
      hive_site_path = "./lib/hive/hive-site.xml"
      krb5_path = "./lib/hive/krb5.conf"
      kerberos_principal = "hive/dw-dev14@datawings.dev.cn"
      kerberos_keytab_path = "./lib/hive/hive.keytab"
      parallelism = "1"
      source_table_name = "source2"
   }  
}
}
4. 通过pipeline方式提交mysql to hive的任务
cd $SEATUNNEL_HOME
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/mysql_to_hive.conf
5. 数据同步结果

mysql数据
1.jpg
hive数据
2.png

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0