Mysql8关于hashjoin的代码处理方式
1 表的Schema如下所示:
create table t1(
c1 int primary key,
c2 int
);
create table t2(
d1 int primary key,
d2 int
);
insert into t1 values(1,1);
insert into t1 values(2,2);
insert into t1 values(3,3);
insert into t1 values(4,4);
insert into t1 values(5,5);
insert into t1 values(6,6);
insert into t1 values(7,7);
insert into t2 values(1,1);
insert into t2 values(2,2);
insert into t2 values(3,3);
insert into t2 values(4,4);
insert into t2 values(5,5);
insert into t2 values(6,6);
insert into t2 values(7,7);
在Mysql8.0.20之后(包含),Mysql在实际执行过程中大量的使用了HashJoin
,只要是以前能够用到BNL(Block Nested-Loop Join)
的地方,都会选择HashJoin
的方式来代替,众所周知,HashJoin
通常只适用于等值连接(equi-join)的场景,譬如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c2=t2.d2;
/*explain的结果如下*/
| EXPLAIN |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Inner hash join (t2.d2 = t1.c2) (cost=6.10 rows=7)
-> Table scan on t2 (cost=0.05 rows=7)
-> Hash
-> Table scan on t1 (cost=0.95 rows=7)
|
因为上面join列都没有使用到索引,所以对于这个语句,Mysql会优先使用HashJoin
,但是如果使用到了索引,那么就依旧会采用Nested Loop Join
的方式,比如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c1=t2.d2;
| EXPLAIN |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Nested loop inner join (cost=3.40 rows=7)
-> Filter: (t2.d2 is not null) (cost=0.95 rows=7)
-> Table scan on t2 (cost=0.95 rows=7)
-> Single-row index lookup on t1 using PRIMARY (c1=t2.d2) (cost=0.26 rows=1)
|
重点来了,在Mysql中,即使join条件不是等值条件,也同样会采用HashJoin的方式,比如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c2 > t2.d2;
----------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------+
/*可以看到,join条件变成了filter的方式,但是join方式还是采用了HashJoin的方式*/
| -> Filter: (t1.c2 > t2.d2) (cost=6.10 rows=16)
-> Inner hash join (no condition) (cost=6.10 rows=16)
-> Table scan on t2 (cost=0.07 rows=7)
-> Hash
-> Table scan on t1 (cost=0.95 rows=7)
|
可以看到,join条件变成了filter
的方式,但是join
方式还是采用了HashJoin
的方式,那么Mysql是怎么实现非等值情况下的HashJoin
的呢?这里我们分析下代码。
2 HashJoin代码实现
本节采用如下的sql语句来进行示例(join列c2和d2上都没有索引,因此会使用HashJoin):
SELECT * FROM t1 JOIN t2 ON t1.c2=t2.d2
上述语句在Mysql里面使用到的Iterator
如下:
HashJoinIterator
TableScanIterator
TableScanIterator
在HashJoin中,主要流程如下:
左表扫描数据 -> 构建Hash表 -> 右表扫描数据 -> 在Hash表中进行probe操作 -> 将结果返回给用户
重要函数调用的流程图大体如下:
-
左表扫描数据并构建Hash表的流程
- 在
sql_union.cc
里面的ExecuteIteratorQuery
函数,会在执行前对iterator
进行如下操作:
if (m_root_iterator->Init()) { return true; }
- 对于
HashJoinIterator
的Init
函数介绍如下:
// Prepare to read the build input into the hash map.
// m_build_input:为左表的对象,这里也就是t1表的TableScanIterator,先进行init操作,里面其实就是准备扫描表相关的准备工作
if (m_build_input->Init()) {
  DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
  return true;
}
// We always start out by doing everything in memory.
m_hash_join_type = HashJoinType::IN_MEMORY;
m_write_to_probe_row_saving = false;
//省略部分代码
// Build the hash table
// 扫描左表,第一次进行构建Hash表的操作,这是一个比较重要的函数
if (BuildHashTable()) {
  DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
  return true;
}
if (m_state == State::END_OF_ROWS) {
  // BuildHashTable() decided that the join is done (the build input is
  // empty, and we are in an inner-/semijoin. Anti-/outer join must output
  // NULL-complemented rows from the probe input).
  return false;
}
// 进行右表的准备工作。
return InitProbeIterator();
-
BuildHashTable()
函数如下:
if (!m_build_iterator_has_more_rows) { m_state = State::END_OF_ROWS; return false; } //省略部分代码 // 初始化m_row_buffer成员变量,该变量本质上存储了一个std::unordered_multimap对象 if (InitRowBuffer()) { return true; } const bool reject_duplicate_keys = RejectDuplicateKeys(); const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER; // If Init() is called multiple times (e.g., if hash join is inside an // dependent subquery), we must clear the NULL row flag, as it may have been // set by the previous executing of this hash join. m_build_input->SetNullRowFlag(/*is_null_row=*/false); PFSBatchMode batch_mode(m_build_input.get()); //在Init阶段,Mysql会尽可能的多从底层读取数据构建Hash表。 for (;;) { // Termination condition within loop. int res = m_build_input->Read(); if (res == 1) { DBUG_ASSERT(thd()->is_error()); // my_error should have been called. return true; } //省略部分代码 //将底层扫描的数据放在m_row_buffer map里面,该函数下面会详细介绍。 const hash_join_buffer::StoreRowResult store_row_result = m_row_buffer.StoreRow(thd(), reject_duplicate_keys, store_rows_with_null_in_join_key); //校验返回的结果,分别为:存储成功,buffer满了需要下盘和致命错误三种情况。 switch (store_row_result) { case hash_join_buffer::StoreRowResult::ROW_STORED: break; case hash_join_buffer::StoreRowResult::BUFFER_FULL: { // The row buffer is full, so start spilling to disk (if allowed). Note // that the row buffer checks for OOM _after_ the row was inserted, so // we should always manage to insert at least one row. DBUG_ASSERT(!m_row_buffer.empty()); // If we are not allowed to spill to disk, just go on to reading from // the probe iterator. if (!m_allow_spill_to_disk) { if (m_join_type != JoinType::INNER) { // Enable probe row saving, so that unmatched probe rows are written // to the probe row saving file. After the next refill of the hash // table, we will read rows from the probe row saving file, ensuring // that we only read unmatched probe rows. 
InitWritingToProbeRowSavingFile(); } SetReadingProbeRowState(); return false; } // Ideally, we would use the estimated row count from the iterator. But // not all iterators has the row count available (i.e. // RemoveDuplicatesIterator), so get the row count directly from the // QEP_TAB. const QEP_TAB *last_table_in_join = m_build_input_tables.tables().back().qep_tab; if (InitializeChunkFiles( last_table_in_join->position()->prefix_rowcount, m_row_buffer.size(), kMaxChunks, m_probe_input_tables, m_build_input_tables, /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER, &m_chunk_files_on_disk)) { DBUG_ASSERT(thd()->is_error()); // my_error should have been called. return true; } // Write out the remaining rows from the build input out to chunk files. // The probe input will be written out to chunk files later; we will do // it _after_ we have checked the probe input for matches against the // rows that are already written to the hash table. An alternative // approach would be to write out the remaining rows from the build // _and_ the rows that already are in the hash table. In that case, we // could also write out the entire probe input to disk here as well. But // we don not want to waste the rows that we already have stored in // memory. // // We never write out rows with NULL in condition for the build/right // input, as these rows will never match in a join condition. if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables, m_join_conditions, kChunkPartitioningHashSeed, &m_chunk_files_on_disk, true /* write_to_build_chunks */, false /* write_rows_with_null_in_join_key */, &m_temporary_row_and_join_key_buffer)) { DBUG_ASSERT(thd()->is_error()); // my_error should have been called. return true; } // Flush and position all chunk files from the build input at the // beginning. for (ChunkPair &chunk_pair : m_chunk_files_on_disk) { if (chunk_pair.build_chunk.Rewind()) { DBUG_ASSERT( thd()->is_error()); // my_error should have been called. 
return true; } } SetReadingProbeRowState(); return false; } case hash_join_buffer::StoreRowResult::FATAL_ERROR: // An unrecoverable error. Most likely, malloc failed, so report OOM. // Note that we cannot say for sure how much memory we tried to allocate // when failing, so just report ‘join_buffer_size‘ as the amount of // memory we tried to allocate. my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), thd()->variables.join_buff_size); return true; } }
-
HashJoinRowBuffer::StoreRow
函数StoreRowResult HashJoinRowBuffer::StoreRow( THD *thd, bool reject_duplicate_keys, bool store_rows_with_null_in_condition) { // Make the key from the join conditions. m_buffer.length(0); //所有的condition都是equal的形式,append_join_key_for_hash_join函数里面会根据不同的数据类型,将数据拷贝到m_buffer里面。 for (const HashJoinCondition &hash_join_condition : m_join_conditions) { bool null_in_join_condition = hash_join_condition.join_condition()->append_join_key_for_hash_join( thd, m_tables.tables_bitmap(), hash_join_condition, &m_buffer); if (null_in_join_condition && !store_rows_with_null_in_condition) { // SQL NULL values will never match in an inner join or semijoin, so skip // the row. return StoreRowResult::ROW_STORED; } } // TODO(efroseth): We should probably use an unordered_map instead of multimap // for these cases so we do not have to hash and lookup twice. //有的语句是会reject 重复key的,所以这里专门做一个检查。 if (reject_duplicate_keys && contains(Key(pointer_cast<const uchar *>(m_buffer.ptr()), m_buffer.length()))) { return StoreRowResult::ROW_STORED; } // Allocate the join key on the same MEM_ROOT that the hash table is // allocated on, so it has the same lifetime as the rest of the contents in // the hash map (until Clear() is called on the HashJoinBuffer). //从m_buffer中拷贝join key到join_key_data中,方便后续放入map中的操作 const size_t join_key_size = m_buffer.length(); uchar *join_key_data = nullptr; if (join_key_size > 0) { join_key_data = m_mem_root.ArrayAlloc<uchar>(join_key_size); if (join_key_data == nullptr) { return StoreRowResult::FATAL_ERROR; } memcpy(join_key_data, m_buffer.ptr(), join_key_size); } //将行中的具体内容拷贝m_buffer中 // Save the contents of all columns marked for reading. if (StoreFromTableBuffers(m_tables, &m_buffer)) { return StoreRowResult::FATAL_ERROR; } // Give the row the same lifetime as the hash map, by allocating it on the // same MEM_ROOT as the hash map is allocated on. 
//从m_buffer中取出具体的数据内容,然后放在row对象中,方便后面插入到map中,目前的版本中,BufferRow对象只是Key的一个重命名,本质上两者是同一个类对象。 const size_t row_size = m_buffer.length(); uchar *row = nullptr; if (row_size > 0) { row = m_mem_root.ArrayAlloc<uchar>(row_size); if (row == nullptr) { return StoreRowResult::FATAL_ERROR; } memcpy(row, m_buffer.ptr(), row_size); } //将查询到的数据和join key封装为Key和BufferRow对象,放入到map中 m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size), BufferRow(row, row_size)); if (m_mem_root.allocated_size() > m_max_mem_available) { return StoreRowResult::BUFFER_FULL; } return StoreRowResult::ROW_STORED; }
-
InitProbeIterator
函数bool HashJoinIterator::InitProbeIterator() { DBUG_ASSERT(m_state == State::READING_ROW_FROM_PROBE_ITERATOR); //本质上是一个TableScanIterator,因此这里也只是简单的Init操作,并没有进行真正的probe操作 if (m_probe_input->Init()) { return true; } if (m_probe_input_batch_mode) { m_probe_input->StartPSIBatchMode(); } return false; }
- 在
-
右表扫描数据并进行
probe
操作。-
首先绝大多数情况下,
probe
操作是在Read
阶段执行的,如下ExecuteIteratorQuery
函数里面的Read
循环。for (;;) { //在这个例子里面,就是HashJoinIterator进行Read操作。 int error = m_root_iterator->Read(); DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;); if (error > 0 || thd->is_error()) // Fatal error return true; else if (error < 0) break; else if (thd->killed) // Aborted by user { thd->send_kill_message(); return true; } ++*send_records_ptr; if (query_result->send_data(thd, *fields)) { return true; } thd->get_stmt_da()->inc_current_row_for_condition(); }
-
HashJoinIterator::Read
函数如下:int HashJoinIterator::Read() { for (;;) { if (thd()->killed) { // Aborted by user. thd()->send_kill_message(); return 1; } switch (m_state) { case State::LOADING_NEXT_CHUNK_PAIR: if (ReadNextHashJoinChunk()) { return 1; } break; //大多数情况下,会进入到该分支。 case State::READING_ROW_FROM_PROBE_ITERATOR: if (ReadRowFromProbeIterator()) { return 1; } break; //省略部分代码,省略的case大部分都是内存无法存下,需要下盘的情况。 case State::READING_FIRST_ROW_FROM_HASH_TABLE: //开始从hashtable中获取数据,准备返回给用户 case State::READING_FROM_HASH_TABLE: { //这里的操作是将HashTable里面的数据重新复制给qep_tab->table()里面的相关的成员变量中,具体的行数据存储在qep_tab->table()->record数组中。该函数下面的会介绍 const int res = ReadNextJoinedRowFromHashTable(); if (res == 0) { // A joined row is ready, so send it to the client. return 0; } if (res == -1) { // No more matching rows in the hash table, or antijoin found a // matching row. Read a new row from the probe input. continue; } // An error occured, so abort the join. DBUG_ASSERT(res == 1); return res; } case State::END_OF_ROWS: return -1; } } // Unreachable. DBUG_ASSERT(false); return 1; }
-
HashJoinIterator::ReadRowFromProbeIterator
函数如下:bool HashJoinIterator::ReadRowFromProbeIterator() { DBUG_ASSERT(m_current_chunk == -1); //从右表中读取具体的数据 int result = m_probe_input->Read(); if (result == 1) { DBUG_ASSERT(thd()->is_error()); // my_error should have been called. return true; } if (result == 0) { RequestRowId(m_probe_input_tables.tables()); // A row from the probe iterator is ready. //开始从hash表中进行probe操作 LookupProbeRowInHashTable(); return false; } //下面的代码省略掉,这里只介绍最简单的场景 }
-
void HashJoinIterator::LookupProbeRowInHashTable()
函数如下:void HashJoinIterator::LookupProbeRowInHashTable() { if (m_join_conditions.empty()) { // Skip the call to equal_range in case we don‘t have any join conditions. // This can save up to 20% in case of multi-table joins. m_hash_map_iterator = m_row_buffer.begin(); m_hash_map_end = m_row_buffer.end(); //因为没有join condition(本质上是一个笛卡尔积),所以这里不进行probe操作,直接更新state,下一次循环就可以直接从hashJoin中读取到join后的行 m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE; return; } // Extract the join key from the probe input, and use that key as the lookup // key in the hash table. //类似于HashJoinRowBuffer::StoreRow里面的操作,将probe里面的行构建成hashMap可以用的Key bool null_in_join_key = ConstructJoinKey( thd(), m_join_conditions, m_probe_input_tables.tables_bitmap(), &m_temporary_row_and_join_key_buffer); if (null_in_join_key) { if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) { // SQL NULL was found, and we will never find a matching row in the hash // table. Let us indicate that, so that a null-complemented row is // returned. m_hash_map_iterator = m_row_buffer.end(); m_hash_map_end = m_row_buffer.end(); m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE; } else { SetReadingProbeRowState(); } return; } hash_join_buffer::Key key( pointer_cast<const uchar *>(m_temporary_row_and_join_key_buffer.ptr()), m_temporary_row_and_join_key_buffer.length()); //这里是一个简单的trick,如果只关心第一行的数据,那么直接使用unordered_multimap的find函数可以更低的时间复杂度,否则使用equal_range函数 if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) && m_extra_condition == nullptr) { // find() has a better average complexity than equal_range() (constant vs. // linear in the number of matching elements). And for semijoins, we are // only interested in the first match anyways, so this may give a nice // speedup. 
An exception to this is if we have any "extra" conditions that // needs to be evaluated after the hash table lookup, but before the row is // returned; we may need to read through the entire hash table to find a row // that satisfies the extra condition(s). m_hash_map_iterator = m_row_buffer.find(key); m_hash_map_end = m_row_buffer.end(); } else { auto range = m_row_buffer.equal_range(key); m_hash_map_iterator = range.first; m_hash_map_end = range.second; } //ok,join后的数据已经准备好了,设置状态为,下一次循环直接将结果返回给用户 m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE; }
-
关于
HashJoinIterator::ReadNextJoinedRowFromHashTable()
函数int HashJoinIterator::ReadNextJoinedRowFromHashTable() { int res; bool passes_extra_conditions = false; do { //将jointable里面的行读取出来,拷贝qep_tab()->table()->records数组里。注意的是,之前存储的时候,采用的是pack的方式存储的,现在还原出来需要unpack。至于probe表的数据,因为已经在record数组里面的,所以这里也就不需要任何额外的操作了。 res = ReadJoinedRow(); // ReadJoinedRow() can only return 0 (row is ready) or -1 (EOF). DBUG_ASSERT(res == 0 || res == -1); // Evaluate any extra conditions that are attached to this iterator before // we return a row. if (res == 0) { //一些额外的condition evaluate,不是filter,filter会专门创建iterator来执行。 passes_extra_conditions = JoinedRowPassesExtraConditions(); if (thd()->is_error()) { // Evaluation of extra conditions raised an error, so abort the join. return 1; } if (!passes_extra_conditions) { // Advance to the next matching row in the hash table. Note that the // iterator stays in the state READING_FIRST_ROW_FROM_HASH_TABLE even // though we are not actually reading the first row anymore. This is // because WriteProbeRowToDiskIfApplicable() needs to know if this is // the first row that matches both the join condition and any extra // conditions; only unmatched rows will be written to disk. ++m_hash_map_iterator; } } } while (res == 0 && !passes_extra_conditions); // The row passed all extra conditions (or we are out of rows in the hash // table), so we can now write the row to disk. // Inner and outer joins: Write out all rows from the probe input (given that // we have degraded into on-disk hash join). // Semijoin and antijoin: Write out rows that do not have any matching row in // the hash table. //检查是否需要下盘 if (WriteProbeRowToDiskIfApplicable()) { return 1; } //外查询的null处理 if (res == -1) { // If we did not find a matching row in the hash table, antijoin and outer // join should ouput the last row read from the probe input together with a // NULL-complemented row from the build input. 
However, in case of on-disk // antijoin, a row from the probe input can match a row from the build input // that has already been written out to disk. So for on-disk antijoin, we // cannot output any rows until we have started reading from chunk files. // // On-disk outer join is a bit more tricky; we can only output a // NULL-complemented row if the probe row did not match anything from the // build input while doing any of the probe phases. We can have multiple // probe phases if e.g. a build chunk file is too big to fit in memory; we // would have to read the build chunk in multiple smaller chunks while doing // a probe phase for each of these smaller chunks. To keep track of this, // each probe row is prefixed with a match flag in the chunk files. bool return_null_complemented_row = false; if ((on_disk_hash_join() && m_current_chunk == -1) || m_write_to_probe_row_saving) { return_null_complemented_row = false; } else if (m_join_type == JoinType::ANTI) { return_null_complemented_row = true; } else if (m_join_type == JoinType::OUTER && m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE && !m_probe_row_match_flag) { return_null_complemented_row = true; } SetReadingProbeRowState(); if (return_null_complemented_row) { m_build_input->SetNullRowFlag(true); return 0; } return -1; } // 省略部分代码 ++m_hash_map_iterator; return 0; }
-
3 总结
可以看到,在Mysql的处理中,如果join条件里面没有等值条件的话,直接的表现就是HashJoinIterator
里面的m_join_conditions
值为空,那么在往HashTable
里面emplace
的时候,创建的Key对象是一个空的Key,代码如下:
//将查询到的数据和join key封装为Key和BufferRow对象,放入到map中
m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size),
BufferRow(row, row_size));
此时的join_key_data
固定为nullptr
,而join_key_size
固定为0,也就是说,这个map
里面的key
只有一个,与之对应的value
则有多个。实际上表现出来的结果就是做了一个简单的笛卡尔积,然后再进行条件过滤,本质上并没有进行传统HashJoin
的probe
操作。