MySQL 8.0.22执行器源码分析HashJoin —— BuildHashTable函数细节步骤

BuildHashTable函数细节步骤

该函数位置处于hash_join_iterator.cc 403 ~ 560行

step1:如果被驱动表迭代器没有更多的行数,更新m_state为EOR,然后返回false,表明创建hash表失败

if (!m_build_iterator_has_more_rows) {
    m_state = State::END_OF_ROWS;
    return false;
}

**step2:**还原插入行缓冲区的最后一行。如果构建输入是一个嵌套循环,内部有一个过滤器,那么这是必需的。这里还不是很理解

if (m_row_buffer.Initialized() &&
    m_row_buffer.LastRowStored() != m_row_buffer.end()) {
    hash_join_buffer::LoadIntoTableBuffers(
        m_build_input_tables, m_row_buffer.LastRowStored()->second);
}

step3:清除行buffer并且将多有迭代器重新指向它。如果初始化成功,直接返回true。

if (InitRowBuffer()) {
    return true;
}

step4:初始化了两个变量

reject_duplicate_keysstore_rows_with_null_in_join_key

const bool reject_duplicate_keys = RejectDuplicateKeys();
const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;

RejectDuplicateKeys()函数返回值为true的话,说明拒绝哈希表中的重复键。当遇到半连接或反连接等相同键值只需要返回一条结果,不需要返回extra情况。

对于反连接与半连接可以参考:半连接&反连接

指明当前jointype为外连接JoinType::OUTER

step5:将被驱动表输入的SetNullRowFlag清除。这是为了防止hashjoin用于独立子查询时init被调用多次的情况,不然这个标志将被之前执行的hashjoin操作污染。

m_build_input->SetNullRowFlag(/*is_null_row=*/false);

step6:开始通过迭代器从m_build_input循环读数据,

1、如果线程被kill的话,返回true。

2、当build input为空,内连接和半连接结果也会为空,然而反连接的输出将是probe input的所有行

3、当读到build 迭代器的最后一行,这说明我们不会再去在probe 迭代器中读取数据了。这时候需要我们禁止probe row保存数据

PFSBatchMode batch_mode(m_build_input.get());
for (;;) {  // Termination condition within loop.
    int res = m_build_input->Read();
    if (res == 1) {
      DBUG_ASSERT(thd()->is_error() ||
                  thd()->killed);  // my_error should have been called.
      return true;
    }

    if (res == -1) {
      m_build_iterator_has_more_rows = false;
      // If the build input was empty, the result of inner joins and semijoins
      // will also be empty. However, if the build input was empty, the output
      // of antijoins will be all the rows from the probe input.
      if (m_row_buffer.empty() && m_join_type != JoinType::ANTI &&
          m_join_type != JoinType::OUTER) {
        m_state = State::END_OF_ROWS;
        return false;
      }

      // As we managed to read to the end of the build iterator, this is the
      // last time we will read from the probe iterator. Thus, we can disable
      // probe row saving again (it was enabled if the hash table ran out of
      // memory _and_ we were not allowed to spill to disk).
      m_write_to_probe_row_saving = false;
      SetReadingProbeRowState();
      return false;
    }

step7

1、请求所有表的行ID

2、存储当前位于表记录缓冲区中的行,将其放到store_row_result中

3、根据store_row_result状态进行处理

  • 如果是*ROW_STORED*,说明已经存储完毕,直接break

     case hash_join_buffer::StoreRowResult::ROW_STORED:
            break;
    
  • 如果是BUFFER_FULL,说明缓存区已经满了.

    如果允许的话,向磁盘操作。如果不允许向磁盘操作,就继续从probe 迭代器中读取数据,并且开启probe row保存,这样没有匹配的probe rows将被写到saving file中。在下一次refill hash表的时候,从saving file中读取probe row。

    if (!m_allow_spill_to_disk) {
        if (m_join_type != JoinType::INNER) {
            // Enable probe row saving, so that unmatched probe rows are written
            // to the probe row saving file. After the next refill of the hash
            // table, we will read rows from the probe row saving file, ensuring
            // that we only read unmatched probe rows.
            InitWritingToProbeRowSavingFile();
        }
        SetReadingProbeRowState();
        return false;
    }
    // If we are not allowed to spill to disk, just go on to reading from
            // the probe iterator.
    if (!m_allow_spill_to_disk) {
        if (m_join_type != JoinType::INNER) {
            // Enable probe row saving, so that unmatched probe rows are written
            // to the probe row saving file. After the next refill of the hash
            // table, we will read rows from the probe row saving file, ensuring
            // that we only read unmatched probe rows.
            InitWritingToProbeRowSavingFile();
        }
        SetReadingProbeRowState();
        return false;
    }
    

    初始化两个input的hashjoinchunk。估计需要多少chunks,planner会事先给出一个数,这里会重新计算得到每个块都合适的磁盘块。

    if (InitializeChunkFiles(
        m_estimated_build_rows, m_row_buffer.size(), kMaxChunks,
        m_probe_input_tables, m_build_input_tables,
        /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
        &m_chunk_files_on_disk)) {
        DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
        return true;
    }
    

    将迭代器上剩余的数据写到磁盘的chunk file上,如果出现IO错误的话,返回true

    if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
                          m_join_conditions, kChunkPartitioningHashSeed,
                          &m_chunk_files_on_disk,
                          true /* write_to_build_chunks */,
                          false /* write_rows_with_null_in_join_key */,
                          m_tables_to_get_rowid_for,
                          &m_temporary_row_and_join_key_buffer)) {
        DBUG_ASSERT(thd()->is_error() ||
                    thd()->killed);  // my_error should have been called.
        return true;
    }
    

    从build input起始地方刷新并定位所有chunk files。

    for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
        if (chunk_pair.build_chunk.Rewind()) {
            DBUG_ASSERT(thd()->is_error() ||
                        thd()->killed);  // my_error should have been called.
            return true;
        }
    }
    SetReadingProbeRowState();
    return false;
    }
    
  • 如果状态为FATAL_ERROR,说明出现意料之外的错误,可能是malloc失败。返回true。

case hash_join_buffer::StoreRowResult::FATAL_ERROR:
        // An unrecoverable error. Most likely, malloc failed, so report OOM.
        // Note that we cannot say for sure how much memory we tried to allocate
        // when failing, so just report 'join_buffer_size' as the amount of
        // memory we tried to allocate.
        my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
                 thd()->variables.join_buff_size);
        return true;
    }
上一篇:GEE数据集:CHIRPS Pentad高分辨率的全球网格降雨数据集


下一篇:php+html5实现无刷新上传,大文件分片上传,断点续传