概述
关于dubbo的路由配置,可以查看官网,那么路由到底做了什么呢?起始就是根据一次服务请求,消费者根据路由配置决定调用哪些服务提供者,然后将对应的服务提供者进行负载均衡,集群容错。
路由规则调用流程
调用入口:AbstractClusterInvoker#invoke => List<Invoker<T>> invokers = list(invocation); => AbstractDirectory.list => RegisterDirectory.doList
@Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } if (multiGroup) { return this.invokers == null ? Collections.emptyList() : this.invokers; } List<Invoker<T>> invokers = null; try { // Get invokers from cache, only runtime routers will be executed. invokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); }
public List<Invoker<T>> route(URL url, Invocation invocation) { List<Invoker<T>> finalInvokers = invokers; for (Router router : routers) { finalInvokers = router.route(finalInvokers, url, invocation); } return finalInvokers; }
route方法将在下一节介绍,这里边的routers是哪边来的呢?
在对ZK的routers做了监听之后,有路由配置的变化,都会调到RegisterDirectory#notify => toRouters(routerURLs).ifPresent(this::addRouters);
private Optional<List<Router>> toRouters(List<URL> urls) { if (urls == null || urls.isEmpty()) { return Optional.empty(); } List<Router> routers = new ArrayList<>(); for (URL url : urls) { if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) { continue; } String routerType = url.getParameter(Constants.ROUTER_KEY); if (routerType != null && routerType.length() > 0) { url = url.setProtocol(routerType); } try { Router router = routerFactory.getRouter(url); if (!routers.contains(router)) { routers.add(router); } } catch (Throwable t) { logger.error("convert router url to router error, url: " + url, t); } } return Optional.of(routers); }
根据URL获取Router,最后调用addRouter把routers初始化。 这个router机制的大体流程介绍完毕。
接下来重点具体router的路由过程,再此之前,读者应该对路由的配置是相对熟悉了。
RouterFactory#getRouter 拿到的是 ConditionRouterFactory.getRouter => ConditionRouter 这边具体分析 ConditionRouter
public ConditionRouter(URL url) { this.url = url; //路由器优先级,在多个路由排序用的 this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); //是否强制执行路由规则,哪怕没有合适的invoker this.force = url.getParameter(Constants.FORCE_KEY, false); this.enabled = url.getParameter(Constants.ENABLED_KEY, true); //通过rule key获取路由规则字串 init(url.getParameterAndDecoded(Constants.RULE_KEY)); } public void init(String rule) { try { if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } //把字符串里的"consumer." "provider." 替换掉,方便解析 rule = rule.replace("consumer.", "").replace("provider.", ""); //以"=>"为分割线,前面是consumer规则,后面是provider 规则 int i = rule.indexOf("=>"); String whenRule = i < 0 ? null : rule.substring(0, i).trim(); String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); //parseRule 方法解析规则,放在Map<String, MatchPair>里 Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule); Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. // NOTE: When条件是允许为空的,外部业务来保证类似的约束条件 //解析构造的规则放在condition变量里 this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } }
从init方法中,就是把路由规则解析成Map<String, MatchPair> 类型的 when和then变量中,其中when存放消费者有关的路由信息 then存放服务提供者的路由信息
以"host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 &method =sayHello => host = 1.2.3.4&host !=4.4.4.4"为例,debug的结果如下:
其中matches表示匹配的值,而mismatches表示不匹配的值。
接下来就是执行路由规则,总的来说就是让符合规则的调用方,可以调用, 让不符合规则的调用方不能调用。 让符合规则的服务提供方,留着服务提供者列表。 让不符合路由规则的服务提供方,从服务者列表中除去。
@Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (!enabled) { return invokers; } if (CollectionUtils.isEmpty(invokers)) { return invokers; } try { //前置条件不匹配,说明consumer不在限制之列。说明,路由不针对当前客户,这样就全部放行,所有提供者都可以调用。 //这是consumer的url if (!matchWhen(url, invocation)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); //thenCondition为null表示拒绝一切请求 if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } for (Invoker<T> invoker : invokers) { if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (!result.isEmpty()) { return result; } else if (force) { //force强制执行路由。哪怕result是空的,也要返回给上层方法。如果为false,最后放回所有的invokers,等于不执行路由 logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY)); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers; }