Broadcast Timeout 300
```
org.apache.spark.SparkException: Could not execute broadcast in 300 secs.
You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
```
After an stt job got blocked, the logs showed that two tables were hitting a broadcast timeout while being broadcast.
SQL logic:

```sql
create or replace temporary view table_tmp
as
select
  /*+ BROADCAST(A_tmp) */
  A_tmp.c1,
  A_tmp.c2,
  A_tmp.c3,
  ...
from source_table A_tmp
join gdw_table.other_table1 B_tmp on ...
join gdw_table.other_table2 C_tmp on ...
group by 1, 2, 3, 4, 5, 6, ..., 11
```
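For reference, here is a minimal Scala sketch of the same broadcast hint, both as a spark.sql call and via the DataFrame API. The table names come from the query above, but the join key c1 is a placeholder rather than the real join condition.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-hint-demo").getOrCreate()

// SQL form: the hint marker must be /*+ ... */ ; a plain /** ... */ block is
// parsed as an ordinary comment and no hint is applied.
val viaSqlHint = spark.sql(
  """SELECT /*+ BROADCAST(A_tmp) */ A_tmp.c1, A_tmp.c2, A_tmp.c3
    |FROM source_table A_tmp
    |JOIN gdw_table.other_table1 B_tmp ON A_tmp.c1 = B_tmp.c1
    |""".stripMargin)

// DataFrame-API form: wrap the small side of the join in broadcast().
val aTmp = spark.table("source_table")
val bTmp = spark.table("gdw_table.other_table1")
val viaApi = bTmp.join(broadcast(aTmp), Seq("c1"))
```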
A quick check of the row counts:
- A_tmp: 238 rows.
- B_tmp: 1,834,203,873 rows.
- C_tmp: 289,371,375 rows.
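For completeness, a quick Scala snippet that reproduces this check from a spark-shell (table names as in the query above; spark is the shell's SparkSession):

```scala
// Row-count sanity check for the three join inputs
Seq("source_table", "gdw_table.other_table1", "gdw_table.other_table2").foreach { t =>
  println(s"$t -> ${spark.table(t).count()} rows")
}
```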
Property Name | Default | Meaning |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; partitions with small files will then be faster than partitions with bigger files (which are scheduled first). |
spark.sql.broadcastTimeout | 300 | Timeout in seconds for the broadcast wait time in broadcast joins. |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that statistics are currently only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run. |
spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
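A small sketch (assuming a live SparkSession named spark) of how to read the current values of these properties, and how to collect the table statistics that spark.sql.autoBroadcastJoinThreshold relies on; the table name is the one from the query above.

```scala
// Print the current value of each property from the table above
Seq(
  "spark.sql.files.maxPartitionBytes",
  "spark.sql.files.openCostInBytes",
  "spark.sql.broadcastTimeout",
  "spark.sql.autoBroadcastJoinThreshold",
  "spark.sql.shuffle.partitions"
).foreach(key => println(s"$key = ${spark.conf.get(key)}"))

// autoBroadcastJoinThreshold is compared against size statistics from the metastore;
// they can be (re)collected with ANALYZE TABLE.
spark.sql("ANALYZE TABLE gdw_table.other_table1 COMPUTE STATISTICS noscan")
```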
Likely cause:
The job runs at about 1 AM US time, when a large share of cluster resources is already occupied; under that resource pressure the broadcast join could not finish and respond within the configured time.
Possible solutions (the first three are sketched in Scala after this list):
- Increase the default timeout: set spark.sql.broadcastTimeout = 600.
- Call persist() on the broadcast table; persist() can turn the BroadcastHashJoin into a ShuffleHashJoin.
- Disable broadcast joins: set spark.sql.autoBroadcastJoinThreshold = -1.
- Rerun the job.
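A sketch of how the first three options could be applied from Scala (spark is a live SparkSession; source_table stands in for the table that was being broadcast):

```scala
// 1. Raise the broadcast timeout from 300 s to 600 s
spark.conf.set("spark.sql.broadcastTimeout", "600")

// 2. Persist the small table that was being broadcast; per this post's experience,
//    this can push the planner from a BroadcastHashJoin to a shuffle-based join.
val aTmp = spark.table("source_table").persist()
aTmp.count() // materialize the cache before the join runs

// 3. Or disable automatic broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```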
The job was rerun at a different time of day and completed successfully.