详解 Seata Golang 客户端 AT 模式及其使用

  概述

  我们知道 Seata Java Client 的 AT 模式,通过代理数据源,实现了对业务代码无侵入的分布式事务协调机制,将与 Transaction Coordinator (TC) 交互的逻辑、Commit 的逻辑、Rollback 的逻辑,隐藏在切面和代理数据源相应的代码中,使开发者无感知。那如果这个方法,要用 Golang 来实现一遍,应该如何操作呢?关于这个问题,我想了很久,最初的设想是,对 database/sql 的 mysql driver 进行增强,在对包

  github/go-sql-driver/mysql 研究了一段时间后,还是没有头绪,不知如何下手,最后转而增强 database/sql 包。由于 AT 模式必须保证本地事务的正确处理,在具体业务开发时,首先要通过 db.Begin() 获得一个 Tx 对象,然后再 tx.Exec() 执行数据库操作,最后 txmit() 提交或 tx.Rollback() 回滚。这种处理方式算是一个 Golang 数据库事务处理的基本操作。 所以对 database/sql 的增强,我们重点关注这几个方法 db.Begin() 、 tx.Exec() 、 txmit() 、 tx.Rollback 。

  事务提交、回滚

  通过 Seata Java Client 的相关代码,我们知道,在本地事务提交的时候,主要是将分支事务注册到 TC 上,并将数据库操作产生的 undoLog 一起写入到 undoLog 表;本地事务回滚的时候,需要将分支事务(即本地事务)的执行状态报告给 TC,使 TC 好知道是否通知参与全局事务的其他分支回滚。

  func (tx *Tx) Commit() error {

  //注册分支事务

  branchId,err :=tx.register()

  if err !=nil {

  return errors.WithStack(err)

  }

  tx.tx.Context.BranchId=branchId

  if tx.tx.Context.HasUndoLog() {

  //将 undoLog 写入 undoLog 表

  err=manager.GetUndoLogManager().FlushUndoLogs(tx.tx)

  if err !=nil {

  err1 :=tx.report(false)

  if err1 !=nil {

  return errors.WithStack(err1)

  }

  return errors.WithStack(err)

  }

  err=tx.txmit()

  if err !=nil {

  err1 :=tx.report(false)

  if err1 !=nil {

  return errors.WithStack(err1)

  }

  return errors.WithStack(err)

  }

  } else {

  return tx.txmit()

  }

  if tx.reportSuccessEnable {

  tx.report(true)

  }

  tx.tx.Context.Reset()

  return nil

  }

  db.Begin() 会产生一个 Tx 对象, tx.Exec() 会产生 undoLog, txmit() 将 undoLog 刷到数据库中。那么 undoLog 保存到哪里呢?答案是 Tx_Context 中。

  type TxContext struct {

  *context.RootContext

  Xid string

  BranchId int64

  IsGlobalLockRequire bool

  LockKeysBuffer *model.Set

  SqlUndoItemsBuffer []*undo.SqlUndoLog

  }

  Commit() 方法中的 tx.tx.Context ,第一个 tx 是封装的 Tx 对象,第二个 tx 是 database/sql 的 Tx, tx.tx.Context 则是 Tx_Contex。UndoLogManager 则是操作 undoLog 的核心对象,处理 undoLog 的插入、删除,并查询出 undoLog 用于回滚。

  func (tx *Tx) Rollback() error {

  err :=tx.tx.Rollback()

  if tx.tx.Context.InGlobalTransaction() && tx.tx.Context.IsBranchRegistered() {

  // 报告 TC 分支事务执行失败

  tx.report(false)

  }

  tx.tx.Context.Reset()

  return err

  }

  通过上面的代码呢,我们知道增强型 Tx 对象需要向 TC 注册分支事务,并报告分支事务的执行状态,相应代码如下:

  func (tx *Tx) register() (int64,error) {

  return dataSourceManager.BranchRegister(meta.BranchTypeAT,tx.tx.ResourceId,"",tx.tx.Context.Xid,

  nil,tx.tx.Context.BuildLockKeys())

  }

  func (tx *Tx) report(commitDone bool) error {

  retry :=tx.reportRetryCount

  for retry > 0 {

  var err error

  if commitDone {

  err=dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,

  meta.BranchStatusPhaseoneDone,nil)

  } else {

  err=dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,

  meta.BranchStatusPhaseoneFailed,nil)

  }

  if err !=nil {

  logging.Logger.Errorf("Failed to report [%d/%s] commit done [%t] Retry Countdown: %d",

  tx.tx.Context.BranchId,tx.tx.Context.Xid,commitDone,retry)

  retry=retry -1

  if retry==0 {

  return errors.WithMessagef(err,"Failed to report branch status %t",commitDone)

  }

  }

  }

  return nil

  }

  和 TC 进行通信的主要逻辑还是在 DataSourceManager 里面。AT 模式涉及的两个关键对象 DataSourceManager、UndoLogManager 就浮出水面。一个用于远程 TC 交互,一个用于本地数据库处理。

  事务执行

  func (tx *Tx) Exec(query string, args ...interface{}) (sql.Result, error) {

  var parser=p.New()

  // 解析业务 sql

  act,_ :=parser.ParseOneStmt(query,"","")

  deleteStmt,isDelete :=act.(*ast.DeleteStmt)

  if isDelete {

  executor :=&DeleteExecutor{

  tx: tx.tx,

  sqlRecognizer: mysql.NewMysqlDeleteRecognizer(query,deleteStmt),

  values: args,

  }

  return executor.Execute()

  }

  insertStmt,isInsert :=act.(*ast.InsertStmt)

  if isInsert {

  executor :=&InsertExecutor{

  tx: tx.tx,

  sqlRecognizer: mysql.NewMysqlInsertRecognizer(query,insertStmt),

  values: args,

  }

  return executor.Execute()

  }

  updateStmt,isUpdate :=act.(*ast.UpdateStmt)

  if isUpdate {

  executor :=&UpdateExecutor{

  tx: tx.tx,

  sqlRecognizer: mysql.NewMysqlUpdateRecognizer(query,updateStmt),

  values: args,

  }

  return executor.Execute()

  }

  return tx.tx.Tx.Exec(query,args)

  }

  执行业务 sql,并生成 undoLog 的关键,在于识别业务 sql 执行了什么操作:插入?删除?修改?这里使用 tidb 的 sql parser 去解析业务 sql,再使用相应的执行器去执行业务 sql,生成 undoLog 保存在 Tx_Context 中。

  事务开启

  db.Begin() 返回增强型的 Tx 对象。

  func (db *DB) Begin(ctx *context.RootContext) (*Tx,error) {

  tx,err :=db.DB.Begin()

  if err !=nil {

  return nil,err

  }

  proxyTx :=&tx2xyTx{

  Tx: tx,

  DSN: db.conf.DSN,

  ResourceId: db.GetResourceId(),

  Context: tx2.NewTxContext(ctx),

  }

  return &Tx{

  tx: proxyTx,

  reportRetryCount: db.conf.ReportRetryCount,

  reportSuccessEnable: db.conf.ReportSuccessEnable,

  },nil

  }seata-golang at 模式的使用

  sample 代码

  首先执行 scripts 脚本,初始化数据库

  如果之前没有初始化过 seata 数据库,先执行 seata-golang/scripts/server/db/mysql.sql 脚本修改 dsn 数据库配置,修改下列文件:

  seata-golang/tc/app/profiles/dev/config.yml

  seata-golang/samples/at/product_svc/conf/client.yml

  seata-golang/samples/at/product_svc/conf/client.yml将下列文件中的 configPath 修改为 client.yml 配置文件的路径

  seata-golang/samples/at/product_svc/main.go

  seata-golang/samples/at/order_svc/main.go

  seata-golang/samples/at/aggregation_svc/main.go依次运行 tc、order_svc、product_svc、aggragation_svc,访问下列地址开始测试:

  localhost:8003/createSoCommit

  localhost:8003/createSoRollback

上一篇:linux下的PGSQL安装步骤


下一篇:【无标题】