本周接手了一个Cassandra系统的维护工作,有一项是需要将应用方的数据导入我们维护的Cassandra集群,并且为应用方提供HTTP的方式访问服务。这是我第一次接触KV系统,原来只是走马观花似的看过KV啊,NoSQL啊。但是实际上没有实际的使用经验。经过两天的学习和接手,终于搞明白了在生产环境中的使用方式。在此简要的笔记一下。本文主要包括的内容有:
Cassandra的简介,
Cassandra的相关CLI
Cassandra的Python API,并且给出一个批量导入数据的例子。
1. Cassandra简介
Cassandra的主要特点就是它不是一个数据库,而是由一堆数据库节点共同构成的一个分布式网络服务,对Cassandra 的一个写操作,会被复制到其他节点上去,对Cassandra的读操作,也会被路由到某个节点上面去读取。对于一个Cassandra群集来说,扩展性能 是比较简单的事情,只管在群集里面添加节点就可以了。
Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比 Dynomite(分布式的Key-Value存 储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库 的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型。)Cassandra最初由Facebook开发,后转变成了开源项目。它是一个网络社交云计算方面理想的数据库。以Amazon专有的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型。P2P去中心化的存储。很多方面都可以称之为Dynamo 2.0。
和其他数据库比较,有几个突出特点:
- 模式灵活 :使用Cassandra,像文档存储,你不必提前解决记录中的字段。你可以在系统运行时随意的添加或移除字段。这是一个惊人的效率提升,特别是在大型部 署上。
- 真正的可扩展性 :Cassandra是纯粹意义上的水平扩展。为给集群添加更多容量,可以指向另一台电脑。你不必重启任何进程,改变应用查询,或手动迁移任何数据。
- 多数据中心识别 :你可以调整你的节点布局来避免某一个数据中心起火,一个备用的数据中心将至少有每条记录的完全复制。
一些使Cassandra提高竞争力的其他功能:
- 范围查询 :如果你不喜欢全部的键值查询,则可以设置键的范围来查询。
- 列表数据结构 :在混合模式可以将超级列添加到5维。对于每个用户的索引,这是非常方便的。
- 分布式写操作 :可以在任何地方任何时间集中读或写任何数据。并且不会有任何单点失败。
2. 基础命令
连接
./cassandra-cli-h 10.224.52.73 -port 9160
集群式自动负载的,因此连接任意一个节点即可。
Check schema
show schema;
在创建了schema或者列族后,可以使用时命令确认是否成功
在运行改命令前,需要使用命令use keyspace_name; 否则会遇到以下错误:
Not authorized to a working keyspace
list
list column_family_name;
可以显示列族的前100列。
3. 批量导入
实验数据来自搜狗实验室的中文词语搭配库,http://www.sogou.com/labs/dl/r.html。
数据格式如下:
词语1_词语2 \t 两个词共同出现的次数
在这里并不讨论该数据的具体意义,只是以这个数据为起点来说明如何向应用方提供服务。
部分实际数据:
都要_打牌>--4
等候_一次>--26
本刊_重要>--3 关系_全方位>14 加热_迅速>--107
设计列族名为 test_only, cli 如下:
create column family test_only
with column_type = 'Standard'
andcomparator = 'UTF8Type'
anddefault_validation_class = 'BytesType'
andkey_validation_class = 'UTF8Type'
andread_repair_chance = 0.1
anddclocal_read_repair_chance = 0.0
andgc_grace = 864000
andmin_compaction_threshold = 4
andmax_compaction_threshold = 32
andreplicate_on_write = true
andcompaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
andcaching = 'KEYS_ONLY'
and column_metadata = [
{column_name : 'count',
validation_class : UTF8Type}]
andcompression_options = {'sstable_compression' :'org.apache.cassandra.io.compress.SnappyCompressor'};
连接到Cassandra:pycassa.ConnectionPool(‘keyspace_name’, server_list)
具体到我们的例子就是:
con = pycassa.ConnectionPool('History',server_list=["server1:9160", "server2:9160","server3:9160"])
获取列族:
cf = pycassa.ColumnFamily(con, cfName)
插入一条数据:
cf.insert('row_key', {'col_name': 'col_val'})
批量插入:
cf.batch_insert({'row1': {'name1': 'val1', 'name2': 'val2'}, 'row2': {'foo': 'bar'}})
获取一条数据:
cf.get(‘row_key’)
获取某一列的值:
cf.get(‘row_key’)[‘column_name’]
下面是具体的代码实现:
import pycassa import time batch_size = 100 def pycassa_connect(): #start = time.time() return pycassa.ConnectionPool('History', server_list=["192.168.1.20:9160"]) #end = time.time() #print "Mola init time: ", (end - start) def batch_insert(file_path, cf): global batch_size f = open(file_path, "r") count=0 error_count = 0 kvmap = {} for line in f:-- list = line.split("\t") if len(list) != 2 : print "skip error data" continue column = {} column['count'] = list[1].replace('\n', '') try: kvmap[list[0].decode('gb2312').encode('utf-8')] = column- if len(kvmap) % batch_size == 0: cf.batch_insert(kvmap) kvmap.clear() count = count + 1 except Exception, ex: print "found execption" print ex error_count = error_count + 1 f.close() if len(kvmap) > 0 : cf.batch_insert(kvmap) ---- for key in kvmap: print "key is %s, value is %s"%(key, kvmap[key])- print "total insert data is %d, error is %d"%(count, error_count)
如何测试数据是正确的?
def test_after_insert(file_path, cf): f = open(file_path, "r") error_count=0 print "Test started" for line in f:-- list = line.split("\t") if len(list) != 2 : print "skip error data" continue count = list[1].replace('\n', '') if cf.get(list[0].decode('gb2312').encode('utf-8'))['count'] != count: print "Key %s doesn't match value %s"%(list[0].decode('gb2312').encode('utf-8'), count) error_count = error_count + 1 print "Test completed, found %d error(s)."%error_count f.close()