从日志及外围工具了解大致流程
cassandra@cqlsh:ycsb> TRACING on;
cassandra@cqlsh:ycsb> insert into usertable(y_id, field0, field1) values('1', 'f0', 'f1');
Tracing session: 9f327c10-db85-11e9-93d6-478d8c046f59
activity | timestamp | source | source_elapsed | client
------------------------------------------------------------------------------------------------------------+----------------------------+--------------+----------------+--------------
Execute CQL3 query | 2019-09-20 17:04:15.953000 | 172.17.30.80 | 0 | 172.17.30.80
MUTATION message received from /172.17.30.80 [MessagingService-Incoming-/172.17.30.80] | 2019-09-20 17:04:15.953000 | 172.17.30.79 | 14 | 172.17.30.80
Parsing insert into usertable(y_id, field0, field1) values('1', 'f0', 'f1'); [Native-Transport-Requests-1] | 2019-09-20 17:04:15.953000 | 172.17.30.80 | 125 | 172.17.30.80
Appending to commitlog [MutationStage-1] | 2019-09-20 17:04:15.953000 | 172.17.30.79 | 152 | 172.17.30.80
Preparing statement [Native-Transport-Requests-1] | 2019-09-20 17:04:15.953000 | 172.17.30.80 | 339 | 172.17.30.80
Determining replicas for mutation [Native-Transport-Requests-1] | 2019-09-20 17:04:15.953000 | 172.17.30.80 | 867 | 172.17.30.80
Appending to commitlog [MutationStage-2] | 2019-09-20 17:04:15.954000 | 172.17.30.80 | 1137 | 172.17.30.80
Adding to usertable memtable [MutationStage-1] | 2019-09-20 17:04:15.954000 | 172.17.30.79 | 304 | 172.17.30.80
Sending MUTATION message to /172.17.30.79 [MessagingService-Outgoing-/172.17.30.79-Small] | 2019-09-20 17:04:15.954000 | 172.17.30.80 | 1155 | 172.17.30.80
Adding to usertable memtable [MutationStage-2] | 2019-09-20 17:04:15.954000 | 172.17.30.80 | 1248 | 172.17.30.80
Enqueuing response to /172.17.30.80 [MutationStage-1] | 2019-09-20 17:04:15.958000 | 172.17.30.79 | 5275 | 172.17.30.80
Sending REQUEST_RESPONSE message to /172.17.30.80 [MessagingService-Outgoing-/172.17.30.80-Small] | 2019-09-20 17:04:15.959000 | 172.17.30.79 | 5469 | 172.17.30.80
REQUEST_RESPONSE message received from /172.17.30.79 [MessagingService-Incoming-/172.17.30.79] | 2019-09-20 17:04:15.960000 | 172.17.30.80 | 11 | 172.17.30.80
Processing response from /172.17.30.79 [RequestResponseStage-4] | 2019-09-20 17:04:15.960000 | 172.17.30.80 | 93 | 172.17.30.80
Request complete | 2019-09-20 17:04:15.959483 | 172.17.30.80 | 6483 | 172.17.30.80
上述执行过程如下:
因为大家ts都是一样的,所以我们分别看172.17.30.80, 172.17.30.79集群日志
172.17.30.80是cordinate,日志序:
- 执行一个cql3 query
- parse 这条insert cql
- Preparing statement,转变为mutation
- 判断mutation对应的副本分布
- 追加到commitlog
- 写入 cql指定表(usertable) 的memtable
- 向172.17.30.79发送mutation request
- 从 172.17.30.79 收到REQUEST_RESPONSE消息
- 处理响应,给客户端ack
172.17.30.79 tracing日志流程
- 从172.17.30.80收到mutation请求
- 追加到本地commitlog
- 写入 cql指定表(usertable) 的memtable
- 给172.17.30.80的回复压入队列
- 发送REQUEST_RESPONSE 消息给 172.17.30.80
图示流程
持久化后的数据格式
bin/nodetool flush
tools/bin/sstabledump /data/ycsb/usertable-07b25290d05711e9a6daafb876512a23/md-4-big-Data.db
WARN 17:09:09,147 Only 58.696GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
[
{
"partition" : {
"key" : [ "1" ],
"position" : 0
},
"rows" : [
{
"type" : "row",
"position" : 27,
"liveness_info" : { "tstamp" : "2019-09-20T09:04:15.952637Z" },
"cells" : [
{ "name" : "field0", "value" : "f0" },
{ "name" : "field1", "value" : "f1" }
]
}
]
}
]
可以看到数据组织格式,一个sstable有很多partiion,一个partition有很多行,一个行有很多cells,内存的memtable也大体是这种格式,只不过重度依赖btree格式。
deep diving
让我们深入了解单机的写io路径,我们会花些篇幅深入了解单机引擎,如果不是开发者,可以跳过下面这些内容,不了解也没关系
io写流程
单机因为是一个标准的lsm引擎,所以可分为append wal,及append memtable,而memtable使用跳表及内部的btree描述整个memtable数据结构。
OpOrder写栅栏(Barrier)
有个细节挺有意思,写开始会通过writeOrder.start()会给当前Group计数器+1, 当写结束通过try语法糖自动close,close时候计数器会减一,如果计数器最后状态是finish状态,当前group无引用语义,会将自己unlink掉,唤醒该group waitQueue堵塞着的所有线程。如memtable已满,我们都知道要开始flush到本地,flush时候不能堵塞前端写,会switch一下memtable,把当前current memtable变成old,但是是一定要确保前端线程都对old memtable写完才开始flush sstable到本地,flush线程通过以下几个函数调用
writeOrder.newBarrier():
writeBarrier.issue():新创建了一个Group,新的写入会关联到这个Group上,旧grop不再接受新写入,相当于封箱了
writeBarrier.await():flush线程会堵塞,不再执行,等待前端线程写完,通过waitQueue唤醒,类似条件变量
如果读者觉得理解有难度,可以想象成flush线程跟前端写线程对于memtable的锁操作,以达到线程安全。上面的OpOrder就是lock-free的一种实现方式。
append mutation to commitLog
segment是由AbstractCommitLogSegmentManager一段段create出来的,这一段段segment就组成了连续不断的日志流。segment被创建时,会使用memoryMapped做内存映射,memoryMapped可以减少一次内存拷贝,非常适合大量小IO。往commitLog追加mutation时候,先向segmentManager alloc一段文件空间,代码里Allocation标记,相当于预先申请room,然后再把mutation序列化成bytes,写入Allocation底层的buffer,写入过程会写入相应的checksum。这个时候还没结束,只是写到了pagecache,掉电会丢数据,还需要周期性的做fsync。cassandra提供两种fsync策略
- BatchCommitLogService: sync线程会周期性默认每2ms,fsync一次,此时前端线程需要同步等待,直到收到fsync结束条件变量。一个sync间隔内其余线程可搭顺风车,只需同步等待,groupcommit思路。
- PeriodicCommitLogService:后台sync线程默认10s刷一次,前端写线程可以立即返回
append memtable
通过写时opGroup遭到对应的mt, mt内部主要数据结构ConcurrentSkipListMap partitions
AtomicBTreePartition 有个比较重要的属性Holder ref,表示一个Partition的数据结构,展开Holder:
protected static final class Holder
{
final PartitionColumns columns; //列定义
final DeletionInfo deletionInfo;//partition级别deletion info
// the btree of rows
final Object[] tree; //所有行,btree格式排好序
final Row staticRow; //静态行,仅一行
final EncodingStats stats;
}
一个partition有很多行,每一个有很多cell,这个cell集合也是使用btree格式组织的。
写mt主要流程就是
1.写之前一直携带了opGroup,mutation指定了要往哪个table写,但内存中table/cfs有很多memtable,有新有旧,这时候就通过opGroup找到要往哪个memtable写入。
- 通过memtable的跳表找到AtomicBTreePartition数据结构,对要更新的mutation每行进行如下操作,对Holder.tree进行二分查找,如果找到了说明要做rowMerge,没找到的话说明直接insert到全局序对应的位置就好。c*规定每个partition可以上数G,但那是磁盘整体数据量,当前memtable中的partition只存放了一个时间窗口内的,所以不会很大,大了直接就通过flush线程刷成sstable了。
- memtable操作有大量的cas操作,避免加锁,减少上下文切换。
结语
通过上述内容介绍,我们大致了解了cassandra的写流程,后续还会有系列文章介绍cassandra如何flush memtable,以及cassandra的读流程,尽情期待,可入群拿到最新资讯。
入群邀约
为了营造一个开放的 Cassandra 技术交流环境,社区建立了微信群公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。
另外阿里云为广大开发者提供云上Cassandra资源,可用于动手实践:9.9元可使用三月(限首购)。
直达链接:https://www.aliyun.com/product/cds