searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Doris Spark Load 初探

2024-05-28 05:55:33
23
0

 

适用场景

spark load, 本质上是把其他大数据存储(比如hdfs/s3)的文件,通过spark func(spark on yarn/spark on k8s)转换成doris能识别的文件

现有文件,再导入doris, 所以适合离线场景

海量数据场景下, 由于其是spark on yarn/spark on k8s,所以需要评估资源,suffer相关优化.

 

基础设施验证

组件 版本 环境变量
spark 3.x
SPARK_HOME
hive 3.x HIVE_HOME
hadoop 3.x

HADOOP_HOME

HADOOP_CONF_DIR

yarn N/A N/A
java 1.8 JAVA_HOME
DORIS
(fe&be)
2.0.4x DORIS_HOME(非必须)
DORIS BROKER 2.0.4x  

 

环境搭建

fe.conf

新增如下配置固定格式:

spark_resource_path={doris home的目录}/lib/spark2x/jars/spark-2x.zip

sparkdpp的spark任务跑的包路径

-rw-r--r-- 1 doris doris 72M Apr 11 21:15 {doris 安装目录}/spark-dpp/spark-dpp-1.2-SNAPSHOT-jar-with-dependencies.jar

spark-2x.zip打包

需要在fe的lib目录下面创建spark2x目录

把spark_home下面的所有压缩jars到spark-2x.zip

cd {spark 安装目录}
cd jras

zip -q -r spark-2x.zip *

源数据

以hdfs数据为例: doris要能访问到,改配置linux权限 ambari/ranger/kdc

hadoop fs -ls hdfs://目录/tmp/aaa.csv  支持*号这种目录

格式样例

1,2,3
4,5,6
8,6,9

实操

建表:

通过mysql命令进入控制台

CREATE TABLE `测试表A` (
  `a` int(11),
  `b` int(11),
  `c` int(11)
) ENGINE=OLAP
DUPLICATE KEY(`a`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`a`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 2"
);

建资源resource

CREATE EXTERNAL RESOURCE 资源名A
PROPERTIES
(
 
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.executor.memory" = "2g",   //根据实际大小调好
  "spark.yarn.queue" = "default",  //先有队列
  "spark.hadoop.spark.sql.parquet.fieldId.write.enabled" = "true",  //必须,因为只支持parq
  "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
  "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
  "spark.hadoop.yarn.resourcemanager.address.rm1" = "rm1的host:端口",
  "spark.hadoop.yarn.resourcemanager.address.rm2" = "rm2的host:端口",
  "spark.hadoop.hadoop.security.authentication" = "kerberos",
  "spark.hadoop.yarn.resourcemanager.principal" = "yarn/_HOST@域名", //以实际权限为准
  "spark.hadoop.yarn.resourcemanager.keytab" = "yarn.keytab全路径",
  "spark.hadoop.dfs.nameservices" = "命名空间",
  "spark.hadoop.fs.defaultFS" = "hdfs://命名空间",
  "spark.hadoop.dfs.ha.namenodes.命名空间" = "nn1,nn2",
  "spark.hadoop.dfs.namenode.rpc-address.命名空间.nn1" = "nn1域名:54310",
  "spark.hadoop.dfs.namenode.rpc-address.命名空间.nn2" = "nn2域名:54310",
  "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
  "spark.hive.metastore.kerberos.principal" = "hive/_HOST@域名", //以实际权限为准
  "spark.hive.metastore.kerberos.keytab.file" = "hive.keytab全路径",
  "working_dir" = "hdfs://命名空间/tmp/spark_load_yarn5/",
  "broker" = "broker",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@域名",  //以实际权限为准,可以不带_HOST
  "broker.kerberos_keytab" = "doris.keytab全路径",
  "broker.dfs.nameservices" = "命名空间",
  "broker.dfs.ha.namenodes.命名空间" = "nn1, nn2",
  "broker.dfs.namenode.rpc-address.命名空间.nn1" = "NN1的host:端口",
  "broker.dfs.namenode.rpc-address.命名空间.nn2" = "NN2的host:端口",
  "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

LOAD

LOAD LABEL 不重复的label名
(
    DATA INFILE("hdfs://路径/tmp/aaa.csv")  //实际文件路径
    INTO TABLE 测试表A  //不能带库名
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (a,b,c)  //切出来的值的alias
    SET
    (
        a=a,  //字段名=alias
        b=b,
        c=c
    )
     
)
WITH RESOURCE 资源名A(  //使用某个资源
    "spark.executor.memory" = "3g",
    "spark.shuffle.compress" = "true",
    "spark.executor.cores" = "2",
    "spark.executor.instances" = "2",
    "spark.sql.parquet.fieldId.write.enabled" = "true"
);

数据验证

show load where label='label名'\G;

 

等到返回为Finsh的时候,查询数据,进行对数

0条评论
0 / 1000
w****n
6文章数
0粉丝数
w****n
6 文章 | 0 粉丝
w****n
6文章数
0粉丝数
w****n
6 文章 | 0 粉丝
原创

Doris Spark Load 初探

2024-05-28 05:55:33
23
0

 

适用场景

spark load, 本质上是把其他大数据存储(比如hdfs/s3)的文件,通过spark func(spark on yarn/spark on k8s)转换成doris能识别的文件

现有文件,再导入doris, 所以适合离线场景

海量数据场景下, 由于其是spark on yarn/spark on k8s,所以需要评估资源,suffer相关优化.

 

基础设施验证

组件 版本 环境变量
spark 3.x
SPARK_HOME
hive 3.x HIVE_HOME
hadoop 3.x

HADOOP_HOME

HADOOP_CONF_DIR

yarn N/A N/A
java 1.8 JAVA_HOME
DORIS
(fe&be)
2.0.4x DORIS_HOME(非必须)
DORIS BROKER 2.0.4x  

 

环境搭建

fe.conf

新增如下配置固定格式:

spark_resource_path={doris home的目录}/lib/spark2x/jars/spark-2x.zip

sparkdpp的spark任务跑的包路径

-rw-r--r-- 1 doris doris 72M Apr 11 21:15 {doris 安装目录}/spark-dpp/spark-dpp-1.2-SNAPSHOT-jar-with-dependencies.jar

spark-2x.zip打包

需要在fe的lib目录下面创建spark2x目录

把spark_home下面的所有压缩jars到spark-2x.zip

cd {spark 安装目录}
cd jras

zip -q -r spark-2x.zip *

源数据

以hdfs数据为例: doris要能访问到,改配置linux权限 ambari/ranger/kdc

hadoop fs -ls hdfs://目录/tmp/aaa.csv  支持*号这种目录

格式样例

1,2,3
4,5,6
8,6,9

实操

建表:

通过mysql命令进入控制台

CREATE TABLE `测试表A` (
  `a` int(11),
  `b` int(11),
  `c` int(11)
) ENGINE=OLAP
DUPLICATE KEY(`a`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`a`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 2"
);

建资源resource

CREATE EXTERNAL RESOURCE 资源名A
PROPERTIES
(
 
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.executor.memory" = "2g",   //根据实际大小调好
  "spark.yarn.queue" = "default",  //先有队列
  "spark.hadoop.spark.sql.parquet.fieldId.write.enabled" = "true",  //必须,因为只支持parq
  "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
  "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
  "spark.hadoop.yarn.resourcemanager.address.rm1" = "rm1的host:端口",
  "spark.hadoop.yarn.resourcemanager.address.rm2" = "rm2的host:端口",
  "spark.hadoop.hadoop.security.authentication" = "kerberos",
  "spark.hadoop.yarn.resourcemanager.principal" = "yarn/_HOST@域名", //以实际权限为准
  "spark.hadoop.yarn.resourcemanager.keytab" = "yarn.keytab全路径",
  "spark.hadoop.dfs.nameservices" = "命名空间",
  "spark.hadoop.fs.defaultFS" = "hdfs://命名空间",
  "spark.hadoop.dfs.ha.namenodes.命名空间" = "nn1,nn2",
  "spark.hadoop.dfs.namenode.rpc-address.命名空间.nn1" = "nn1域名:54310",
  "spark.hadoop.dfs.namenode.rpc-address.命名空间.nn2" = "nn2域名:54310",
  "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
  "spark.hive.metastore.kerberos.principal" = "hive/_HOST@域名", //以实际权限为准
  "spark.hive.metastore.kerberos.keytab.file" = "hive.keytab全路径",
  "working_dir" = "hdfs://命名空间/tmp/spark_load_yarn5/",
  "broker" = "broker",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@域名",  //以实际权限为准,可以不带_HOST
  "broker.kerberos_keytab" = "doris.keytab全路径",
  "broker.dfs.nameservices" = "命名空间",
  "broker.dfs.ha.namenodes.命名空间" = "nn1, nn2",
  "broker.dfs.namenode.rpc-address.命名空间.nn1" = "NN1的host:端口",
  "broker.dfs.namenode.rpc-address.命名空间.nn2" = "NN2的host:端口",
  "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

LOAD

LOAD LABEL 不重复的label名
(
    DATA INFILE("hdfs://路径/tmp/aaa.csv")  //实际文件路径
    INTO TABLE 测试表A  //不能带库名
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (a,b,c)  //切出来的值的alias
    SET
    (
        a=a,  //字段名=alias
        b=b,
        c=c
    )
     
)
WITH RESOURCE 资源名A(  //使用某个资源
    "spark.executor.memory" = "3g",
    "spark.shuffle.compress" = "true",
    "spark.executor.cores" = "2",
    "spark.executor.instances" = "2",
    "spark.sql.parquet.fieldId.write.enabled" = "true"
);

数据验证

show load where label='label名'\G;

 

等到返回为Finsh的时候,查询数据,进行对数

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0