Real-Time Trade Data Statistics with Flink + Kafka + Doris

2023-09-12 06:56:38

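Required components:

Kafka: an open-source stream-processing platform developed by the Apache Software Foundation.

Flink: a high-throughput, low-latency distributed stream/batch processing engine that performs stateful computations over both unbounded and bounded data streams.

Doris: an MPP (massively parallel processing) OLAP system that provides high-performance analytical and reporting queries over large datasets at relatively low cost.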

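This article shows how to compute real-time trade statistics with Flink, Kafka, and Doris. Order events are written to a Kafka topic. A Flink SQL job then sums order prices and counts orders per biz in 1-hour tumbling windows. Finally, the results are written to a Doris table that can be queried over the MySQL protocol.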

 

Steps

1. Connect to Doris with a MySQL client and create the result table in Doris

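-- The Flink sink in step 3 writes to trade.order_stat, so create the table in database trade
CREATE DATABASE IF NOT EXISTS trade;
USE trade;

-- No key model is specified, so Doris uses its default Duplicate Key model.
-- On a single-BE test cluster you may need to append PROPERTIES ("replication_num" = "1").
CREATE TABLE IF NOT EXISTS order_stat (
  biz VARCHAR(100),
  window_start DATETIME,
  window_end DATETIME,
  total_order_price DOUBLE,
  `count` BIGINT
)
DISTRIBUTED BY HASH (biz, window_start);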
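DISTRIBUTED BY HASH (biz, window_start) spreads rows across buckets by hashing the two distribution columns. As a result, every row with the same (biz, window_start) pair lands in the same bucket. The following is a conceptual Python sketch of that idea. The use of zlib.crc32 and the 10-bucket count are stand-ins chosen for illustration. They are not Doris's internal hash function or this table's actual bucket setting.

```python
import zlib
from datetime import datetime

# Hypothetical bucket count for illustration; the DDL above leaves BUCKETS unspecified.
NUM_BUCKETS = 10


def bucket_for(biz: str, window_start: datetime, num_buckets: int = NUM_BUCKETS) -> int:
    """Pick a bucket by hashing the distribution columns (biz, window_start).

    zlib.crc32 is a stand-in; Doris computes bucket hashes with its own function.
    """
    key = f"{biz}|{window_start:%Y-%m-%d %H:%M:%S}".encode("utf-8")
    return zlib.crc32(key) % num_buckets


if __name__ == "__main__":
    for hour in (6, 7, 8):
        ws = datetime(2023, 3, 7, hour)
        print("biz1", ws, "-> bucket", bucket_for("biz1", ws))
```

Including window_start alongside biz presumably keeps all of one biz's hourly rows from piling into a single bucket. That would happen if the table were distributed by biz alone.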

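2. Create the Kafka topic with the Kafka command-line scripts (records are written to it in step 4)

Create the topic. --bootstrap-server is the IP address and port exposed by the Kafka broker.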

./kafka-topics.sh --bootstrap-server XXXX:9020 --topic order_topic --create
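The command above does not pass --partitions, so the topic gets the broker's default partition count (the num.partitions broker setting). The partition count matters for the Flink job in step 3. Flink's Kafka source tracks a watermark for each partition and forwards the minimum across them. As a result, a partition that never receives data holds back every window. The sketch below models that rule in plain Python. The class name and the use of epoch-millisecond integers are illustrative assumptions, not Flink API.

```python
from typing import List, Optional

OUT_OF_ORDERNESS_MS = 5_000  # mirrors `time_ltz` - INTERVAL '5' SECOND in step 3


class PartitionedWatermark:
    """Bounded-out-of-orderness watermark tracked per Kafka partition.

    Each partition's watermark is its largest event time (epoch ms) minus 5 s;
    the source's overall watermark is the minimum over all partitions.
    """

    def __init__(self, partitions: int) -> None:
        self.max_event_ms: List[Optional[int]] = [None] * partitions

    def observe(self, partition: int, event_ms: int) -> None:
        # Out-of-order events never move a partition's maximum backwards
        current = self.max_event_ms[partition]
        if current is None or event_ms > current:
            self.max_event_ms[partition] = event_ms

    def current(self) -> Optional[int]:
        if any(ms is None for ms in self.max_event_ms):
            return None  # a partition without data blocks watermark progress
        return min(self.max_event_ms) - OUT_OF_ORDERNESS_MS


if __name__ == "__main__":
    wm = PartitionedWatermark(partitions=3)
    wm.observe(0, 1678143011192)
    wm.observe(1, 1678144811194)
    print(wm.current())  # None: partition 2 has received nothing yet
    wm.observe(2, 1678146611195)
    print(wm.current())  # 1678143006192
```

For a small demo like this one, you can avoid the stall in two ways. One is to create the topic with --partitions 1. The other is to configure source idleness in Flink (for example, table.exec.source.idle-timeout).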

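3. Create and start the Flink job from the Flink SQL client. The Kafka SQL connector and the Doris Flink connector JARs need to be on the SQL client's classpath (for example, in Flink's lib/ directory).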

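SET 'parallelism.default' = '1';
-- Checkpointing must be enabled: the Doris sink commits its writes when a checkpoint completes
SET 'execution.checkpointing.interval' = '60000';

-- Kafka source: time_ltz converts the epoch-millisecond `timestamp` field to TIMESTAMP_LTZ(3),
-- and the watermark tolerates up to 5 seconds of out-of-order events
CREATE TABLE order_detail(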
`biz` VARCHAR,
`order_id` VARCHAR,
`price` DOUBLE,
`detail` VARCHAR,
`timestamp` BIGINT,
`time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
) WITH ( 
'connector'='kafka',
'topic'='order_topic',
'properties.group.id' = 'order_group',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers'='<Kafka Bootstrap Server>',
'format'='json'); 

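-- Doris sink: fenodes is the FE HTTP address (host:http_port; the FE HTTP port defaults to 8030).
-- sink.label-prefix must be unique per job; it prefixes the Stream Load labels used for exactly-once writes.
CREATE TABLE order_stat(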
      `biz` VARCHAR,
      `window_start` TIMESTAMP(3),
      `window_end` TIMESTAMP(3),
      `total_order_price` DOUBLE,
      `count` BIGINT,
      PRIMARY KEY (biz, window_start) NOT ENFORCED
    ) WITH (  
      'connector'= 'doris',
      'fenodes'='<Doris FE HTTP Service>',
      'table.identifier'='trade.order_stat',
      'username' = '<Doris User Name>',
      'password' = '<Doris User Password>',
      'sink.label-prefix' = 'doris_label'
);

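-- Sum prices and count orders per biz in 1-hour tumbling windows; a window's row is emitted once the watermark passes its end
INSERT INTO order_stat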
  SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
  FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
  GROUP BY biz, window_start, window_end;
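To make the window semantics concrete, here is a plain-Python sketch (not Flink code) that replays the JSON records in arrival order. It converts timestamp to wall-clock time the way TO_TIMESTAMP_LTZ(`timestamp`, 3) renders it. It then sums prices per biz over 1-hour tumbling windows and emits a window once the 5-second watermark covers it. Records that arrive after their window has fired are dropped. The sketch rests on several assumptions: a single partition, a UTC+8 session time zone (inferred from the window times in the Doris output), and a window that fires when the watermark reaches window_end minus 1 ms. The function names are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple

# Assumption: the session time zone is UTC+8 (e.g. Asia/Shanghai); a fixed offset keeps this self-contained.
SESSION_TZ = timezone(timedelta(hours=8))
WINDOW = timedelta(hours=1)              # INTERVAL '1' HOUR
OUT_OF_ORDERNESS = timedelta(seconds=5)  # `time_ltz` - INTERVAL '5' SECOND
ONE_MS = timedelta(milliseconds=1)

# (biz, window_start, window_end, total_order_price, count), like a row of order_stat
Row = Tuple[str, datetime, datetime, float, int]


def event_time(epoch_ms: int) -> datetime:
    """Epoch milliseconds -> wall-clock time in the session zone (naive, like window_start)."""
    seconds, millis = divmod(epoch_ms, 1000)
    local = datetime.fromtimestamp(seconds, tz=SESSION_TZ) + timedelta(milliseconds=millis)
    return local.replace(tzinfo=None)


def tumble_start(ts: datetime) -> datetime:
    """Start of the 1-hour tumbling window containing ts (only valid for whole-hour windows)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def run_tumbling_sum(lines: Iterable[str]) -> Tuple[List[Row], Dict[Tuple[str, datetime], list]]:
    """Replay JSON records in arrival order; return fired windows and still-open windows."""
    pending: Dict[Tuple[str, datetime], list] = {}
    emitted: List[Row] = []
    watermark = None
    for line in lines:
        record = json.loads(line)
        ts = event_time(record["timestamp"])
        start = tumble_start(ts)
        if watermark is not None and watermark >= start + WINDOW - ONE_MS:
            continue  # late record: its window already fired, so it is dropped
        acc = pending.setdefault((record["biz"], start), [0.0, 0])
        acc[0] += record["price"]
        acc[1] += 1
        candidate = ts - OUT_OF_ORDERNESS
        if watermark is None or candidate > watermark:
            watermark = candidate
        # Fire every window whose last millisecond (window_end - 1 ms) the watermark has reached
        for biz, ws in sorted(pending):
            if watermark >= ws + WINDOW - ONE_MS:
                total, count = pending.pop((biz, ws))
                emitted.append((biz, ws, ws + WINDOW, total, count))
    return emitted, pending


if __name__ == "__main__":
    sample = [
        '{"biz":"biz1","price":837,"timestamp":1678143011192}',
        '{"biz":"biz1","price":637,"timestamp":1678144811194}',
    ]
    rows, still_open = run_tumbling_sum(sample)
    print(rows)        # the 06:00-07:00 window: total 837.0, count 1
    print(still_open)  # the 07:00-08:00 window is still open
```

Replaying the sample records from step 4 through this sketch leaves the 15:00-16:00 window open. No later event pushes the watermark past 16:00, which is consistent with the Doris output having no 15:00 row.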

 

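4. Write records to Kafka with the console producer script. At the same time, in another session, connect to Doris with a MySQL client to check the statistics. A window's row appears in Doris only after the watermark passes the window's end and the next checkpoint completes.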

Write data to Kafka (start the producer, then enter one JSON record per line):

./kafka-console-producer.sh --bootstrap-server XXXX:9020 --topic order_topic
{"biz":"biz1","order_id":"8b2fae6e-5dbb-40d5-bcba-2eaa990bc28f","price":837,"detail":"order detail","timestamp":1678143011192}
{"biz":"biz1","order_id":"1e6bffff-cb9b-4cf1-b16c-49c012750438","price":637,"detail":"order detail","timestamp":1678144811194}
{"biz":"biz1","order_id":"96248d8c-a58f-49da-b772-ef0eefca7d43","price":825,"detail":"order detail","timestamp":1678146611195}
{"biz":"biz1","order_id":"d31469d7-0f3d-452d-8f20-692df913bea3","price":270,"detail":"order detail","timestamp":1678148411197}
{"biz":"biz1","order_id":"b0149b41-5707-4dbb-9bec-22404521b69f","price":928,"detail":"order detail","timestamp":1678150211199}
{"biz":"biz1","order_id":"0cbc2f81-75b8-4ec4-803d-98a5528073b1","price":477,"detail":"order detail","timestamp":1678152011200}
{"biz":"biz1","order_id":"ca9aeea8-4d29-4b53-8cb5-c89947738d59","price":350,"detail":"order detail","timestamp":1678153811202}
{"biz":"biz1","order_id":"1a8681e6-edea-4205-8d5f-87cd9a144604","price":702,"detail":"order detail","timestamp":1678155611203}
{"biz":"biz1","order_id":"833f9f90-9f2f-486b-a6b8-1eb3378d4e93","price":939,"detail":"order detail","timestamp":1678157411204}
{"biz":"biz1","order_id":"afbd9ad8-748d-4986-80ef-d828d6eebd32","price":306,"detail":"order detail","timestamp":1678159211206}
{"biz":"biz1","order_id":"69df83e4-1ff5-4e68-9b30-29b89a5145e9","price":149,"detail":"order detail","timestamp":1678161011207}
{"biz":"biz1","order_id":"6767c106-2ab9-4bf7-af1d-6a0c48a84a41","price":477,"detail":"order detail","timestamp":1678162811208}
{"biz":"biz1","order_id":"728e4249-10c8-4d8f-a13b-72478fd7d452","price":763,"detail":"order detail","timestamp":1678164611249}
{"biz":"biz1","order_id":"c69a4cc1-89e2-4d5f-9e4f-d8645f11eb22","price":302,"detail":"order detail","timestamp":1678166411251}
{"biz":"biz1","order_id":"53e159aa-c6c2-4980-b253-27b733fe2e8a","price":129,"detail":"order detail","timestamp":1678168211253}
{"biz":"biz1","order_id":"d51cf02b-9ac8-4554-9ccf-77ce8bb00407","price":670,"detail":"order detail","timestamp":1678170011254}
{"biz":"biz1","order_id":"b3f19fa7-c402-404f-af6d-751057579a87","price":342,"detail":"order detail","timestamp":1678171811256}
{"biz":"biz1","order_id":"082fe0d9-38ef-41e2-a412-88f36b78aaa9","price":407,"detail":"order detail","timestamp":1678173611257}
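To produce more test data than the hand-written sample, a small generator can emit records of the same shape. The following is a sketch. The function make_orders, the 100-999 price range, the fixed seed, and the gen_orders.py file name are all hypothetical. As in the sample, records are spaced 30 minutes apart.

```python
import json
import random
import uuid
from typing import List


def make_orders(n: int, start_ms: int, step_ms: int = 30 * 60 * 1000,
                biz: str = "biz1", seed: int = 0) -> List[str]:
    """Generate n JSON order records, step_ms apart, shaped like the sample records above."""
    rng = random.Random(seed)  # fixed seed: the same arguments give the same records
    lines = []
    for i in range(n):
        record = {
            "biz": biz,
            "order_id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
            "price": rng.randint(100, 999),  # hypothetical price range
            "detail": "order detail",
            "timestamp": start_ms + i * step_ms,  # epoch milliseconds
        }
        # Compact separators match the formatting of the sample records
        lines.append(json.dumps(record, separators=(",", ":")))
    return lines


if __name__ == "__main__":
    for line in make_orders(18, start_ms=1678143011192):
        print(line)
```

Because kafka-console-producer.sh reads records from standard input, you can pipe the output straight in: python3 gen_orders.py | ./kafka-console-producer.sh --bootstrap-server XXXX:9020 --topic order_topic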

Query the results:

MySQL [trade]> select * from trade.order_stat;
+------+---------------------+---------------------+-------------------+-------+
| biz  | window_start        | window_end          | total_order_price | count |
+------+---------------------+---------------------+-------------------+-------+
| biz1 | 2023-03-07 14:00:00 | 2023-03-07 15:00:00 |              1012 |     2 |
| biz1 | 2023-03-07 08:00:00 | 2023-03-07 09:00:00 |              2396 |     4 |
| biz1 | 2023-03-07 07:00:00 | 2023-03-07 08:00:00 |              1462 |     2 |
| biz1 | 2023-03-07 06:00:00 | 2023-03-07 07:00:00 |               837 |     1 |
| biz1 | 2023-03-07 13:00:00 | 2023-03-07 14:00:00 |               431 |     2 |
| biz1 | 2023-03-07 12:00:00 | 2023-03-07 13:00:00 |               763 |     1 |
+------+---------------------+---------------------+-------------------+-------+