分布式数据库中间件–(3) Cobar对简单select命令的处理过程

友情提示:非原文链接可能会影响您的阅读体验,欢迎查看原文。(http://blog.geekcome.com)


在上一篇中介绍了Cobar和client初次建立连接的过程,Cobar监听端口,client发起连接请求,Cobar发送握手数据包,client发送认证数据包最后依据认证的结果Cobar向client发送认证结果。

在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。

所以在client再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。以下详细分析一下简单select语句的运行过程。

1、事件的产生

NIOReactor的R线程一直在监听selector上的每一个连接的感兴趣事件是否发生,当client发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,而且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。

01 public void run() {
02             final Selector selector = this.selector;
03             for (;;) {
04                 ++reactCount;
05                 try {
06                     int res = selector.select();
07                     LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res);
08                     register(selector);
09                     Set<SelectionKey> keys = selector.selectedKeys();
10                     try {
11                         for (SelectionKey key : keys) {
12                             Object att = key.attachment();
13                             if (att != null && key.isValid()) {
14                                 int readyOps = key.readyOps();
15                                 if ((readyOps & SelectionKey.OP_READ) != 0) {
16                                     LOGGER.debug("select读事件");
17                                     read((NIOConnection) att);
18                                ..............................
19                             }
20                              ...........................
21                         }
22                     } ..................
23                 } ............
24             }
25   }

 2、调用该连接的read函数进行处理

该函数在上一篇中提到过,该函数的实如今AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。

该函数交给processor进行异步处理。从processor中的线程池获取一个线程来运行该任务。这里调用详细的handler来进行处理。

刚開始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。

在这里须要先了解MySQL数据包的格式:

分布式数据库中间件–(3) Cobar对简单select命令的处理过程

MySQLclient命令请求报文

该处理函数例如以下:

01 public void handle(byte[] data) {
02     LOGGER.info("data[4]:"+data[4]);
03     switch (data[4]) {
04     case MySQLPacket.COM_INIT_DB:
05         commands.doInitDB();
06         source.initDB(data);
07         break;
08     case MySQLPacket.COM_QUERY:
09         commands.doQuery();
10         source.query(data);
11         break;
12     case MySQLPacket.COM_PING:
13         commands.doPing();
14         source.ping();
15         break;
16     case MySQLPacket.COM_QUIT:
17         commands.doQuit();
18         source.close();
19         break;
20     case MySQLPacket.COM_PROCESS_KILL:
21         commands.doKill();
22         source.kill(data);
23         break;
24     case MySQLPacket.COM_STMT_PREPARE:
25         commands.doStmtPrepare();
26         source.stmtPrepare(data);
27         break;
28     case MySQLPacket.COM_STMT_EXECUTE:
29         commands.doStmtExecute();
30         source.stmtExecute(data);
31         break;
32     case MySQLPacket.COM_STMT_CLOSE:
33         commands.doStmtClose();
34         source.stmtClose(data);
35         break;
36     case MySQLPacket.COM_HEARTBEAT:
37         commands.doHeartbeat();
38         source.heartbeat(data);
39         break;
40     default:
41         commands.doOther();
42         source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
43     }
44 }

 

由于每一个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,例如以下所看到的

分布式数据库中间件–(3) Cobar对简单select命令的处理过程

所以data[4]是第五个字节。也就是消息体的第一个字节。client向Cobar端发送的是命令报文,第一个字节是详细的命令。

假设是select语句,那么data[4]就是COM_QUERY,然后会调用详细连接的query成员函数,其定义在FrontendConnection类中。

01 public void query(byte[] data) {
02     if (queryHandler != null) {
03         // 取得语句
04         MySQLMessage mm = new MySQLMessage(data);
05         mm.position(5);
06         String sql = null;
07         try {
08             sql = mm.readString(charset);
09         catch (UnsupportedEncodingException e) {
10             writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset ‘" + charset + "‘");
11             return;
12         }
13         if (sql == null || sql.length() == 0) {
14             writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
15             return;
16         }
17         LOGGER.debug("解析的SQL语句:"+sql);
18         // 运行查询
19         queryHandler.query(sql);
20     else {
21         writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
22     }
23 }

首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的全部的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。

查询的时候Cobar控制台输出例如以下内容:

11:35:33,392 INFO data[4]:3
11:35:33,392 DEBUG 解析的SQL语句:select * from tb2

解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,事实上现的query函数例如以下:

01 public void query(String sql) {
02     //这里就得到了完整的SQL语句,接收自client
03     ServerConnection c = this.source;
04     if (LOGGER.isDebugEnabled()) {
05         LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
06     }
07     //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,运行对应的操作
08     int rs = ServerParse.parse(sql);
09     switch (rs & 0xff) {
10     .......................
11     case ServerParse.SELECT:
12         //select操作运行
13         SelectHandler.handle(sql, c, rs >>> 8);
14         break;
15     .......................
16     }
17 }

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

假设语句没有语法错误,则直接交给SelectHandler进行处理。假设是一般的select语句,则直接调用ServerConnection的execute运行sql

c.execute(stmt, ServerParse.SELECT);

在ServerConnection中的execute函数中须要进行路由检查,由于select的数据不一定在一个数据库中,须要按拆分的规则进行路由的检查。

1 // 路由计算
2 RouteResultset rrs = null;
3 try {
4     rrs = ServerRouter.route(schema, sql, this.charset, this);
5     LOGGER.debug("路由计算结果:"+rrs.toString());
6 }

详细的路由算法也是比較复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果例如以下:

11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
1 -> dnTest2.default{select * from tb2}
2 -> dnTest3.default{select * from tb2}
}

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比較复杂的资源处理最后在每一个后端数据库上运行函数execute0。

01 private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
02     ServerConnection sc = ss.getSource();
03     .........................
04     try {
05         // 运行并等待返回
06         BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
07         // 接收和处理数据,运行到这里就说明上面的运行已经得到运行结果的返回
08         final ReentrantLock lock = MultiNodeExecutor.this.lock;
09         lock.lock();
10         try {
11             switch (bin.data[0]) {
12             case ErrorPacket.FIELD_COUNT:
13                 c.setRunning(false);
14                 handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
15                 break;
16             case OkPacket.FIELD_COUNT:
17                 OkPacket ok = new OkPacket();
18                 ok.read(bin);
19                 affectedRows += ok.affectedRows;
20                 // set lastInsertId
21                 if (ok.insertId > 0) {
22                     insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
23                 }
24                 c.setRunning(false);
25                 handleSuccessOK(ss, rrn, autocommit, ok);
26                 break;
27             default// HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
28                 final MySQLChannel mc = (MySQLChannel) c;
29                 if (fieldEOF) {
30                     for (;;) {
31                         bin = mc.receive();
32                         switch (bin.data[0]) {
33                         case ErrorPacket.FIELD_COUNT:
34                             c.setRunning(false);
35                             handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
36                             return;
37                         case EOFPacket.FIELD_COUNT:
38                             handleRowData(rrn, c, ss);
39                             return;
40                         default:
41                             continue;
42                         }
43                     }
44                 else {
45                     bin.packetId = ++packetId;// HEADER
46                     List<MySQLPacket> headerList = new LinkedList<MySQLPacket>();
47                     headerList.add(bin);
48                     for (;;) {
49                         bin = mc.receive();
50                         switch (bin.data[0]) {
51                         case ErrorPacket.FIELD_COUNT:
52                             c.setRunning(false);
53                             handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
54                             return;
55                         case EOFPacket.FIELD_COUNT:
56                             bin.packetId = ++packetId;// FIELD_EOF
57                             for (MySQLPacket packet : headerList) {
58                                 buffer = packet.write(buffer, sc);
59                             }
60                             headerList = null;
61                             buffer = bin.write(buffer, sc);
62                             fieldEOF = true;
63                             handleRowData(rrn, c, ss);
64                             return;
65                         default:
66                             bin.packetId = ++packetId;// FIELDS
67                             switch (flag) {
68                             case RouteResultset.REWRITE_FIELD:
69                                 StringBuilder fieldName = new StringBuilder();
70                                 fieldName.append("Tables_in_").append(ss.getSource().getSchema());
71                                 FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
72                                 headerList.add(field);
73                                 break;
74                             default:
75                                 headerList.add(bin);
76                             }
77                         }
78                     }
79                 }
80             }
81         finally {
82             lock.unlock();
83         }
84     }//异常处理....................
85 }

这里真正的运行SQL语句,然后等待后端运行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

当client发起认证请求或命令请求后,server会返回对应的运行结果给client。client在收到响应报文后,须要首先检查第1个字节的值,来区分响应报文的类型。

响应报文类型 第1个字节取值范围
OK 响应报文 0×00
Error 响应报文 0xFF
Result Set 报文 0×01 – 0xFA
Field 报文 0×01 – 0xFA
Row Data 报文 0×01 – 0xFA
EOF 报文 0xFE

注:响应报文的第1个字节在不同类型中含义不同,比方在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

Result Set 消息分为五部分,结构例如以下:

结构 说明
[Result Set Header] 列数量
[Field] 列信息(多个)
[EOF] 列结束
[Row Data] 行数据(多个)
[EOF] 数据结束

函数运行完毕后,返回的结果都放入LinkedList中,当读取结果完毕后放入多节点运行器的缓冲区。假设buffer满了,就通过前端连接写出给client。

作者:Yong Man
提示:本文版权归作者,欢迎转载,但未经作者允许必须保留此段声明,且在文章页面明显位置给出原文连接。
假设对文章有不论什么问题,都能够在评论中留言,我会尽可能的答复您,谢谢你的阅读

分布式数据库中间件–(3) Cobar对简单select命令的处理过程,布布扣,bubuko.com

分布式数据库中间件–(3) Cobar对简单select命令的处理过程

上一篇:《Linux内核设计与实现》课本第五章学习笔记——20135203齐岳


下一篇:Oracle RAMN 备份解决方案一例