Applicable scenarios
Spark Load essentially takes files that already live in other big-data storage (such as HDFS/S3) and converts them, through a Spark job (Spark on YARN / Spark on K8s), into files that Doris can read directly.
The data must already exist as files before being imported into Doris, so it is suited to offline (batch) scenarios.
For very large data volumes, because the conversion runs as Spark on YARN / Spark on K8s, you need to size the Spark resources and tune shuffle-related settings accordingly.
Infrastructure verification
| Component | Version | Environment variables |
|---|---|---|
| spark | 3.x | SPARK_HOME |
| hive | 3.x | HIVE_HOME |
| hadoop | 3.x | HADOOP_HOME, HADOOP_CONF_DIR |
| yarn | N/A | N/A |
| java | 1.8 | JAVA_HOME |
| Doris (FE & BE) | 2.0.4x | DORIS_HOME (optional) |
| Doris Broker | 2.0.4x | N/A |
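Before going further, it helps to confirm these components are visible from the host that will submit the Spark job (typically the FE host). A minimal sketch, assuming the environment variables are already exported in that shell:
echo $JAVA_HOME $SPARK_HOME $HIVE_HOME $HADOOP_HOME $HADOOP_CONF_DIR
java -version
spark-submit --version
hadoop version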
Environment setup
fe.conf
Add the following setting; the path format is fixed:
spark_resource_path={doris home directory}/lib/spark2x/jars/spark-2x.zip
The spark-dpp jar that the DPP Spark job runs with is located at:
-rw-r--r-- 1 doris doris 72M Apr 11 21:15 {doris install directory}/spark-dpp/spark-dpp-1.2-SNAPSHOT-jar-with-dependencies.jar
Packaging spark-2x.zip
Create a spark2x directory under the FE lib directory.
Zip up all the jars under SPARK_HOME into spark-2x.zip:
cd {spark install directory}
cd jars
zip -q -r spark-2x.zip *
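The archive then needs to sit at the exact path that spark_resource_path in fe.conf points to. One possible way to place it (the paths are placeholders, adjust them to your install):
mkdir -p {doris home directory}/lib/spark2x/jars
cp {spark install directory}/jars/spark-2x.zip {doris home directory}/lib/spark2x/jars/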
Source data
Taking HDFS data as an example: Doris must be able to read it, so adjust the Linux permissions / Ambari / Ranger / KDC configuration as needed.
hadoop fs -ls hdfs://{path}/tmp/aaa.csv    (wildcard paths with * are supported)
Sample format:
1,2,3
4,5,6
8,6,9
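If the sample file is not on HDFS yet, it can be uploaded and checked like this (the target path is only an example):
hadoop fs -put aaa.csv hdfs://{path}/tmp/aaa.csv
hadoop fs -ls hdfs://{path}/tmp/aaa.csv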
Hands-on
Create the table:
Connect to the FE with the mysql client and run:
CREATE TABLE `测试表A` (
`a` int(11),
`b` int(11),
`c` int(11)
) ENGINE=OLAP
DUPLICATE KEY(`a`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`a`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 2"
);
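As a quick sanity check that the table was created with the expected schema:
DESC 测试表A;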
Create the Spark resource
CREATE EXTERNAL RESOURCE 资源名A
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "2g", //根据实际大小调好
"spark.yarn.queue" = "default", //先有队列
"spark.hadoop.spark.sql.parquet.fieldId.write.enabled" = "true", //必须,因为只支持parq
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "rm1的host:端口",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "rm2的host:端口",
"spark.hadoop.hadoop.security.authentication" = "kerberos",
"spark.hadoop.yarn.resourcemanager.principal" = "yarn/_HOST@域名", //以实际权限为准
"spark.hadoop.yarn.resourcemanager.keytab" = "yarn.keytab全路径",
"spark.hadoop.dfs.nameservices" = "命名空间",
"spark.hadoop.fs.defaultFS" = "hdfs://命名空间",
"spark.hadoop.dfs.ha.namenodes.命名空间" = "nn1,nn2",
"spark.hadoop.dfs.namenode.rpc-address.命名空间.nn1" = "nn1域名:54310",
"spark.hadoop.dfs.namenode.rpc-address.命名空间.nn2" = "nn2域名:54310",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"spark.hive.metastore.kerberos.principal" = "hive/_HOST@域名", //以实际权限为准
"spark.hive.metastore.kerberos.keytab.file" = "hive.keytab全路径",
"working_dir" = "hdfs://命名空间/tmp/spark_load_yarn5/",
"broker" = "broker",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@域名", //以实际权限为准,可以不带_HOST
"broker.kerberos_keytab" = "doris.keytab全路径",
"broker.dfs.nameservices" = "命名空间",
"broker.dfs.ha.namenodes.命名空间" = "nn1, nn2",
"broker.dfs.namenode.rpc-address.命名空间.nn1" = "NN1的host:端口",
"broker.dfs.namenode.rpc-address.命名空间.nn2" = "NN2的host:端口",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
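Before submitting the load, it can be worth confirming that the resource is visible and that the account used for loading is allowed to use it; a sketch, where load_user is only an illustrative account name:
SHOW RESOURCES WHERE NAME = "资源名A";
GRANT USAGE_PRIV ON RESOURCE "资源名A" TO "load_user"@"%";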
LOAD
LOAD LABEL {unique label name}
(
DATA INFILE("hdfs://路径/tmp/aaa.csv") //实际文件路径
INTO TABLE 测试表A //不能带库名
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(a,b,c) -- aliases for the values parsed out of each line
SET
(
a=a, -- column name = alias
b=b,
c=c
)
)
WITH RESOURCE 资源名A( -- use a previously created resource
"spark.executor.memory" = "3g",
"spark.shuffle.compress" = "true",
"spark.executor.cores" = "2",
"spark.executor.instances" = "2",
"spark.sql.parquet.fieldId.write.enabled" = "true"
);
Data verification
show load where label='{label}'\G
When the returned State is FINISHED, query the data and reconcile it against the source.
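For a basic reconciliation against the three-row sample above, a row count on the target table is usually enough; compare it with the line count of the source file:
SELECT COUNT(*) FROM 测试表A;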