Nginx一致性哈希模块的Lua实现

Nginx一致性哈希模块的Lua重新实现

技术背景:

最近在工作中使用了nginx+redis 的架构,redis在后台做分布式存储,每个redis都存放不同的数据,这些数据都是某门户网站通过Hadoop分析出来的用户行为日志,key是uid,value是user profile,每小时更新量在500-800万条记录,而这些记录一旦生成,我需要在5分钟左右的时间完成所有导入过程。

首先,我在nginx中使用了第三方模块HttpUpstreamConsistent来做负载均衡策略,针对不同用户(uid)选取不同的backend redis:

   upstream somestream {
consistent_hash $arg_uid;
server 10.50.1.3:;
server 10.50.1.4:;
server 10.50.1.5:;
}

现在问题来了,由于Hadoop系统处理日志的速度非常快,如果把每条记录都通过Nginx来写入Redis中,这样的速度是无法接受的,而且会影响Nginx对正常请求的服务能力。所以,需要将这些数据以离线的方式导入redis集群中,这样就要重新实现HttpUpstreamConsistent模块了,才能保证读写的哈希策略一致。

下面的源码演示了如何将HttpUpstreamConsistent模块翻译成Lua的过程,(使用了CRC32作散列,依赖库的路径已列在Reference中)。

#!/usr/bin/lua

-- chenqi@2014/04/02
--[Reference]
--https://github.com/yaoweibin/ngx_http_consistent_hash
--https://github.com/davidm/lua-digest-crc32lua local CRC = require('CRC32') local M = {} local CONSISTENT_BUCKETS =
local VIRTUAL_NODE = local HASH_PEERS = {}
local CONTINUUM = {}
local BUCKETS = {} local function hash_fn(key)
return CRC.crc32(key)
end -- in-place quicksort
function quicksort(array,compareFunc)
quick(array,,#array,compareFunc)
end function quick(array,left,right,compareFunc)
if(left < right ) then
local index = partion(array,left,right,compareFunc)
quick(array,left,index-,compareFunc)
quick(array,index+,right,compareFunc)
end
end function partion(array,left,right,compareFunc)
local key = array[left]
local index = left
array[index],array[right] = array[right],array[index]
local i = left
while i< right do
if compareFunc( key,array[i]) then
array[index],array[i] = array[i],array[index]
index = index +
end
i = i +
end
array[right],array[index] = array[index],array[right]
return index;
end -- binary search
local function chash_find(point)
local mid, lo, hi = , , #CONTINUUM
while do
if point <= CONTINUUM[lo][] or point > CONTINUUM[hi][] then
return CONTINUUM[lo]
end -- test middle point
mid = lo + math.floor((hi-lo)/) -- perfect match
if point <= CONTINUUM[mid][] and point > (mid > and CONTINUUM[mid-][] or ) then
return CONTINUUM[mid]
end -- too low, go up
if CONTINUUM[mid][] < point then
lo = mid +
else
hi = mid -
end
end
end local function chash_init()
local n = #HASH_PEERS
if n == then
print("There is no backend servers")
return
end local C = {}
for i,peer in ipairs(HASH_PEERS) do
for k=, math.floor(VIRTUAL_NODE * peer[]) do
local hash_data = peer[] .. "-" .. (k - )
table.insert(C, {peer[], hash_fn(hash_data)})
end
end quicksort(C, function(a,b) return a[] > b[] end)
CONTINUUM = C --[[
for i=1,#C do
print(CONTINUUM[i][1],CONTINUUM[i][2])
end
--]] local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS) BUCKETS = {}
for i=, CONSISTENT_BUCKETS do
table.insert(BUCKETS, i, chash_find(math.floor(step * (i - ))))
-- print(BUCKETS[i][],BUCKETS[i][])
end end
M.init = chash_init local function chash_get_upstream_crc32(point)
return BUCKETS[(point % CONSISTENT_BUCKETS)+][]
end
M.get_upstream_crc32 = chash_get_upstream_crc32 local function chash_get_upstream(key)
local point = math.floor(hash_fn(key))
return chash_get_upstream_crc32(point)
end
M.get_upstream = chash_get_upstream local function chash_add_upstream(upstream, weigth)
weight = weight or
table.insert(HASH_PEERS, {weight, upstream})
end
M.add_upstream = chash_add_upstream return M

API调用方式:

local redis_login= {
"10.50.1.3:11211",
"10.50.1.4:11211",
"10.50.1.5:11211",
} for k, backend in ipairs(redis_login) do
chash_login.add_upstream(backend)
end
chash_login.init() uid=""
chash_login.chash_get_upstream(uid)

返回一个backend地址,将该uid对应的数据写入对应的redis中即可,稍后可以使用Nginx读到。

PS:关于redis的mass insertion问题,最高效的方式是批量写入文件(文件格式遵循redis协议),然后使用 redis-cli --pipe 直接导入。

上一篇:AndroidStudio第一个项目HelloWorld


下一篇:编译nginx平滑添加stream模块