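Required components:
Kafka: an open-source stream-processing platform developed by the Apache Software Foundation.
Flink: a high-throughput, low-latency distributed stream and batch processing engine that performs stateful computations over both unbounded and bounded data streams.
Doris: an MPP OLAP database that provides high-performance analytical and reporting queries on large datasets at relatively low cost.
This article shows how to use Flink, Kafka, and Doris to compute real-time statistics on transaction data: order events are written to Kafka, Flink aggregates them per biz into hourly totals, and the results are stored in Doris for querying.
Steps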
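1. Connect to Doris with a MySQL client and create the result table. The Flink job in step 3 writes to trade.order_stat, so the table is created in the trade database.
CREATE DATABASE IF NOT EXISTS trade;
USE trade;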
CREATE TABLE IF NOT EXISTS order_stat (
biz VARCHAR(100),
window_start DATETIME,
window_end DATETIME,
total_order_price DOUBLE,
count BIGINT
)
DISTRIBUTED BY HASH (biz,window_start)
;
2. Use the Kafka command-line scripts to create the topic that receives the order data.
Create the topic; --bootstrap-server takes the IP address and port exposed by the Kafka broker.
./kafka-topics.sh --bootstrap-server XXXX:9020 --topic order_topic --create
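3. Create and start the Flink job from the Flink SQL client. Replace the placeholders in angle brackets with your Kafka bootstrap address and your Doris FE HTTP address, user name, and password. Checkpointing is enabled because, with its default two-phase commit, the Doris connector commits data to Doris only when a checkpoint completes; sink.label-prefix sets the prefix of the Stream Load labels it uses and should be unique per job.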
SET 'parallelism.default' = '1';
SET 'execution.checkpointing.interval' = '60000';
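-- Source table: JSON order events read from Kafka; time_ltz is the event time derived from the millisecond timestamp, with a 5-second watermark delay
CREATE TABLE order_detail(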
`biz` VARCHAR,
`order_id` VARCHAR,
`price` DOUBLE,
`detail` VARCHAR,
`timestamp` BIGINT,
`time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='order_topic',
'properties.group.id' = 'order_group',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers'='<Kafka Bootstrap Server>',
'format'='json');
-- Sink table: hourly statistics written to the Doris table trade.order_stat
CREATE TABLE order_stat(
`biz` VARCHAR,
`window_start` TIMESTAMP(3),
`window_end` TIMESTAMP(3),
`total_order_price` DOUBLE,
`count` BIGINT,
PRIMARY KEY (biz, window_start) NOT ENFORCED
) WITH (
'connector'= 'doris',
'fenodes'='<Doris FE HTTP Service>',
'table.identifier'='trade.order_stat',
'username' = '<Doris User Name>',
'password' = '<Doris User Password>',
'sink.label-prefix' = 'doris_label'
);
-- Sum prices and count orders per biz over 1-hour tumbling event-time windows
INSERT INTO order_stat
SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
GROUP BY biz, window_start, window_end;
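The window TVF above can be hard to picture. The following bash sketch, which is only an illustration and not part of the Flink job, assigns each JSON message to a one-hour tumbling window and sums price per biz and window, mirroring the INSERT ... SELECT. It assumes the Flink session time zone is UTC+8 (the window times in the query result below correspond to that zone), integer prices, GNU date, and bash 4 or later; hourly_stats is a hypothetical name. It ignores watermarks, so it also prints windows that Flink has not closed yet.

```shell
#!/usr/bin/env bash
# hourly_stats (hypothetical helper): reads order JSON lines on stdin and prints
# one "biz|window_start|window_end|total_price|count" line per 1-hour tumbling window.
# Assumptions: UTC+8 session time zone, integer prices, GNU date, bash 4+.
# Watermarks are ignored, so windows Flink has not closed yet are printed too.
hourly_stats() {
  local hour_ms=3600000
  local offset_ms=$(( 8 * 3600000 ))          # UTC+8 offset in milliseconds
  local re_biz='"biz":"([^"]*)"'
  local re_price='"price":([0-9]+)'
  local re_ts='"timestamp":([0-9]+)'
  local -A total=() count=()
  local line biz price ts start key
  while IFS= read -r line; do
    [[ $line =~ $re_biz ]] || continue
    biz=${BASH_REMATCH[1]}
    [[ $line =~ $re_price ]] || continue
    price=${BASH_REMATCH[1]}
    [[ $line =~ $re_ts ]] || continue
    ts=${BASH_REMATCH[1]}
    # Window start = event time rounded down to the hour in UTC+8 (epoch ms).
    start=$(( ts - (ts + offset_ms) % hour_ms ))
    key="$biz|$start"
    total[$key]=$(( ${total[$key]:-0} + price ))
    count[$key]=$(( ${count[$key]:-0} + 1 ))
  done
  for key in "${!total[@]}"; do
    biz=${key%%|*}
    start=${key#*|}
    # Shift by the offset so "date -u" prints UTC+8 wall-clock time.
    printf '%s|%s|%s|%d|%d\n' "$biz" \
      "$(date -u -d "@$(( (start + offset_ms) / 1000 ))" '+%F %T')" \
      "$(date -u -d "@$(( (start + offset_ms + hour_ms) / 1000 ))" '+%F %T')" \
      "${total[$key]}" "${count[$key]}"
  done | LC_ALL=C sort
}
```

For example, saving the sample messages from step 4 to a file (orders.json is a hypothetical name) and running hourly_stats < orders.json prints one line per biz and hour window.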
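4. Write data to Kafka with the Kafka console producer script, and at the same time connect to Doris with a MySQL client to check the statistics.
Write data to Kafka (each line typed into the console producer is sent as one message):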
./kafka-console-producer.sh --bootstrap-server XXXX:9020 --topic order_topic
{"biz":"biz1","order_id":"8b2fae6e-5dbb-40d5-bcba-2eaa990bc28f","price":837,"detail":"order detail","timestamp":1678143011192}
{"biz":"biz1","order_id":"1e6bffff-cb9b-4cf1-b16c-49c012750438","price":637,"detail":"order detail","timestamp":1678144811194}
{"biz":"biz1","order_id":"96248d8c-a58f-49da-b772-ef0eefca7d43","price":825,"detail":"order detail","timestamp":1678146611195}
{"biz":"biz1","order_id":"d31469d7-0f3d-452d-8f20-692df913bea3","price":270,"detail":"order detail","timestamp":1678148411197}
{"biz":"biz1","order_id":"b0149b41-5707-4dbb-9bec-22404521b69f","price":928,"detail":"order detail","timestamp":1678150211199}
{"biz":"biz1","order_id":"0cbc2f81-75b8-4ec4-803d-98a5528073b1","price":477,"detail":"order detail","timestamp":1678152011200}
{"biz":"biz1","order_id":"ca9aeea8-4d29-4b53-8cb5-c89947738d59","price":350,"detail":"order detail","timestamp":1678153811202}
{"biz":"biz1","order_id":"1a8681e6-edea-4205-8d5f-87cd9a144604","price":702,"detail":"order detail","timestamp":1678155611203}
{"biz":"biz1","order_id":"833f9f90-9f2f-486b-a6b8-1eb3378d4e93","price":939,"detail":"order detail","timestamp":1678157411204}
{"biz":"biz1","order_id":"afbd9ad8-748d-4986-80ef-d828d6eebd32","price":306,"detail":"order detail","timestamp":1678159211206}
{"biz":"biz1","order_id":"69df83e4-1ff5-4e68-9b30-29b89a5145e9","price":149,"detail":"order detail","timestamp":1678161011207}
{"biz":"biz1","order_id":"6767c106-2ab9-4bf7-af1d-6a0c48a84a41","price":477,"detail":"order detail","timestamp":1678162811208}
{"biz":"biz1","order_id":"728e4249-10c8-4d8f-a13b-72478fd7d452","price":763,"detail":"order detail","timestamp":1678164611249}
{"biz":"biz1","order_id":"c69a4cc1-89e2-4d5f-9e4f-d8645f11eb22","price":302,"detail":"order detail","timestamp":1678166411251}
{"biz":"biz1","order_id":"53e159aa-c6c2-4980-b253-27b733fe2e8a","price":129,"detail":"order detail","timestamp":1678168211253}
{"biz":"biz1","order_id":"d51cf02b-9ac8-4554-9ccf-77ce8bb00407","price":670,"detail":"order detail","timestamp":1678170011254}
{"biz":"biz1","order_id":"b3f19fa7-c402-404f-af6d-751057579a87","price":342,"detail":"order detail","timestamp":1678171811256}
{"biz":"biz1","order_id":"082fe0d9-38ef-41e2-a412-88f36b78aaa9","price":407,"detail":"order detail","timestamp":1678173611257}
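Typing messages by hand is tedious. Below is a minimal bash sketch of a generator that emits messages in the same JSON shape, spaced 30 minutes apart like the samples above. gen_orders is a hypothetical helper; the price range (100 to 999) and the use of Linux's /proc/sys/kernel/random/uuid for order IDs are assumptions, with a counter-based ID as the fallback.

```shell
#!/usr/bin/env bash
# gen_orders (hypothetical helper): prints COUNT order messages as JSON lines.
# Usage: gen_orders BIZ START_MS COUNT [STEP_MS]
# Assumptions: random integer prices in [100, 999]; Linux /proc UUIDs when available.
gen_orders() {
  local biz=$1 ts=$2 count=$3 step=${4:-1800000}   # default step: 30 minutes
  local i id price
  for (( i = 0; i < count; i++ )); do
    if [[ -r /proc/sys/kernel/random/uuid ]]; then
      id=$(< /proc/sys/kernel/random/uuid)
    else
      id="order-$i"                                 # fallback when /proc is unavailable
    fi
    price=$(( RANDOM % 900 + 100 ))
    printf '{"biz":"%s","order_id":"%s","price":%d,"detail":"order detail","timestamp":%d}\n' \
      "$biz" "$id" "$price" "$ts"
    ts=$(( ts + step ))                             # next event STEP_MS later
  done
}
```

Usage, starting at the same timestamp as the first sample: gen_orders biz1 1678143011192 18 | ./kafka-console-producer.sh --bootstrap-server XXXX:9020 --topic order_topic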
Query the statistics in Doris:
MySQL [trade]> select * from trade.order_stat;
+------+---------------------+---------------------+-------------------+-------+
| biz  | window_start        | window_end          | total_order_price | count |
+------+---------------------+---------------------+-------------------+-------+
| biz1 | 2023-03-07 14:00:00 | 2023-03-07 15:00:00 |              1012 |     2 |
| biz1 | 2023-03-07 08:00:00 | 2023-03-07 09:00:00 |              2396 |     4 |
| biz1 | 2023-03-07 07:00:00 | 2023-03-07 08:00:00 |              1462 |     2 |
| biz1 | 2023-03-07 06:00:00 | 2023-03-07 07:00:00 |               837 |     1 |
| biz1 | 2023-03-07 13:00:00 | 2023-03-07 14:00:00 |               431 |     2 |
| biz1 | 2023-03-07 12:00:00 | 2023-03-07 13:00:00 |               763 |     1 |
+------+---------------------+---------------------+-------------------+-------+
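No row for the 15:00-16:00 window appears in the result, even though the last sample message (timestamp 1678173611257, 15:20:11 in UTC+8) falls into it. A plausible explanation is event-time semantics: a tumbling window is emitted only after the watermark, defined in step 3 as time_ltz minus 5 seconds, passes the window end, and with the Doris sink the row becomes visible only after the next checkpoint. The bash sketch below models that rule in simplified form. window_fired is a hypothetical helper, and the sketch assumes a single Kafka partition (with several partitions the source watermark is typically the minimum across partitions).

```shell
#!/usr/bin/env bash
# window_fired (hypothetical helper): succeeds (exit status 0) if an event-time
# window ending at WINDOW_END_MS would have been emitted, given the largest event
# timestamp seen so far. Simplified model: single partition, no idle sources.
# Usage: window_fired MAX_EVENT_MS WINDOW_END_MS [DELAY_MS]
window_fired() {
  local max_event_ms=$1 window_end_ms=$2 delay_ms=${3:-5000}
  # Mirrors: WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
  local watermark_ms=$(( max_event_ms - delay_ms ))
  # The window [start, end) fires once the watermark reaches its last millisecond.
  (( watermark_ms >= window_end_ms - 1 ))
}
```

With the last sample message, window_fired 1678173611257 1678172400000 succeeds (the 14:00-15:00 window, ending at 15:00 UTC+8, has fired), while window_fired 1678173611257 1678176000000 fails until a message with an event time of at least about 16:00:05 arrives.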