对于k8s外部服务暴露的方法是很多的(ingress,nodeport,直接通过api server 访问),各有利弊
问题
我们需要一个统一的入口方便访问spring cloud 部署的pod 服务,一般大家想到的是gateway
gateway 的确很不错,但是需要对于部署的每个pod 进行灵活的访问就不是很方便了
权衡
- ingress 模式
暴露的ingress太多了,需要手动处理的也太多了(不是很灵活)
- nodeport 模式
同ingress 一样,我们需要处理很多的nodeport ,同样需要管理端口
- 正向代理
我们需要进行配置(每个开发者),而且容易出现问题
思路
我们肯定是需要一个外部的入口,但是因为容器都是集群内部ip,所以我们需要一个方便的代理(支持灵活的策略)
单独基于ip的模式,肯定不方便,nginx 的虚拟主机就是一个不错的选择,所以我们需要一个灵活的域名处理机制
泛域名很好,但是因为特殊性(我们可能很难或者不是很方便提供类似的服务),xip 服务是一个不错的选择
我们可以基于xip 提供<servicename>.<ip>.xipdomain
格式的服务,servicename 就是spring cloud 在k8s中部署
的pod 名称,但是因为我们需要动态代理的是ip,所以我们需要获取ip地址(可以通过注册中心api),实际上整个
过程可以基于其他语言进行数据的获取,但是为了方便,直接使用openresty 处理各种api 数据的处理,对于动态代理
我们可以在set 阶段处理,但是有一个问题就是set 阶段不能使用cosocket,所以为了方便我们可以通过timer 绕过
简单的方法就是在init_worker 阶段同时我们通过shared 进行数据共享,方便数据的查询处理
解决方法
- 参考图
- 简单说明
整体处理就是基于上边思路的,只是我们的openresty服务部署在k8s中(主要是网络打通),同时为了方便域名访问我们还是部署了一个ingress
(基于nginx)
- 一些参考技术
处理列表的我们使用了一个模版引擎lua-resty-template 实际上可以不用,主要是比较懒,用模版可以省好多时间,xip 服务使用了一个开源的实现
https://github.com/peterhellberg/xip.name , 具体rpm 包参考https://github.com/rongfengliang/xip-rpm, 注册中心api 调用使用了lua-resty-http
- 参考代码
nginx 配置
# Main (process-level) context of the OpenResty configuration.
worker_processes 1;
# NOTE(review): worker processes run as root here - confirm this is intended.
user root;
events {
worker_connections 1024;
}
# Keep these environment variables in the worker environment; the Lua
# modules in this setup read them with os.getenv().
env REGISTRY_HOST;
env REGISTRY_PORT;
env XIP_DOMAIN;
env XIP_IP;
env WEB_DOMAIN;
http {
include mime.types;
default_type application/octet-stream;
gzip on;
# <dnsserver> is a placeholder and must be replaced with a real DNS server address.
resolver <dnsserver>;
# Shared memory zone accessed from Lua as ngx.shared.servicelist; it holds the
# JSON-encoded service list written by the init_worker timer.
lua_shared_dict servicelist 100m;
# NOTE(review): the Lua code cache is disabled, which the ngx_lua documentation
# describes as a development-only setting - confirm before production use.
lua_code_cache off;
real_ip_header X-Forwarded-For;
# Lua modules (e.g. ds-api/services) are resolved under /opt/app first.
lua_package_path '/opt/app/?.lua;;';
# Refresh the registry service list into the "servicelist" shared dict.
# The work runs in timers because cosockets are unavailable in set_by_lua;
# only worker 0 schedules them so the registry is polled once per interval.
init_worker_by_lua_block {
    local delay = 10 -- in seconds
    local log = ngx.log
    local ERR = ngx.ERR

    -- Timer callback: fetch the service list and publish it as JSON.
    local function check(premature)
        -- premature is true when the worker is shutting down
        if premature then
            return
        end

        local eurake_services = require("ds-api/services")
        local cjson = require("cjson.safe")
        log(ERR, "invoke serrvice list")

        local servicelist, fetch_err = eurake_services.servicelist()
        if servicelist == nil then
            -- keep the previously cached list instead of overwriting it with "null"
            log(ERR, "failed to fetch service list: ", fetch_err)
            return
        end

        local encoded, encode_err = cjson.encode(servicelist)
        if not encoded then
            log(ERR, "failed to encode service list: ", encode_err)
            return
        end

        local ok, set_err = ngx.shared.servicelist:set("servicelist", encoded)
        if not ok then
            log(ERR, "failed to store service list: ", set_err)
        end
    end

    if 0 == ngx.worker.id() then
        -- populate the cache right away instead of waiting for the first interval
        local ok, err = ngx.timer.at(0, check)
        if not ok then
            log(ERR, "failed to create timer: ", err)
        end

        ok, err = ngx.timer.every(delay, check)
        if not ok then
            log(ERR, "failed to create timer: ", err)
            return
        end
    end
}
# Wildcard virtual host: the leading part of the Host name selects the
# backend instance, whose ip:port is looked up in the shared-dict cache.
server {
    listen 80;
    charset utf-8;
    default_type text/html;
    server_name *.xipservice;
    location / {
        # NOTE(review): browsers reject credentialed CORS requests when the
        # allowed origin is "*" - confirm whether credentials are really needed.
        add_header Access-Control-Allow-Origin "*";
        add_header Access-Control-Allow-Credentials 'true';
        add_header Access-Control-Allow-Methods 'GET, POST, PUT, DELETE, OPTIONS';
        add_header Access-Control-Allow-Headers 'Accept,Authorization,Cache-Control,Content-Type,DNT,If-Modified-Since,Keep-Alive,Origin,User-Agent,X-Mx-ReqToken,X-Requested-With';
        if ($request_method = OPTIONS){
            return 200;
        }
        set_by_lua_block $my_host {
            local fallback_address = "127.0.0.1"
            local proxy_server_host = ngx.var.host
            local servicelist = require("ds-api/services")
            ngx.log(ngx.ERR,"proxy_server_host "..proxy_server_host)

            -- Plain-text search for the marker that follows "<service_name>."
            -- NOTE(review): "xipip" looks like a placeholder for the real
            -- xip address segment - confirm the intended marker.
            local from = string.find(proxy_server_host, "xipip", 1, true)
            if not from or from < 3 then
                -- no usable service name in the host: do not raise, use the fallback
                ngx.log(ngx.ERR,"no service name found in host "..proxy_server_host)
                return fallback_address
            end

            -- drop the marker and the dot in front of it
            local service_name = string.sub(proxy_server_host, 1, from - 2)
            ngx.log(ngx.ERR,"service_name "..service_name)
            local server_address = servicelist.fetchinstance_backendip_cache(service_name) or fallback_address
            ngx.log(ngx.ERR,"server_address:"..server_address)
            return server_address
        }
        proxy_set_header Host $host;
        proxy_pass http://$my_host;
    }
}
# Overview site: renders the service list page and exposes small JSON/text
# helper endpoints backed by the ds-api/services module.
server {
    listen 80;
    # <webpageforview> is a placeholder for the real host name of this site.
    server_name <webpageforview>;
    charset utf-8;
    default_type text/html;
    set $template_root /usr/local/openresty/nginx/html/templates;
    # HTML overview of all services and their instances.
    location / {
        root html;
        content_by_lua_block {
            local template = require "resty.template"
            local servicelist = require("ds-api/services").servicelist()
            -- render an empty page instead of failing when the registry call fails
            template.render("index.html", servicelist or {})
        }
    }
    # Resolve an instance name to ip:port with a live registry call.
    location /api/services/endpoint {
        content_by_lua_block {
            local eurake_services = require("ds-api/services")
            local servicename = ngx.req.get_uri_args()["servicename"]
            local server_address = eurake_services.fetchinstance_backendip(servicename)
            ngx.say(server_address)
        }
    }
    # Resolve an instance name to ip:port from the shared-dict cache.
    location /api/services/endpointcache {
        content_by_lua_block {
            local eurake_services = require("ds-api/services")
            local servicename = ngx.req.get_uri_args()["servicename"]
            local server_address = eurake_services.fetchinstance_backendip_cache(servicename)
            ngx.say(server_address)
        }
    }
    # Full service list as JSON.
    location /api/services {
        content_by_lua_block {
            local cjson = require("cjson.safe")
            local servicelist = require("ds-api/services").servicelist()
            ngx.say(cjson.encode(servicelist))
        }
    }
    # UP instances of one service as JSON; requires ?servicename=...
    location /api/services/ips {
        content_by_lua_block {
            local cjson = require("cjson.safe")
            local servicename = ngx.req.get_uri_args()["servicename"]
            -- a missing or repeated argument would otherwise raise while the
            -- registry path is being built
            if type(servicename) ~= "string" or servicename == "" then
                ngx.status = ngx.HTTP_BAD_REQUEST
                ngx.say("missing servicename")
                return
            end
            local servicelist = require("ds-api/services").serviceips(servicename)
            ngx.say(cjson.encode(servicelist))
        }
    }
    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
        root html;
    }
}
}
注册中心服务获取
-- Service lookup module: queries the registry over HTTP and resolves
-- instance names to backend addresses.
local http = require "resty.http"
local json = require "cjson.safe"
local utils = require("ds-api/utils")
-- Registry connection settings come from the process environment.
local registryuri = os.getenv("REGISTRY_HOST");
local registryport = os.getenv("REGISTRY_PORT");
-- NOTE(review): xipdomainname is not referenced by the functions visible in this module.
local xipdomainname = os.getenv("XIP_DOMAIN")
--- Fetch every application from the registry and return the parsed list.
-- Connects to REGISTRY_HOST:REGISTRY_PORT, sends the request built by
-- utils.build_eureka_apps() and parses the body with utils.parse_servicelist().
-- @return table parsed service list on success
-- @return nil, string error description on failure
local function get_service_lists()
    ngx.log(ngx.ERR, "registryuri: ", registryuri)
    ngx.log(ngx.ERR, "registryport: ", registryport)

    local httpc = http:new()
    --connect_timeout, send_timeout, read_timeout (in ms)
    httpc:set_timeouts(3000, 10000, 10000)

    local ok, err = httpc:connect(registryuri, registryport)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect eureka server: ", err)
        return nil, err
    end

    local res, req_err = httpc:request(utils.build_eureka_apps())
    if not res then
        ngx.log(ngx.ERR, "failed to request eureka service list: ", req_err)
        -- the connection state is unknown after a failed request
        httpc:close()
        return nil, req_err
    end

    if res.status ~= 200 then
        -- body is left unread, so the connection must not be pooled
        httpc:close()
        return nil, "bad response code: " .. res.status
    end

    local body, read_err = res:read_body()
    if not body then
        ngx.log(ngx.ERR, "failed to read eureka service list: ", read_err)
        httpc:close()
        return nil, read_err
    end

    -- no arguments: use the default idle timeout and pool size
    local _, keepalive_err = httpc:set_keepalive()
    if keepalive_err ~= nil then
        ngx.log(ngx.ERR, "eureka: failed to set keepalive for http client: ", keepalive_err)
    end

    local service, parse_err = utils.parse_servicelist(body)
    if service == nil then
        ngx.log(ngx.ERR, "eureka : failed to parse eureka service list: ", parse_err)
        return nil, parse_err
    end
    return service
end
--- Fetch the instances of a single application from the registry.
-- Connects to REGISTRY_HOST:REGISTRY_PORT, sends the request built by
-- utils.build_app_upips(servicename) and parses the body with utils.parse_service().
-- @param servicename application name to look up
-- @return table parsed instances of the application on success
-- @return nil, string error description on failure
local function get_service_ips(servicename)
    ngx.log(ngx.ERR, "registryuri: ", registryuri)
    ngx.log(ngx.ERR, "registryport: ", registryport)

    local httpc = http:new()
    --connect_timeout, send_timeout, read_timeout (in ms)
    httpc:set_timeouts(3000, 10000, 10000)

    local params = utils.build_app_upips(servicename)
    ngx.log(ngx.ERR, "servicename: ", servicename)

    local ok, err = httpc:connect(registryuri, registryport)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect eureka server: ", err)
        return nil, err
    end

    local res, req_err = httpc:request(params)
    if not res then
        ngx.log(ngx.ERR, "failed to request eureka service list: ", req_err)
        -- the connection state is unknown after a failed request
        httpc:close()
        return nil, req_err
    end

    if res.status ~= 200 then
        -- body is left unread, so the connection must not be pooled
        httpc:close()
        return nil, "bad response code: " .. res.status
    end

    local body, read_err = res:read_body()
    if not body then
        ngx.log(ngx.ERR, "failed to read eureka service response: ", read_err)
        httpc:close()
        return nil, read_err
    end

    -- no arguments: use the default idle timeout and pool size
    local _, keepalive_err = httpc:set_keepalive()
    if keepalive_err ~= nil then
        ngx.log(ngx.ERR, "eureka: failed to set keepalive for http client: ", keepalive_err)
    end

    local service, parse_err = utils.parse_service(body)
    if parse_err ~= nil then
        ngx.log(ngx.ERR, "eureka : failed to parse eureka service response: ", parse_err)
        return nil, parse_err
    end
    return service
end
--- Resolve an instance name to its "ip:port" using the shared-dict cache.
-- Reads the JSON stored under the "servicelist" key of ngx.shared.servicelist.
-- @param instancename instance name to look up
-- @return string "ip:port" of the first matching instance, or "127.0.0.1"
--         when the cache is empty, unreadable or holds no match
local function fetchinstance_backendip_cache(instancename)
    local default_address = "127.0.0.1"

    -- the cache is empty until the refresh timer has run successfully once
    local cached = ngx.shared.servicelist:get("servicelist")
    if cached == nil then
        return default_address
    end

    -- decoding can yield nil or a non-table value such as JSON null
    local services = json.decode(cached)
    if type(services) ~= "table" then
        return default_address
    end

    for _, instances in pairs(services) do
        if type(instances) == "table" then
            for _, instance in pairs(instances) do
                if type(instance) == "table" and instance.instancename == instancename then
                    -- stop at the first hit instead of scanning the remaining services
                    return instance.instance or default_address
                end
            end
        end
    end
    return default_address
end
--- Resolve an instance name to its "ip:port" with a live registry call.
-- @param instancename instance name to look up
-- @return string "ip:port" of the first matching instance, or "127.0.0.1"
--         when the registry call fails or no instance matches
local function fetchinstance_backendip(instancename)
    local default_address = "127.0.0.1"

    -- get_service_lists returns nil plus an error when the registry call fails
    local services, err = get_service_lists()
    if type(services) ~= "table" then
        ngx.log(ngx.ERR, "failed to fetch service list: ", err)
        return default_address
    end

    for _, instances in pairs(services) do
        if type(instances) == "table" then
            for _, instance in pairs(instances) do
                if type(instance) == "table" and instance.instancename == instancename then
                    -- stop at the first hit instead of scanning the remaining services
                    return instance.instance or default_address
                end
            end
        end
    end
    return default_address
end
-- Public interface of the service lookup module.
local exports = {}
exports.servicelist = get_service_lists
exports.serviceips = get_service_ips
exports.fetchinstance_backendip = fetchinstance_backendip
exports.fetchinstance_backendip_cache = fetchinstance_backendip_cache
return exports
-- NOTE(review): a second Lua chunk starts here (the previous one already ended
-- with a return); it exports the helper names that the lookup code calls on
-- require("ds-api/utils").
local json = require("cjson.safe")
-- NOTE(review): registryuri, registryport, xipip and webdomain are not referenced
-- by the module-level code below; envutils() reads the same variables again itself.
local registryuri = os.getenv("REGISTRY_HOST")
local registryport = os.getenv("REGISTRY_PORT")
-- Domain suffix appended to instance names when building their xip host names.
local xipdomainname = os.getenv("XIP_DOMAIN")
local xipip = os.getenv("XIP_IP")
local webdomain = os.getenv("WEB_DOMAIN")
--- Build the request parameters that list every registered application.
-- @return table with path, method and headers for the HTTP client
local function build_eureka_apps()
    return {
        path = "/eureka/apps/",
        method = "GET",
        headers = { Accept = "application/json" },
    }
end
--- Build the request parameters that fetch a single application.
-- @param service_name application name appended to the registry path
-- @return table with path, method and headers for the HTTP client
local function build_app_upips(service_name)
    local request = { method = "GET" }
    request.path = "/eureka/apps/" .. service_name
    request.headers = { Accept = "application/json" }
    return request
end
--- Collect the instances of one application whose status is "UP".
-- Each entry carries ip, port, "ip:port" (instance), a sanitised instance
-- name, the service name and the xip host name (instance name + XIP_DOMAIN).
-- @param servicename application name stored on every entry
-- @param serviceinstances the "instance" value of an application object
-- @return table array of instance descriptors (possibly empty)
local function collect_up_instances(servicename, serviceinstances)
    local collected = {}
    if type(serviceinstances) ~= "table" then
        return collected
    end

    -- Defensive: tolerate a single instance object in place of a list.
    -- TODO confirm whether the deployed registry can return that shape.
    if serviceinstances["instanceId"] ~= nil then
        serviceinstances = { serviceinstances }
    end

    for _, item in pairs(serviceinstances) do
        if type(item) == "table" and item["status"] == "UP" then
            local ipAddr = item["ipAddr"]
            local port = type(item["port"]) == "table" and item["port"]["$"] or nil
            local instance_id = item["instanceId"]
            -- entries missing any of these fields cannot be proxied to, skip them
            if ipAddr ~= nil and port ~= nil and instance_id ~= nil then
                -- ":" and "-" are replaced by "_" in the instance name;
                -- the parentheses drop the match count returned by gsub
                local instancename = (string.gsub(tostring(instance_id), "[:%-]", "_"))
                collected[#collected + 1] = {
                    ip = ipAddr,
                    instance = ipAddr .. ":" .. port,
                    port = port,
                    instancename = instancename,
                    servicename = servicename,
                    -- raises if XIP_DOMAIN is not set, as before
                    xip = instancename .. "." .. xipdomainname,
                }
            end
        end
    end
    return collected
end

--- Parse the body of the "all applications" registry response.
-- @param body JSON text of the response
-- @return table map of service name -> array of UP instance descriptors
-- @return nil, string when the body is not valid JSON or has an unexpected shape
local function parse_servicelist(body)
    -- cjson.safe reports bad input by returning nil, so check the value as
    -- well as the pcall status
    local ok, res_json = pcall(json.decode, body)
    if not ok or type(res_json) ~= "table" then
        return nil, "JSON decode error"
    end

    local applications = res_json.applications
    local apps = type(applications) == "table" and applications.application or nil
    if type(apps) ~= "table" then
        return nil, "unexpected response: missing applications.application"
    end

    local service = {}
    for _, app in pairs(apps) do
        if type(app) == "table" and app["name"] ~= nil then
            local servicename = app["name"]
            service[servicename] = collect_up_instances(servicename, app["instance"])
        end
    end
    return service
end

--- Parse the body of the "single application" registry response.
-- @param body JSON text of the response
-- @return table map with one key (the service name) -> array of UP instance descriptors
-- @return nil, string when the body is not valid JSON or has an unexpected shape
local function parse_service(body)
    local ok, res_json = pcall(json.decode, body)
    if not ok or type(res_json) ~= "table" then
        return nil, "JSON decode error"
    end

    local application = res_json.application
    if type(application) ~= "table" or application["name"] == nil then
        return nil, "unexpected response: missing application"
    end

    local servicename = application["name"]
    local service = {}
    service[servicename] = collect_up_instances(servicename, application["instance"])
    return service
end
--- Read the proxy configuration from the process environment.
-- Every field is nil when the corresponding variable is not set.
-- @return table with registryuri, registryport, xipdomainname, xipip and webdomain
local function envutils()
    return {
        registryuri = os.getenv("REGISTRY_HOST"),
        registryport = os.getenv("REGISTRY_PORT"),
        -- XIP_DOMAIN was read before but never returned; expose it as well
        xipdomainname = os.getenv("XIP_DOMAIN"),
        xipip = os.getenv("XIP_IP"),
        webdomain = os.getenv("WEB_DOMAIN"),
    }
end
-- Public interface of the registry helper module.
local exports = {}
exports.build_eureka_apps = build_eureka_apps
exports.build_app_upips = build_app_upips
exports.parse_servicelist = parse_servicelist
exports.parse_service = parse_service
exports.envutils = envutils
return exports
首页模版
index.html
{# Service overview: context maps each service name to a list of instances. #}
<h1>servielist</h1>
<ul>
{% for name, v in pairs(context) do %}
<li>{{name}}
<br>
{% for _, v2 in pairs(v) do %}
{{v2.instance}} <a href="http://{{v2.xip}}" target="_blank" rel="noopener noreferrer">{{v2.instancename}}</a>
<br>
{% end %}
</li>
{% end %}
</ul>
一些说明
对于数据的获取,为了规避set 阶段不能使用cosocket 的问题,使用了timer定时器执行任务,同时数据通过shared cache 共享,部分参数通过env 处理,完整代码没有完全
贴上,只是说明了基本的处理流程,对于xip 的使用可以参考相关资料
参考资料
https://github.com/peterhellberg/xip.name
https://github.com/rongfengliang/xip-rpm
https://www.cnblogs.com/rongfengliang/p/9797526.html
https://github.com/Netflix/eureka/wiki/Eureka-REST-operations