TiDB 利用 binlog 恢复数据：反解析 binlog

我们知道，TiDB 的 binlog 记录了所有已经执行成功的 DML 语句，类似于 MySQL binlog 的 row 模式。TiDB 官方也提供了 reparo 工具来解析 binlog，输出如下所示：

[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:153] ["Parsed start TSO"] [ts=449217508147200000]
[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:160] ["Parsed stop TSO"] [ts=449222855884800000]
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 1
merchant_id(varchar): 2
coupon_code(varchar): 4
customer_id(varchar): 4
customer_mobile_no(varchar): 5
trade_channel(varchar): 03
trade_store_id(varchar): 1440040
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 6
external_order_id(varchar): <nil>
receive_channel_code(varchar): 105
coupon_card_type(tinyint): 0
coupon_type(tinyint): 1
receive_type(smallint): 6
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 2
merchant_id(varchar): <nil>
coupon_code(varchar): 5
customer_id(varchar): 6
customer_mobile_no(varchar): 7
trade_channel(varchar): 03
trade_store_id(varchar): 1420452
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 8
external_order_id(varchar): <nil>
receive_channel_code(varchar): 03
coupon_card_type(tinyint): 0
coupon_type(tinyint): 0
receive_type(smallint): 4
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1

另外，reparo 支持 print 和 mysql 两种模式：print 只把解析结果打印到标准输出，不执行 SQL；mysql 则直接在下游数据库执行 SQL。

但有时我们需要进行反向解析。比如误删了一些数据，需要把被删除的数据还原成 INSERT 语句，这该怎么办呢？TiDB 目前不提供这种反解析工具，于是我自己写了一个工具来解析 reparo 的 print 输出，代码如下：

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"regexp"
	"strings"
	"time"
)

// Settings populated from the command-line flags parsed in main.
// whereSql is declared here but is not referenced anywhere in this file.
var (
	binlogFile, schema, table  string
	sqlType, whereSql, logPath string
)

// filedRe matches lines that contain a parenthesised part immediately followed
// by ':' — the shape of reparo column lines such as "coupon_id(varchar): 1".
var filedRe = regexp.MustCompile(`\(.*?\)+:.*`)

// logOutFile is the destination for the generated SQL; outSlowLogFile sets it.
var logOutFile *log.Logger

type (
	// filed is one column of a parsed row: the column name and the value text
	// exactly as reparo printed it.
	filed struct {
		Name  string `json:"name"`
		Value string `json:"value"`
	}

	// Parser holds the columns of a row.
	// NOTE(review): not referenced anywhere else in this file.
	Parser struct {
		lines []filed
	}
)

// compressStr returns str with every space, tab, newline, carriage return and
// form feed removed. main uses it to compare reparo header lines with the
// expected "schema:...;table:...;type:..." text regardless of spacing.
//
// The removed bytes are exactly RE2's `\s` class, which an earlier
// implementation stripped with a regexp compiled on every call. That was
// costly on a per-input-line path. Working byte-wise keeps every other byte,
// including '\v' and invalid UTF-8, untouched. This is safe because ASCII
// bytes never occur inside multi-byte UTF-8 sequences.
func compressStr(str string) string {
	var b strings.Builder
	b.Grow(len(str))
	for i := 0; i < len(str); i++ {
		switch c := str[i]; c {
		case ' ', '\t', '\n', '\f', '\r':
			// drop whitespace
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}

func main() {
	flag.StringVar(&binlogFile, "binlogFile", "", "binlog日志文件路径")
	flag.StringVar(&schema, "schema", "", "要解析的数据库名称")
	flag.StringVar(&table, "table", "", "要解析的表名称")
	flag.StringVar(&sqlType, "sqlType", "", "要解析的dml类型")
	flag.StringVar(&logPath, "logPath", "", "输出文件路径名称")
	flag.Parse()
	if binlogFile == "" {
		log.Println("请输入binlog日志文件路径...")
		return
	}
	if schema == "" {
		log.Println("请输入要解析的数据库名称...")
		return
	}
	if table == "" {
		log.Println("请输入要解析的表名称...")
		return
	}
	if sqlType == "" {
		log.Println("请输入要解析的dml类型...")
		return
	}
	if logPath == "" {
		log.Println("请输入输出日志文件路径...")
		return
	}
	outSlowLogFile(logPath)
	file, err := os.Open(binlogFile)
	if err != nil {
		log.Println("读取文件失败...")
		return
	}
	schemaRe := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, sqlType)
	//schemaRe2 := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, "Insert")
	defer file.Close()
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024*10)
	inHeader := false
	var f filed
	var array []string
	var time2, _ = time.Parse("2006-01-02 15:04:05", "2024-04-01 00:00:00")
	fields := ""
	for scanner.Scan() {
		line := scanner.Text()
		if compressStr(line) == compressStr(schemaRe) {
			if fields != "" {
				fields = fmt.Sprintf("{%v}", fields)
				array = append(array, fields)
			}
			fields = ""

			inHeader = true
			continue
		} else if strings.Contains(line, "sync binlog success") || strings.Contains(line, "read file end") {
			inHeader = false
			continue
		} else {
			if inHeader {
				if filedRe.MatchString(line) {
					f.Name = strings.Split(line, "(")[0]
					tmpValue := strings.Split(line, ": ")
					if len(tmpValue) == 1 {
						inHeader = false
						continue
					} else {
						f.Value = strings.Split(line, ": ")[1]
					}

					if fields == "" {
						fields = fmt.Sprintf("\"%s\":\"%s\" ", f.Name, f.Value)
					} else {
						fields += fmt.Sprintf(",\"%s\":\"%s\"", f.Name, f.Value)
					}

				} else {
					inHeader = false
					continue
				}
			}
		}

	}
	if fields != "" {
		fields = fmt.Sprintf("{%v}", fields)
		array = append(array, fields)
	}
	if err := scanner.Err(); err != nil {
		log.Println(err)
	}
	for i := 0; i < len(array); i++ {
		//decoder := json.NewDecoder(strings.NewReader(array[i]))
		//
		//for key, value := range JsonToMap(array[i]) {
		//	fmt.Printf("键:%v,值:%d\n", key, value)
		//}
		m := make(map[string]string)

		err = json.Unmarshal([]byte(array[i]), &m)
		if err != nil {
			fmt.Printf("Unmarshal with error: %+v", err)
		}
		keys := ""
		values := ""
		isExec := false
		//newKeys := make([]string, 0, len(m))
		//for k := range m {
		//	newKeys = append(newKeys, k)
		//}
		 对切片进行排序
		//sort.Strings(newKeys)
		for key, value := range m {
			if keys == "" {
				keys = fmt.Sprintf("%v", key)
				if value == "<nil>" {
					values = fmt.Sprintf("%v", "NULL")
				} else {
					values = fmt.Sprintf("'%v'", value)
				}
			} else {
				keys += fmt.Sprintf(",%v", key)
				if value == "<nil>" {
					values += fmt.Sprintf(",%v", "NULL")
				} else {
					values += fmt.Sprintf(",'%v'", value)
				}
			}

			if key == "updated_date" {
				isExec = true
				updateTime, _ := time.Parse("2006-01-02 15:04:05", value)
				if updateTime.After(time2) {
					isExec = true
				} else {
					isExec = false
				}
			}
		}
		if isExec {
			inSql := fmt.Sprintf("insert into coupon_trade_record(%v) values(%v);", keys, values)
			logOutFile.Println(inSql)
		}

	}
}

func outSlowLogFile(outFile string) {
	outFilePath, err := os.OpenFile(outFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
	if err != nil {
		log.Println(fmt.Sprintf("创建输出文件失败:%s", err))
		return
	}
	logOutFile = log.New(outFilePath, "", log.Lmsgprefix)
}

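先用 reparo 的 print 模式把解析结果重定向到文件，再用上面的程序把其中的 DELETE 事件直接转换成 INSERT 语句，例如：`go run main.go -binlogFile=binlog.txt -schema=coupon_trade -table=coupon_trade_record -sqlType=Delete -logPath=restore.sql`。注意：代码会按 updated_date 过滤，只输出 updated_date 晚于 2024-04-01 00:00:00 的行。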

最后在业务库中执行生成的 INSERT 语句，把数据重新插入，即可完成恢复。

上一篇:【Docker学习】docker run的--annotation选项


下一篇:【论文笔记 | 异步联邦】PORT:How Asynchronous can Federated Learning Be?