Golang 从 ES 获取数据并计算 key 存在哪个 codis 的 slot
环境是已经将redis的bigkey存入ES
1. 获取字符串存在哪个slot
1.1 官方命令
SLOTSHASHKEY key1 [key2 …]
命令说明:计算并返回给定 key 的 slot 序号
命令参数:输入为 1 个或多个 key
返回结果: 操作返回 array
response := []int{slot1, slot2...}
其中:
INT slot : 表示对应 key 的 slot 序号,即 hash32(key) % NUM_OF_SLOTS
例如:
localhost:6379> slotshashkey a b c # 计算 <a,b,c> 的 slot 序号
1) (integer) 579
2) (integer) 1017
3) (integer) 879
1.2 使用golang获取
Codis 采用 Pre-sharding 的技术来实现数据的分片, 默认分成 1024 个 slots (0-1023)。对于每个 key 来说, 通过以下公式确定所属的 Slot Id: SlotId = crc32(key) % 1024。参考代码如下:
package slotcrc32
import "hash/crc32"
// GetIntvalKey returns the slot index of strKey: the CRC-32 (IEEE polynomial)
// checksum of the key's bytes, reduced modulo 1024. The result is therefore
// always in the range 0-1023.
func GetIntvalKey(strKey string) uint32 {
	// Number of slots the checksum is folded into.
	const slotCount = 1024

	sum := crc32.ChecksumIEEE([]byte(strKey))
	return sum % slotCount
}
golang从elasticsearch获取数据,并调用函数计算keyName的slot位置
package esgetkey
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"sort"

	"github.com/olivere/elastic/v7"

	"local/dev/esgetkey/slotcrc32"
)
// Employee is the shape a document's JSON source is decoded into. Each field
// is bound to its JSON key through a struct tag; the tags must be raw-string
// (backtick) literals, otherwise the declaration does not compile and the
// non-ASCII key "过期时间" can never be matched to Ex.
type Employee struct {
	CodisName string `json:"codisName"` // value of the "codisName" key
	Size      int64  `json:"size"`      // value of the "size" key; unit not shown here (presumably bytes — TODO confirm)
	KeyType   string `json:"keyType"`   // value of the "keyType" key
	RedisHost string `json:"redisHost"` // value of the "redisHost" key
	Database  int64  `json:"database"`  // value of the "database" key
	Ex        string `json:"过期时间"`      // value of the "过期时间" ("expiration time") key
	KeyName   string `json:"keyName"`   // value of the "keyName" key
}
type (
	// MapSorter is a slice of per-slot tallies that can be ordered with the
	// sort package.
	MapSorter []slotInfo

	// slotInfo is the running tally kept for a single slot.
	slotInfo struct {
		totalSize  int64  // sum of the sizes of the keys counted for the slot
		totalCount int64  // number of keys counted for the slot
		slotNum    uint32 // slot index the tally belongs to
	}
)
// slotInfoMap holds one tally per slot, keyed by slot index. It is created
// with room for 1024 entries.
var slotInfoMap = make(map[uint32]slotInfo, 1024)
// GetEsKey reads big-key documents from the Elasticsearch instance at host,
// groups them by slot and prints per-slot statistics to stdout.
//
// Steps:
//  1. Connect to host, ping it and print the reported version.
//  2. Print the number of documents in index pattern
//     "codis-bigkey-2021-04-02*" whose "codisName" term is "THEIA_PUSH".
//  3. Stream every matching document, map its keyName to a slot with
//     slotcrc32.GetIntvalKey and add its size and a count of one to that
//     slot's entry in the package-level slotInfoMap.
//  4. Print the tallies ordered by MapSorter (ascending total size), followed
//     by the total number of keys counted.
//
// Documents are streamed with the scroll API instead of from/size paging:
// Elasticsearch rejects requests whose from+size exceeds
// index.max_result_window (10000 by default), and unsorted from/size pages
// are not guaranteed to be stable between requests.
//
// Any connection, query or decoding error causes a panic.
//
// NOTE: slotInfoMap is never reset here, so calling GetEsKey more than once
// adds to the tallies left by earlier calls.
func GetEsKey(host string) {
	const (
		indexPattern = "codis-bigkey-2021-04-02*"
		pageSize     = 1000
	)
	ctx := context.Background()

	errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
	client, err := elastic.NewClient(elastic.SetErrorLog(errorlog), elastic.SetURL(host))
	if err != nil {
		panic(err)
	}

	info, code, err := client.Ping(host).Do(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)

	esversion, err := client.ElasticsearchVersion(host)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch version %s\n", esversion)

	// Query: only the total is needed from this request, so ask for no hits.
	q := elastic.NewTermQuery("codisName", "THEIA_PUSH")
	countRes, err := client.Search(indexPattern).Query(q).Size(0).TrackTotalHits(true).Do(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(countRes.Hits.TotalHits.Value)

	scroll := client.Scroll(indexPattern).Query(q).Size(pageSize)
	defer func() {
		// Best effort: release the server-side scroll context. There is
		// nothing useful to do if this fails, so the error is dropped.
		_ = scroll.Clear(ctx)
	}()

	for {
		page, err := scroll.Do(ctx)
		if err == io.EOF {
			// The scroll service reports io.EOF once all pages are consumed.
			break
		}
		if err != nil {
			panic(err)
		}

		for _, hit := range page.Hits.Hits {
			var doc Employee
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				panic(err)
			}

			slot := slotcrc32.GetIntvalKey(doc.KeyName)

			// Debug trace for one specific key, kept from the original code.
			if doc.KeyName == "guess_item2vec_similar_popular_level_novel" {
				fmt.Println("guess_item2vec_similar_popular_level_novel", doc)
			}

			// The zero value of slotInfo is an empty tally, so a slot seen
			// for the first time needs no special case.
			tally := slotInfoMap[slot]
			tally.totalCount++
			tally.totalSize += doc.Size
			slotInfoMap[slot] = tally
		}
	}

	ms := NewMapSorter(slotInfoMap)
	sort.Sort(ms)

	var totalCount int64
	for _, item := range ms {
		totalCount += item.totalCount
		fmt.Println(item)
	}
	fmt.Println(totalCount)
}
// Len returns the number of tallies held by ms. Together with Less and Swap
// it lets a MapSorter be passed to sort.Sort.
func (ms MapSorter) Len() int {
	return len(ms)
}
// Less reports whether the tally at index i has a strictly smaller totalSize
// than the tally at index j, so sorting orders tallies by ascending total
// size.
func (ms MapSorter) Less(i, j int) bool {
	left, right := ms[i], ms[j]
	return left.totalSize < right.totalSize
}
// Swap exchanges the tallies at indexes i and j in place.
func (ms MapSorter) Swap(i, j int) {
	ms[i], ms[j] = ms[j], ms[i]
}
// NewMapSorter flattens m into a MapSorter with one element per map entry.
// Each element carries the entry's totalSize and totalCount, and its slotNum
// is set to the entry's map key. Element order follows map iteration and is
// therefore unspecified; m itself is not modified.
func NewMapSorter(m map[uint32]slotInfo) MapSorter {
	sorter := make(MapSorter, 0, len(m))
	for slot, tally := range m {
		// tally is a copy of the map value, so stamping the key on it leaves
		// the map untouched.
		tally.slotNum = slot
		sorter = append(sorter, tally)
	}
	return sorter
}