按场景划分
数据源 |
导入方式 |
对象存储(s3),HDFS |
使用Broker导入数据 |
本地文件 |
导入本地数据 |
Kafka |
订阅Kafka数据 |
Mysql、PostgreSQL,Oracle,SQLServer |
通过外部表同步数据 |
通过JDBC导入 |
使用JDBC同步数据 |
导入JSON格式数据 |
JSON格式数据导入 |
按导入方式划分
导入方式名称 |
使用方式 |
Spark Load |
通过Spark导入外部数据 |
Broker Load |
通过Broker导入外部存储数据 |
Stream Load |
流式导入数据(本地文件及内存数据) |
Routine Load |
导入Kafka数据 |
Insert Into |
外部表通过INSERT方式导入数据 |
S3 Load |
S3协议的对象存储数据导入 |
MySQL Load |
MySQL客户端导入本地数据 |
支持的数据格式
不同的导入方式支持的数据格式不同。
导入方式 |
支持的格式 |
Broker Load |
parquet、orc、csv、gzip |
Stream Load |
csv、json、parquet、orc |
Routine Load |
csv、json |
MySQL Load |
csv |
1、INSERT INTO
在Doris数据库中,INSERT INTO语句的使用方式和在MySQL等数据库中INSERT INTO语句的使用方式类似。但在Doris中,所有的数据写入都是一个独立的导入作业,所以这里将INSERT INTO作为一种导入方式介绍。
INSERT INTO语句主要有3个应用场景。
1)导入几条测试数据,验证Doris的功能,此时适合使用INSERT INTO VALUES语法;
2)在执行数据ETL操作时,将Doris内部表的查询结果写入另一张新表,此时适合使用INSERT INTO SELECT语法;
3)先创建外部表,然后通过INSERT INTO SELECT语句将外部表数据导入Doris内部表存储。
注:因为Doris的每个INSERT INTO语句都是一个独立的任务,所以每次INSERT INTO语句的执行都会产生一个新的数据版本。频繁小批量导入操作会产生过多的数据版本,而过多的数据版本会影响查询性能,所以并不建议频繁使用INSERT INTO语句导入数据。如果有流式导入或者小批量导入任务需求,我们可以使用Stream Load或者Routine Load语句进行导入。
2、Stream Load
Stream Load是Doris用户常用的数据导入方式之一,是一种同步导入方式,允许用户通过HTTP将CSV格式或JSON格式的数据批量导入,并返回数据导入结果。用户可以直接通过HTTP请求的返回结果判断数据导入是否成功,也可以通过在客户端执行查询SQL命令来查询历史任务的结果。另外,Doris还针对Stream Load任务提供了日志审计功能,以通过审计日志对历史Stream Load任务进行审计。
Stream Load执行原理:在Coordinator BE节点中,一个线程池负责处理所有的HTTP请求,其中包括Stream Load请求。一次Stream Load任务拥有唯一一个Label。
Stream Load的详细执行流程如下。
1)用户提交Stream Load请求到FE节点,也可以直接提交Stream Load请求到BE节点
2)FE节点接收到用户提交的Stream Load请求后进行Header解析(其中包括解析数据导入的库、表、Label等信息),然后进行用户鉴权。如果Header解析成功并且用户鉴权通过,FE节点会将Stream Load请求转发到一个BE节点,由该BE节点作为本次Stream Load任务的Coordinator;否则,FE节点会直接向用户返回Stream Load失败的信息
3)Coordinator BE接收到Stream Load请求后进行Header解析和数据校验,其中包括解析数据的文件格式、消息体的大小、超时时间、用户鉴权信息等。如果Header解析和数据校验失败,Coordinator BE直接向用户返回Stream Load失败的信息。
4)Header解析和数据校验通过之后,Coordinator BE会通过Thrift RPC向FE节点发送Begin Transaction请求。
5)FE节点收到Coordinator BE发送的Begin Transaction请求之后,开启一个事务,并向Coordinator BE返回事务ID。
6)Coordinator BE收到事务ID后,通过Thrift RPC向FE节点发送获取导入计划的请求
7)FE节点收到Coordinator BE发送的获取导入计划请求之后,为Stream Load任务生成导入计划,并返回给Coordinator BE。
8)Coordinator BE接收到导入计划之后,开始执行导入计划,其中包括接收传来的实时数据以及将实时数据通过BRPC分发到其他Executor BE。
9)其他Executor BE接收到Coordinator BE分发的实时数据之后,将数据写入存储层。
10)Executor BE完成数据写入之后,Coordinator BE通过Thrift RPC向FE节点发送Commit Transaction请求。
11)FE节点收到Coordinator BE发送的Commit Transaction请求之后,对事务进行提交,并向Executor BE发送Publish Version任务,同时等待Executor BE执行Publish Version任务完成。
12)Executor BE异步执行Publish Version任务,并将数据导入时生成的Rowset变为可见数据版本。
13)当Publish Version任务正常完成或执行超时时,FE向Coordinator BE返回Commit Transaction和Publish Version任务结果。
14)Coordinator BE向用户返回Stream Load,执行最终结果。
3、Broker Load
在Broker Load方式下,通过部署的Broker程序,Doris可读取对应数据源(如HDFS、S3)中的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步导入方式,用户需要通过MySQL协议创建Broker Load任务,并通过查看导入命令检查导入结果。
Broker Load详细执行流程如下。
1)用户创建Broker Load任务,提交给FE。
2)FE根据文件存储大小和文件个数,制定数据分片导入计划。
3)FE按照计划指挥多个BE节点导入指定的文件或者分片数据。
4)BE通过Broker拉取数据,写入磁盘。
5)BE完成数据导入后反馈消息给FE。
6)FE继续下发任务给BE,直到所有文件数据都导入完成。
7)FE收到所有文件数据导入完成的消息后,反馈给用户。