三.spark优化参数

spark优化参数


--设置spark shuffle分区数量参考: excutor-cores * 3 
set("spark.sql.shuffle.partitions", "36")


--1.broadcastHashJOin
--默认小表小于10M自动进行广播join
set("spark.sql.autoBroadcastJoinThreshold","10m")
--1.可强制使用广播join SQL Hint暗示方式 
select /*+  BROADCASTJOIN(sc) */ 
select /*+  BROADCAST(sc) */
select /*+  MAPJOIN(sc) */
--2.使用API方式
import org.apache.spark.sql.functions._
    broadcast(sc)
      .join(csc,Seq("courseid"))
      .select("courseid")

--2.shufflehashjoin 的hint
select /*+ SHUFFLE_REPLICATE_NL(school) */

--3.SortMergeJoin的hint
select /*+ SHUFFLE_MERGE(school) */
select /*+ MERGEJOIN(school) */ 
select /*+ MERGE(school) */ 


--调增小文件的读取,避免大量数据分片、task元数据信息对Driver造成压力
spark.sql.files.maxPartitionBytes=128MB  --设置分区大小
spark.files.openCostInBytes=4194304 	--设置文件开销大小最好为小文件大小, 默认 4m

--设置map端输出流buffer(改变并不明显)
set("spark.shuffle.file.buffer", "64") --能修改 map端输出流buffer默认为32k
set("spark.shuffle.spill.batchSize", "20000") -- 不可修改 溢写批次条数
set("spark.shuffle.spill.initialMemoryThreshold", "104857600")--不可修改溢写批次大小为5M


--设置reduce端拉取大小以及拉取次数(改变并不明显)
set("spark.reducer.maxSizeInFlight", "96m") -- reduce缓冲区,默认48m
set("spark.shuffle.io.maxRetries", "6")  -- 重试次数,默认3次
set("spark.shuffle.io.retryWait", "60s")  -- 重试的间隔,默认5s


---本地化级别:进程本地化、节点本地化、机架本地化、any。从左到右按待时间不满足时,依次降级.
set("spark.locality.wait", "6s")			--默认3s.全局设置本地化级别等待时间
set("spark.locality.wait.process", "60s")	--进程本地化级别等待时间
set("spark.locality.wait.node", "30s")		--节点本地化级别等待时间
set("spark.locality.wait.rack", "20s")		--机架本地化级别等待时间


--设置节点超时时间,避免GC时间过长,把excutor干掉
--conf spark.core.connection.ack.wait.timeout=300s ##默认120s

--开启使用堆外内存
 --conf spark.memory.offHeap.enabled=true 
 --conf spark.memory.offHeap.size=1g 
 
--设置堆外内存开销
 --conf spark.executor.memoryOverhead=1G ## max(384,200)
 --conf spark.driver.memoryOverhead	=1G ## max(384,200)
	

---------spark3推出了 AQE(Adaptive Query Execution),即自适应查询执行。
--spark动态合并分区
set("spark.sql.autoBroadcastJoinThreshold", "-1")  --为演示效果关闭自动join
set("spark.sql.adaptive.enabled", "true")			--开启自适应总开关
set("spark.sql.adaptive.coalescePartitions.enabled", "true")	--开启合并分区
set("spark.sql.adaptive.coalescePartitions.initialPartitionNum","1000")	--开启初始化分区数,默认为分区数
set("spark.sql.adaptive.coalescePartitions.minPartitionNum","10")	--合并最小分区数
set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "20mb")	--每个合并后分区最大的大小
set("spark.dynamicAllocation.enabled","true")  -- 动态申请资源 (不建议开)
set("spark.dynamicAllocation.shuffleTracking.enabled","true") -- shuffle动态跟踪(查看动态申请资源)
 

--spark动态切换join策略
spark.sql.adaptive.localShuffleReader.enabled=true	--当不需要shuffle分区时,尝试使用本地shuffle

--设置数据倾斜的数据均衡
set("spark.sql.adaptive.skewJoin.enable","true")		--打开数据倾斜开关
set("spark.sql.adaptive.skewJoin.skewedPartitionFactor","2")	--如果数据块 大于中位数*此因子 则为数据倾斜
set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","20mb") --当单个数据块大于此值则数据倾斜(和上面同时满足则认为数据倾斜
set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "8mb")	--建议shuffle和并后分区最大的大小


--解决数据倾斜:对于大key
1.distribute by cast(rand() * 5 as int)
2.先对数据进行分组,对key进行加盐聚合,再去盐聚合。

上一篇:MapReduce工作流程


下一篇:Hadoop配置文件之(yarn-site.xml)