MySQL · 源码分析 · change master to

重要数据结构

MySQL · 源码分析 · change master to


Rpl_info 的基类,保存了一些错误信息,如 IO/SQL thread last error

class Slave_reporting_capability
{

  // 获取last error
  Error const& last_error() const { return m_last_error; }

}


Master_info 、Relay_log_info 的基类,很多重要的锁和信号量都在这里


class Rpl_info : public Slave_reporting_capability
{
  /*
    为了避免死锁,需要按照以下顺序加锁
    run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
    run_lock, sleep_lock
    run_lock, info_thd_lock
    
    info_thd_lock 保护对 info_thd 的操作
    读操作需要获取 info_thd_lock 或 run_lock
    写操作需要获取 info_thd_lock 和 run_lock
    
    data_lock : 保护对数据的读写
    run_lock  : 保护运行状态,变量 slave_running, slave_run_id
    sleep_lock: 对slave_sleep做互斥,防止同时进入多个slave_sleep
  */
  mysql_mutex_t data_lock, run_lock, sleep_lock, info_thd_lock;

  /*
    data_cond: data_lock 保护的数据被修改时发出此信号楼,只有 Relay_log_info 会使用
    start_cond: sql thread start (start slave 先启动 io thread,后启动 sql thread,sql thread 启动代表全部启动成功)
    stop_cond: sql/io thread stop
    sleep_cond: slave被kill,目前只发现5.6中有应用,5.7 中没找到发出此信号量的代码
  */
  mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
  
  /* 
    关联 io/sql/worker thread 的 thd
    Master_info 对应 io thread
    Relay_log_info 对应 sql thread
    Slave_worker 对应 worker thread 
  */
  THD *info_thd;
  
  
  /* 已初始化 */
  bool inited;
  
  /* 已终止slave */
  volatile bool abort_slave;
  
  /*
    当前slave运行状态,io thread 即 mi->slave_running有三种状态
    MYSQL_SLAVE_NOT_RUN         0
	MYSQL_SLAVE_RUN_NOT_CONNECT 1
	MYSQL_SLAVE_RUN_CONNECT     2 
	
	sql thread 即 mi->slave_running 只有 0 1两种状态
  */
  volatile uint slave_running;
  
  /* 用于判断slave thread 是否启动的变量,每次启动时递增1 */
  volatile ulong slave_run_id;

  /* repository's handler */
  Rpl_info_handler *handler;
  
  /* 
    唯一标识一条 info 记录的 id,标识一行或一个文件
    Master_info 和 Relay_log_info 可以通过 channel 标识唯一
    Worker_info 需要通过 {id, channel} 标识唯一
  */
  uint internal_id;

  /* 通道名,多源复制使用 */
  char channel[CHANNEL_NAME_LENGTH+1];
  
  /* 实现了两个虚函数,读写 Rpl_info */
  virtual bool read_info(Rpl_info_handler *from)= 0;
  virtual bool write_info(Rpl_info_handler *to)= 0;
}

/*
  Master_info 用户 IO thread
  主要保存以下信息:
    连接到Master的用户信息
    当前 master log name
    当前 master log offset
    一些其他控制变量
    
  Master_info 读取 master.info repository 初始化,通常是表或文件
  通过函数 mi_init_info() 进行初始化
  
  调用 flush_info() 可以将 master.info 写入磁盘,每次从master读取数据都需要刷盘
*/
class Master_info : public Rpl_info
{
  /* 前面保存了 user、host 等连接信息 */
  /* 下面是连接信息的 set/get 函数 */

  /* 和 master 的连接 */
  MYSQL* mysql;
  
  
  /* 对应的 Relay_log_info */        
  Relay_log_info *rli;
  
  /* IO 线程复制延迟 */
  long clock_diff_with_master;
  
  /* 心跳间隔 */
  float heartbeat_period;         // interface with CHANGE MASTER or master.info
  
  /* 收到心跳次数 */
  ulonglong received_heartbeats;  // counter of received heartbeat events

  /* 上次心跳时间 */
  time_t last_heartbeat;

  /* 上次收到event的时间 */
  time_t last_recv_event_timestamp;

  /* 忽略复制的server_id */
  Server_ids *ignore_server_ids;

  /* master server_id */
  ulong master_id;
  
  /* FORMAT_DESCRIPTION_EVENT前的checksum 算法 */
  binary_log::enum_binlog_checksum_alg checksum_alg_before_fd;
  
  /* 初始化 Master_info, 里面调用read_info() 从 Rpl_info_handler 中读取信息 */
  int mi_init_info();
  
  /* 清理 Master_info,里面会调用 Rpl_info_handler 的 eng_info() */
  void end_info();
  
  /* Master_info 信息落盘,每次从主库读取数据后都会执行 */
  int flush_info(bool force= FALSE);
  
  /*
    从master收到的 Format_description_log_event 写在 relay log 末尾
    
    IO thread 开始时创建,IO thread 结束时销毁
    IO thread 收到一个 Format_description_log_event 时更新
    每次rotate时,IO thread 写Format_description_log_event到新relay log
    每次执行FLUSH LOGS,client 写Format_description_log_event到新relay log
  */
  Format_description_log_event *mi_description_event;

  /* 最近一个GTID,可能是未完成事务的GTID,用于事务结束时写入 Retrieved_Gtid_Set */
  Gtid last_gtid_queued;
  
  /* 用于判断事务边界 */
  Transaction_boundary_parser transaction_parser;
  
  /* 
    channel lock,以下操作需要持有写锁
    START SLAVE;
	STOP SLAVE;
	CHANGE MASTER;
	RESET SLAVE;
	end_slave();
  */
  Checkable_rwlock *m_channel_lock;
	
  /* channel 被引用的次数,只有为0时可以删除channel */
  Atomic_int32 references;
}

/*
  主要保存以下信息:
    当前relay log
    当前relay log offset
    master log name
    与上次更新对应的主库日志序列
    sql thread 其他信息

	初始化过程和 Master_info 类似
	
	以下情况下 relay.info table/file 需要更新:
	1. relay log file rotated
	2. SQL thread stopped
	3. while processing a Xid_log_event
	4. after a Query_log_event(commit or rollback)
	5. after processing any statement written to the binary log without a transaction context.
	
	并行复制相关代码留作以后分析,本次暂不涉及
*/
class Relay_log_info : public Rpl_info
{
  /* 备份状态标志位,用于标志是否在语句中 */
  enum enum_state_flag {
    /* 在语句中 */
    IN_STMT,

    /** Flag counter.  Should always be last */
    STATE_FLAGS_COUNT
  };
  
  /* 是否复制相同server_id的event,一般是false */
  bool replicate_same_server_id;
  
  /* 正在执行或最后一个执行的GTID,用来填充 performance_schema.replication_applier_status_by_worke 的 last_seen_transaction 列
  */
  Gtid_specification currently_executing_gtid;
  
  
  /* 读取下面变量时,必须受data_lock保护 */
  
  /* 当前relay log的文件描述符 */
  File cur_log_fd;
  
  /* reay_log 对象,MYSQL_BIN_LOG类留作以后分析*/
  MYSQL_BIN_LOG relay_log;
  
  /* 主要用于查询log_pos */
  LOG_INFO linfo;
  
  /*
    cur_log 指向 relay_log.get_log_file() 或者 cache_buf
    取决于relay_log_file是热日志,还是需要打开冷日志
    
    cache_buf 在打开冷日志时适用
  */
  IO_CACHE cache_buf,*cur_log;
  
  /* 标识是否正在recovery */
  bool is_relay_log_recovery;
  
  /* 下面的变量可以不加锁读 */
  
  /*
    restart slave 时需要访问临时表。
    这个变量值在 init/end 时修改
    SQL thread 只读
  */
  TABLE *save_temporary_tables;
  
  /* 对应的 Master_info */
  Master_info *mi;
  
  /* 打开临时表的数量 */
  Atomic_int32 channel_open_temp_tables;
  
  /* 保存 relay_log.get_open_count() */
  uint32 cur_log_old_open_count;
  
  /* init_info() 曾经失败,RESET SLAVE 可以修复错误 */
  bool error_on_rli_init_info;
  
  /* 这里跳过一些 Group replication 相关变量 */
  
  /* received gtid set */
  Gtid_set gtid_set;
  
  /* 标识此对象是否属于SQL线程(属于SQL线程为0) */
  bool rli_fake;
  
  /* 标识 retrieved GTID set 是否已被初始化 */
  bool gtid_retrieved_initialized;
  
  /* 上一个错误的GTID */
  Gtid last_sql_error_gtid;
  
  
  /* 日志空间限制,日志空间总量(用于sys_var:relay_log_space,控制relay log空间) */
  ulonglong log_space_limit,log_space_total;
  
  /* 是否忽略日志空间限制 */
  bool ignore_log_space_limit;

  
  /* 需要清理空间时,SQL线程指示IO线程rotate logs */
  bool sql_force_rotate_relay;
  
  /* 上次主库记录binlog的时间 */
  time_t last_master_timestamp;
  
  /* 上次执行event的时间 */
  time_t last_exec_event_timestamp;
  
  /* 跳过error event */
  volatile uint32 slave_skip_counter;
  
  /* 标记是否需要中断pos_wait,change master 和 reset slave时需要中断 */
  volatile ulong abort_pos_wait;
  
  /* log_space 相关信号量*/
  mysql_mutex_t log_space_lock;
  mysql_cond_t log_space_cond;

  /* 这里有一些 START SLAVE UNTIL 相关变量 */
  
  /* 重试事务次数(trans_retries是重试次数上限),重试事务计数(retried_trans记录重试了多少次) */
  ulong trans_retries, retried_trans;
 
  /* 
    延迟复制时间
    CHANGE MASTER TO MASTER_DELAY=X.
    由data_lock保护, SQL thread 读取
    SQL thread 运行时该变量不可写
  */
  time_t sql_delay;
  
  /* sql_delay 结束时间 */
  time_t sql_delay_end;

  /* enum_state_flag 的标志位 */
  uint32 m_flags;  
}

函数分析

mysql_execute_command() 当做入口开始分析,可以看出change_master需要SUPER权限


  case SQLCOM_CHANGE_MASTER:
  {
    if (check_global_access(thd, SUPER_ACL))
      goto error;
    res= change_master_cmd(thd);
    break;
  }
  
/*
  函数具备以下功能
    更改接收/执行日志的配置/位点
    purge relay log
    删除 worker info(并行复制使用)
*/

/* 分析 change_master 函数会略过一部分逻辑 */
int change_master(THD* thd, Master_info* mi, LEX_MASTER_INFO* lex_mi,
                  bool preserve_logs)
{
  /*
    如果SQL thread 和 IO thread已经停止,并且没有指定 relay_log_pos和relay_log_file
    会purge relay log
  */
  bool need_relay_log_purge= 1;

  /* 为了修改 mysql.slave_master_info,需要无视read_only和super_read_only */
  thd->set_skip_readonly_check();
  
  /* channel 加读写锁,即将对channel做修改,函数结束时才会释放锁 */
  mi->channel_wrlock();
  
  /*
    对 mi->run_lock 和 rli->run_lock 加锁
    防止线程运行状态发生变化
  */
  lock_slave_threads(mi);

  /* 设置thread_mask,用来标识 IO/SQL thread 的运行状态 */
  init_thread_mask(&thread_mask, mi, 0);

  /* 设置auto_position=1需要IO/SQL thread 都不在运行状态,否则报错退出 */
  if (thread_mask)
  {
    if (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
    {
      error= ER_SLAVE_CHANNEL_MUST_STOP;
      my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel());
      goto err;
    }
    
    /* 如果 SQL thread 和 IO thread 没有全部停止,不能purge relay log */
    need_relay_log_purge= 0;
  }

  
  /*
    下面是一些错误判断,都是很明显的错误
      1. 如果设置了auto_position,同时又指定了复制位点,如 relay_log_pos,报错退出
      2. auto_position 需要 GTID_MODE != OFF
      3. IO thread 运行时不能改变 IO thread 相关配置
      4. SQL thread 运行时不能改变 SQL thread 相关配置
      5. 如果指定了master_host,那么master_host不能是空串  
  */

  /* 记录当前状态 */
  THD_STAGE_INFO(thd, stage_changing_master); 

  /* 标识停止的线程,给load_mi_and_rli_from_repositories()使用 */
  init_thread_mask(&thread_mask_stopped_threads, mi, 1);

  /* 
    从仓库加载 mi 和 rli 的配置
    只有停止状态的线程可以加载配置(SQL thread 对应 rli,IO thread 对应 mi)
  */
  if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads))                                                                                                                            
  {
    error= ER_MASTER_INFO;
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
    goto err;
  }

  /* 
    修改mi相关配置,并保存老配置
    save_ 变量中保存的老配置用于打印日志
  */
  if (have_receive_option)
  {
    strmake(saved_host, mi->host, HOSTNAME_LENGTH);
    strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
    saved_port= mi->port;
    strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
    saved_log_pos= mi->get_master_log_pos();

    if ((error= change_receive_options(thd, lex_mi, mi)))                                                                                                                                                   
    {
      goto err;
    }
  }

  /* 打印日志,change master 的源值和目标值 */
  if (have_receive_option)
    sql_print_information("'CHANGE MASTER TO%s executed'. "
      "Previous state master_host='%s', master_port= %u, master_log_file='%s', "
      "master_log_pos= %ld, master_bind='%s'. "
      "New state master_host='%s', master_port= %u, master_log_file='%s', "
      "master_log_pos= %ld, master_bind='%s'.",
      mi->get_for_channel_str(true),
      saved_host, saved_port, saved_log_name, (ulong) saved_log_pos,
      saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(),
      (ulong) mi->get_master_log_pos(), mi->bind_addr);

  /* 修改rli相关配置 */
  if (have_execute_option)
    change_execute_options(lex_mi, mi);
    
  /* 持久化master_info */
  if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true))
  {
    error= ER_RELAY_LOG_INIT;
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
    goto err;
  }
  
  if ((thread_mask & SLAVE_SQL) == 0)
  {

    /* 记录全局变量 relay_log_purge */                                                                                                                                                            
    bool save_relay_log_purge= relay_log_purge;

    if (need_relay_log_purge)
    {
      const char* errmsg= 0;

      /* purge relay logs */
      relay_log_purge= 1;
      THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
      if (mi->rli->purge_relay_logs(thd,
                                    0 /* not only reset, but also reinit */,
                                    &errmsg))
      {
        error= ER_RELAY_LOG_FAIL;
        my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
        goto err;
      }
    }
    else
    {

      const char* msg;
      relay_log_purge= 0;

      DBUG_ASSERT(mi->rli->inited);
      
      /*初始化 relay_log_pos */
      if (mi->rli->init_relay_log_pos(mi->rli->get_group_relay_log_name(),
                                      mi->rli->get_group_relay_log_pos(),
                                      true/*we do need mi->rli->data_lock*/,
                                      &msg, 0))
      {
        error= ER_RELAY_LOG_INIT;
        my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
        goto err;
      }
    }
    
    /* 恢复全局变量 relay_log_purge 的值 */
    relay_log_purge= save_relay_log_purge;

    /* 清理until condition */
    mi->rli->clear_until_condition();

    /* relay_log_info 持久化到磁盘 */
    if (mi->rli->flush_info(true))
    {
      error= ER_RELAY_LOG_INIT;
      my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file.");
      goto err;
    }

  }

/* 出错后跳到次数,释放之前申请的锁 */
err:

  unlock_slave_threads(mi);
  mi->channel_unlock();
  DBUG_RETURN(error);

}

总结

change master 主要功能是修改 SQL 和 IO 线程的配置信息,执行时可能会purge relay log

没有特殊情况,建议指定auto_position=1,不要自己指定复制位点,避免数据丢失风险

如需对change master 做修改,需要注意在锁保护下修改变量,同时注意加锁顺序,避免死锁

上一篇:从零到百万级iot系统架构演进(会不断更新


下一篇:无人驾驶汽车激光雷达系统存安全缺陷 被破解寸步难行