MySQL数据传输注意事项

文章目录

背景

业务方迁移时,总的数据量大小只有300多M,但是迁移时MySQL server提示packet超过了max_allowed_packet,而在测试迁移时10G的压测数据反而没有发生任何问题?

该问题最终的解决方案很简单,调整合适的max_allowed_packet的大小、新建连接进行操作,

但我们可以借此深入了解一波MySQL Packet相关知识,比如MySQL是在哪里对包大小做了限制等等

net_buffer_length

版本 mysqldump mysql client mysql server
5.7 default:1MB
max:16MB
default:16KB default:16KB
max:1MB
8.0 default:1MB default:16KB default:16KB

max_allowed_packet

max allowed packet设置MySQL Server和Client之间任何单个消息的大小的上限,包括副本(主从或者MGR)

版本 mysqldump mysql client mysql server
5.5 default:24MB default:16MB default:1MB
5.7 default:24MB default:16MB default:4MB
8.0 default:24MB default:16MB default:64MB

这里需要提到的一个MySQL的API函数:mysql_stmt_send_long_data()

该函数的作用是(使用预处理功能时)允许将参数数据分块多次的发送给服务器,例如,当blob或者text的大小超过max_allowed_packet的大小。前提是该列类型必须是TEXTBLOB数据类型

作用

上述两个参数的作用为:每个包发送到网络或者从网络读包都会先把数据包保存在net->buff里,待到net->buff满了或者一次命令结束才会通过socket发出给对端。net->buff有个初始大小(net->max_packet),会随读取数据的增多而扩展直到max_allowed_packet的大小

相关源码

MySQL Packet结构

MySQL 的客户端和服务端交互是以数据包(Packet) 为单位进行的, 每个包的大小长度有限制, 最长为 2^24−1 个字节(即 16MB), 若包长度过大, 则客户端需要自行将包分片, 使得每段的长度在 MySQL 包的最大长度之下, MySQL Packet 由 HeaderBody 组成,。

Header 包含两个字段: 包长度(payload_length)、序列号(sequence_id), Body 则是包的主体部分, 它的长度由 Header 中的 payload_length 字段指示。

其中payload_length占3字节,sequence_id占1字节。

所以,MySQL一个Pakcet的最大长度为 16M + 4 字节,大于 max_packet_size(2^24-1字节)的数据都会被拆包发送

MySQL数据传输注意事项

接收方

my_net_read()

大包处理,最后都会循环调用net_read_packet()直到满足退出条件

ulong my_net_read(NET *net) {
  size_t len;
  /* turn off non blocking operations */
  if (!vio_is_blocking(net->vio)) vio_set_blocking_flag(net->vio, true);

  // 是否启用压缩
  if (net->compress)
    net_read_compressed_packet(net, len);
  else
    net_read_uncompressed_packet(net, len);

  return static_cast<ulong>(len);
}

两者类似,我们看一下读取未压缩的packet过程

net_read_uncompressed_packet()

将一个数据包读入net->buff + net->where_b,如果是多包报文的第一个报文(由数据包的长度= 0xffffff表示[16M]),则读取并拼接所有子包。

static void net_read_uncompressed_packet(NET *net, size_t &len) {
  size_t complen;
  assert(!net->compress);
  // 第一次读取包,并返回包长
  len = net_read_packet(net, &complen);
  // 包长度为为0xffff时,循环读取后续包
  // 直到后续包长度不为0xffff
  if (len == MAX_PACKET_LENGTH) {
    /* First packet of a multi-packet.  Concatenate the packets */
    ulong save_pos = net->where_b;
    size_t total_length = 0;
    do {
      net->where_b += len;
      total_length += len;
      len = net_read_packet(net, &complen);
    } while (len == MAX_PACKET_LENGTH);
    if (len != packet_error) len += total_length;
    net->where_b = save_pos;
  }
  net->read_pos = net->buff + net->where_b;
  if (len != packet_error)
    net->read_pos[len] = 0; /* Safeguard for mysql_use_result */
}

net_read_packet()

将packet数据读入buffer中, 并返回当前接收到的这个packet的长度

/*
   @return The length of the packet, or @c packet_error on error.
*/
static size_t net_read_packet(NET *net, size_t *complen) {
  size_t pkt_len, pkt_data_len;
  ...
  /* Retrieve packet length and number. */
  if (net_read_packet_header(net)) goto error;
  ...
  /* 读取packet的前3个字节,获取当前收到packet的payload_length */
  pkt_len = uint3korr(net->buff + net->where_b);
  ...
  /* 计算包括之前一共获取到的packet数据长度 */
  pkt_data_len = max(pkt_len, *complen) + net->where_b;
  ...
  /* 
  	 通过net_realloc()扩容net_buffer
  	 若总的packet长度超过max_packet(16M),但是并未超过
  	 max_allowed_packet时,正常执行扩容net_buffer并读入payload数据
  */
  if ((pkt_data_len >= net->max_packet) && net_realloc(net, pkt_data_len))
    goto error;

  /* Read the packet data (payload). */
  if (net_read_raw_loop(net, pkt_len)) goto error;
  ...
}

#define uint3korr(A)  (uint32_t) (((uint32_t) ((uint8_t) (A)[0])) +\
                                  (((uint32_t) ((uint8_t) (A)[1])) << 8) +\
                                  (((uint32_t) ((uint8_t) (A)[2])) << 16))

net_realloc()

net_buffer动态扩容,总buffer length不能超过 max_allowed_packet大小

bool net_realloc(NET *net, size_t length) {
  uchar *buff;
  size_t pkt_length;
  DBUG_TRACE;
  DBUG_PRINT("enter", ("length: %lu", (ulong)length));

  // 当总的packet长度已经超过max_allowed_packet大小
  // 记录错误,返回true
  // 可以在mysql server日志中看到ER_NET_PACKET_TOO_LARGE的错误
  if (length >= net->max_packet_size) {
    DBUG_PRINT("error",
               ("Packet too large. Max size: %lu", net->max_packet_size));
    /* Error, but no need to stop using the socket. */
    net->error = NET_ERROR_SOCKET_RECOVERABLE;
    net->last_errno = ER_NET_PACKET_TOO_LARGE;
#ifdef MYSQL_SERVER
    my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
#endif
    return true;
  }
  pkt_length = (length + IO_SIZE - 1) & ~(IO_SIZE - 1);
  ...
#ifdef MYSQL_SERVER
  net->buff = net->write_pos = buff;
#else
  size_t cur_pos_offset = NET_ASYNC_DATA(net)->cur_pos - net->buff;
  net->buff = net->write_pos = buff;
  NET_ASYNC_DATA(net)->cur_pos = net->buff + cur_pos_offset;
#endif
  net->buff_end = buff + (net->max_packet = (ulong)pkt_length);
  return false;
}

发送方

发送过程比较容易理解,直接用go中经常使用的Go-MySQL-Driver包为例

// Write packet buffer 'data'
func (mc *mysqlConn) writePacket(data []byte) error {
	pktLen := len(data) - 4

    // 超过包的最大值(可以修改),不同版本默认值不同
	if pktLen > mc.maxAllowedPacket {
		return ErrPktTooLarge
	}

	// Perform a stale connection check. We only perform this check for
	// the first query on a connection that has been checked out of the
	// connection pool: a fresh connection from the pool is more likely
	// to be stale, and it has not performed any previous writes that
	// could cause data corruption, so it's safe to return ErrBadConn
	// if the check fails.
	if mc.reset {
		mc.reset = false
		conn := mc.netConn
		if mc.rawConn != nil {
			conn = mc.rawConn
		}
		var err error
		// If this connection has a ReadTimeout which we've been setting on
		// reads, reset it to its default value before we attempt a non-blocking
		// read, otherwise the scheduler will just time us out before we can read
		if mc.cfg.ReadTimeout != 0 {
			err = conn.SetReadDeadline(time.Time{}) // 设置超时
		}
		if err == nil {
			err = connCheck(conn) // 检查连接
		}
		if err != nil {
			errLog.Print("closing bad idle connection: ", err)
			mc.Close()
			return driver.ErrBadConn
		}
	}

	for {
		var size int
         // 大于 1<<24 - 1字节(16M,硬编码,不允许修改,mysql server也是如此),要进行拆包发送
		if pktLen >= maxPacketSize {
            // 接收方读到这个头就会知道还有'后续包'
			data[0] = 0xff
			data[1] = 0xff
			data[2] = 0xff
			size = maxPacketSize
		} else {
			data[0] = byte(pktLen)
			data[1] = byte(pktLen >> 8)
			data[2] = byte(pktLen >> 16)
			size = pktLen
		}
		data[3] = mc.sequence // 包的编号

		// Write packet
		if mc.writeTimeout > 0 { // 写超时
			if err := mc.netConn.SetWriteDeadline(time.Now().Add(mc.writeTimeout)); err != nil {
				return err
			}
		}

		n, err := mc.netConn.Write(data[:4+size]) // 发送数据
		if err == nil && n == 4+size {
			mc.sequence++
			if size != maxPacketSize {
				return nil
			}
			pktLen -= size
			data = data[size:]
			continue // 继续发送'剩余包的数据'
		}

		// Handle error
		if err == nil { // n != len(data)
			mc.cleanup()
			errLog.Print(ErrMalformPkt)
		} else {
			if cerr := mc.canceled.Value(); cerr != nil {
				return cerr
			}
			if n == 0 && pktLen == len(data)-4 {
				// only for the first loop iteration when nothing was written yet
				return errBadConnNoWrite
			}
			mc.cleanup()
			errLog.Print(err)
		}
		return ErrInvalidConn
	}
}

注意事项

  • mysqldump的语句合并问题

需要注意的是mysqldump导出时如果不指定–net-buffer-length,但指定了–opt或–extended-insert, -e(创建多行Insert语句),那么默认单条insert语句的大小是1MB

假设这里的单条insert语句未达到1MB,

insert into t1 values(1,2);

那么mysqldump会帮你把多条insert拼接起来直到大小达到1MB,如下所示

insert into t1 values(1,2),(3,4),(5,6),(7,8).....(n,n);

假如你本来单条insert就超过了1MB,那么mysqldump是不会再拼接的

  • max_allowed_packet变量设置的问题
  1. 该系统变量需要在新的MySQL连接中才生效
  2. 该参数需要在传输双端都进行调整,否则任何一端溢出都会导致数据传输失败
  • 要考虑到max_allowed_packet设置的太大而实例内存不足的情况

总结

上面的注意事项也就解释了为什么在业务方迁移时,总的数据量大小只有300多M,但是迁移时mysql server提示packet超过了max_allowed_packet,而在测试迁移时10G的压测数据反而没有发生任何问题:

上一篇:实验3:OpenFlow协议分析实践


下一篇:SDN第三次上机实验