Aerospike用例分析

       Aerospike是一个以分布式为核心,T级别大数据高并发的……,参见《Aerospike入坑导读》https://yq.aliyun.com/articles/622455?spm=a2c4e.11155435.0.0.36e43312Nq4iTX,针对常用的一般性操作给出九个用例,代码采用GO语言实现,JAVA、C\C++以及其他语言的实现都差不多,同样具有参考意义,完整的代码请从https://gitee.com/gonglibin/codes/0hqjzto12argkmbsyv97n32下载。

func asInformation() {
	url := fmt.Sprintf("%s:%d", AS_HOST, AS_PORT)

	if con, err := as.NewConnection(url, time.Second * 10); err == nil {
		defer con.Close()

		if ifs, err := as.RequestInfo(con, ""); nil == err {
			for k, v := range ifs {
				/**
				 * features: peers;cdt-list;cdt-map;cluster-stable;pipelining;geo ...
				 * build: 4.2.0.5
				 * version: Aerospike Community Edition build 4.2.0.5
				 * build_os: el7
				 * statistics: cluster_size=1;cluster_key=E50AB9AFD60C;cluster_in ...
				 * services-alumni:
				 * services:
				 * partition-generation: 0
				 * node: BB9822D9A270008
				 * edition: Aerospike Community Edition
				 * build_time: Mon Jul 16 19:02:48 UTC 2018
				 */
				fmt.Printf("%s: %s\n", k, v)
			}
		}
	} else {
		panic(err.Error())
	}
}

       用例一:asInformation获取aerospike的基础信息,第64行调用RequestInfo函数,函数的第一个参数为aerospike的连接句柄,第二个参数为信息名称,空则返回全部信息,结果保存在map容器中。

func asMapToRecord(cli *as.Client) {
	arr := []byte{0x11, 0x21, 0x31}
	lst := []interface{}{111, "STRING", arr}
	mmp := map[interface{}] interface{} {
		"key1": 1,
		"key2": "A",
		"key3": arr,
		"key4": lst,
	}

	mybin := "mybin1"
	bin := as.NewBin(mybin, mmp)
	cli.PutBins(AS_WPLC, AS_NKEY, bin)

	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bin.Name); nil == err {
		val := rst.Bins[bin.Name].(map[interface{}] interface{})
		fmt.Printf("Intege: %d, String: %s, []Byte: %v, Map: %v\n", val["key1"], val["key2"], val["key3"], val["key4"])
	}
}

       用例二:asMapToRecord以map数据结构作为bin存储的数据类型,93、94、95行构造了一个map实例,且每个节点的数据类型都不一样,分别为整形、字符串、字符数组和链表(其中链表中包含三个元素,分别为整形、字符串和字符数组),可见GO语言的map对数据类型的支持是多么的肆意妄为。查询的结果以键值对的方式返回,指定key从而读取value。

func asListToRecord(cli *as.Client) {
	mybin := "mybin2"
	bin := as.NewBin(mybin, []interface{}{111, "STRING_111", []byte{0x11, 0x21, 0x31}})

	cli.PutBins(AS_WPLC, AS_NKEY, bin)

	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bin.Name); nil == err {
		val := rst.Bins[bin.Name].([]interface{})
		fmt.Printf("Intege: %d, String: %s, []Byte: %v\n", val[0], val[1], val[2])
	}
}

       用例三:asListToRecord以链表数据结构作为bin存储的数据类型,第119行构造了一个链表实例,并附加bin名称定义了一个bin对象,链表结构共三个节点,分别存储整形、字符串和字符数组。查询的结果集以链表方式返回,可以直接通过下标得到数据项。

func asBatchToRecord(cli *as.Client) {
	bn := "mybin3"
	bk := []string{"b_key1", "b_key2", "b_key3", "b_key4"}
	bv := []string{"b_val1", "b_val2", "b_val3", "b_val4"}

	// 写入操作
	for i, k := range bk {
		bin := as.NewBin(bn, bv[i])
		key, _ := as.NewKey(AS_NMSP, AS_MYST, k)
		cli.PutBins(AS_WPLC, key, bin)
	}

	// 存在判断
	ks := make([]*as.Key, len(bk) * 2)
	for i, k := range bk {
		ks[i], _ = as.NewKey(AS_NMSP, AS_MYST, k)
	}
	ks[4], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key5")
	ks[5], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key6")
	ks[6], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key7")
	ks[7], _ = as.NewKey(AS_NMSP, AS_MYST, "b_key8")
	if ext, err := cli.BatchExists(nil, ks); nil == err {
		for i, e := range ext {
			fmt.Printf("ns = %s, set = %s, key = %s, exists = %t\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), e)
		}
	} else {
		panic(err.Error())
	}

	// 批量获取
	if rst, err := cli.BatchGet(nil, ks, bn); nil == err {
		for i, r := range rst {
			if nil != r {
				fmt.Printf("ns = %s, set = %s, key = %s, bin = %s, val = %s\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), bn, r.Bins[bn])
			} else {
				fmt.Printf("ns = %s, set = %s, key = %s, bin = %s, val = %s\n", ks[i].Namespace(), ks[i].SetName(), ks[i].Value(), bn, "nil")
			}
		}
	}
}

       用例四:asBatchToRecord批量操作,140 - 144行写入四条记录,147行故意定义其两倍查询主键数组,其中前四个为刚才写入记录的主键,151 - 154行构造了四个不存在的主键,155行提请批量判断记录是否存在,156 - 158行打印结果至标准输出。164 - 172行采用两倍主键数组批量获取结果集,存在的结果打印,不存在的显示nil。

func asAppendToRecord(cli *as.Client) {
	mybin := "mybin4"

	bin := as.NewBin(mybin, "aero")
	cli.AppendBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("AppendBins第一次调用", cli, AS_NKEY, bin)

	bin = as.NewBin(mybin, "spike")
	cli.AppendBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("AppendBins第二次调用", cli, AS_NKEY, bin)
}

       用例五:asAppendToRecord从右侧向bin中追加数据,183 - 185行写入aero,187 - 189行写入spike,分两次写入完整aerospike数据。

func asAddBinsToRecord(cli *as.Client) {
	val := 8
	mybin := "mybin5"

	bm := make(as.BinMap)
	bm[mybin] = val
	cli.Add(AS_WPLC, AS_NKEY, bm)
	asGetBinsAndPrint("BinMap 传入调用", cli, AS_NKEY, as.NewBin(mybin, val))

	// NewBin 调用累加
	bin := as.NewBin(mybin, val << 1)
	cli.AddBins(AS_WPLC, AS_NKEY, bin)
	asGetBinsAndPrint("NewBin 调用累加", cli, AS_NKEY, bin)

	// Operate调用累加
	bin = as.NewBin(mybin, val << 2)
	cli.Operate(AS_WPLC, AS_NKEY, as.AddOp(bin), as.GetOp())
	asGetBinsAndPrint("Operate调用累加", cli, AS_NKEY, bin)
	//上面一行与下面几行等价,Operate调用累加的同时返回结果。
	//if rst, err := cli.Operate(AS_WPLC, AS_NKEY, as.AddOp(bin), as.GetOp()); nil == err {
	//	fmt.Printf(
	//		"Operate调用累加 ---> ns = %s, set = %s, AS_NKEY = %s, bin = %s, val = %d\n",
	//		AS_NKEY.Namespace(), AS_NKEY.SetName(), AS_NKEY.Value(), bin.Name, rst.Bins[bin.Name])
	//}
}

       用例六:asAddBinsToRecord对bin值进行累加,201 - 204行构造一个map,调用as的Add函数累加,207 - 209行调用as的AddBins函数累加,212 - 214行调用as的Operate函数累加,三种方式导致的结果一致,不同的只是入口参数不同。

func asPrependToRecord(cli *as.Client) {
	mybin := "mybin6"
	wds := []string{"工程师", "研发", "软件", "是", "我"}

	for i, w := range wds {
		bin := as.NewBin(mybin, w)
		cli.PrependBins(AS_WPLC, AS_NKEY, bin)
		asGetBinsAndPrint(strconv.Itoa(i + 1), cli, AS_NKEY, bin)
	}
}

       用例七:asPrependToRecord从左侧向bin中追加数据,为了演示效果特将一句中文分词后从右向左逐一写入bin,每写一次打印输出一次。

func asReplaceToRecord(cli *as.Client) {
	bn := []string{"b_name_1", "b_name_2", "b_name_3"}
	bv := []string{"b_value_1", "b_value_2", "b_value_3"}

	for i, b := range bv {
		bin := as.NewBin(bn[i], b)
		cli.PutBins(AS_WPLC, AS_NKEY, bin)
	}

	fmt.Printf("覆盖前 --- > ")
	if rst, err := cli.Get(AS_PLCY, AS_NKEY, bn[0], bn[1], bn[2]); nil == err {
		for k, v := range rst.Bins {
			fmt.Printf("{bin = %s, val = %s}, ", k, v)
		}
		fmt.Println()
	}

	wp := as.NewWritePolicy(0, 0)
	wp.RecordExistsAction = as.REPLACE
	nb := as.NewBin("b_name_replace", "b_value_replace")

	cli.PutBins(wp, AS_NKEY, nb)
	asGetBinsAndPrint("覆盖后", cli, AS_NKEY, nb)
}

       用例八:asReplaceToRecord对原有记录进行覆盖操作,需要特别注意的一点是PutBins时务必指定所有的bin,已存在的但未传入的bin系统将不会自动保存。248 - 259行写入三个bin并打印至标准输出,261 - 266行采用一个新的bin执行覆盖操作成功后,之前写入的三个bin被删除。

func asGetHeaderFromKey(cli *as.Client) {
	if rst, _ := cli.GetHeader(AS_PLCY, AS_NKEY); nil != rst {
		fmt.Printf("Key.generation = %d, Key.expiration = %d (%s)\n", rst.Generation, rst.Expiration, time.Duration(rst.Expiration) * time.Second)
	} else {
		fmt.Printf("Failed to GetHeader: namespace = %s, set = %s, key = %s\n", AS_NKEY.Namespace(), AS_NKEY.SetName(), AS_NKEY.Value())
	}
}

       用例九:asGetHeaderFromKey查询一个已存在主键的被修改次数和生存期,一个key在没有被修改的情况下,系统默认生存期为720个小时(30天)

       Redis和Aerospike经常被拿来比较,至于用哪里好更多的时候还是要看需求,从总体上看,Aerospike比Redis对结构化数据支持的更好些,数据有效期、集群化管理、容灾容错方面也是各有所长,一定要找一个彼此不可替代的场景还真不好找。现在做个项目工程哪个不需要存点临时的中间数据给各个子系统读读写写的,多个备选方案总不是坏事。

上一篇:网页正文提取方法一二


下一篇:C#与JAVA线程间同步实现比较