作业开发 Spark作业访问MySQL数据库的方案 通过DLI Spark作业访问MySQL数据库中的数据有如下两种方案: 方案1:在DLI中购买按需专属队列,创建增强型跨源连接,再通过跨源表读取MySQL数据库中的数据,该方案需要用户自行编写java代码或scala代码。 方案2:先使用云数据迁移服务CDM将MySQL数据库中的数据导入OBS桶中,再通过Spark作业读取OBS桶中的数据,如果用户已有CDM集群,该方案比方案1简单,且不会对现有数据库造成压力。 如何通过JDBC设置spark.sql.shuffle.partitions参数提高并行度 操作场景 Spark作业在执行shuffle类语句,包括group by、join等场景时,常常会出现数据倾斜的问题,导致作业任务执行缓慢。 该问题可以通过设置spark.sql.shuffle.partitions提高shuffle read task的并行度来进行解决。 设置spark.sql.shuffle.partitions参数提高并行度 用户可在JDBC中通过set方式设置dli.sql.shuffle.partitions参数。具体方法如下: Statement st conn.stamte() st.execute("set spark.sql.shuffle.partitions20") Spark jar 如何读取上传文件 Spark可以使用SparkFiles读取 –file中提交上来的文件的本地路径,即:SparkFiles.get("上传的文件名")。 说明 lDriver中的文件路径与Executor中获取的路径位置是不一致的,所以不能将Driver中获取到的路径作为参数传给Executor去执行。 lExecutor获取文件路径的时候,仍然需要使用SparkFiles.get(“filename”)的方式获取。 lSparkFiles.get()方法需要spark初始化以后才能调用。 代码段如下所示 package main.java import org.apache.spark.SparkFiles import org.apache.spark.sql.SparkSession import scala.io.Source object DliTest { def main(args:Array[String]): Unit { val spark SparkSession.builder .appName("SparkTest") .getOrCreate() // driver 获取上传文件 println(SparkFiles.get("test")) spark.sparkContext.parallelize(Array(1,2,3,4)) // Executor 获取上传文件 .map( > println(SparkFiles.get("test"))) .map( > println(Source.fromFile(SparkFiles.get("test")).mkString)).collect() } }