Dubbo 并发控制

消费端并发控制

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
  • 在服务消费方设置接口中每个方法并发请求个数,通过设置actives参数。
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000">
                <dubbo:method name="sayHello" actives="10" />
</dubbo:reference>
  • 在服务消费方设置接口中的某个方法的并发请求个数,通过设置actives参数。


服务端并发控制

<dubbo:service interface="com.test.UserServiceBo" ref="userService"
            group="dubbo"  version="1.0.0" timeout="3000" executes="10"/>
  • 在服务提供方设置接口中每个方法的并发请求数,通过设置executes参数。
<dubbo:service interface="com.test.UserServiceBo" ref="userService"
            group="dubbo" version="1.0.0" timeout="3000" >
            <dubbo:method name="sayHello" executes="10" />
</dubbo:service>
  • 在服务提供方设置接口中某个方法的并发请求数,通过设置executes参数。


消费端并发限制 - ActiveLimitFilter

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 获取方法级别的并发限制的RpcStatus对象
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            // 通过synchronized和wait实现客户端并发限制超过时候需要等待直至超时。
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }

        try {
            long begin = System.currentTimeMillis();
            // 累加方法级别的并发数
            RpcStatus.beginCount(url, methodName);
            try {
                // 执行方法调用
                Result result = invoker.invoke(invocation);
                // 递减方法级别的并发数
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);

                return result;
            } catch (RuntimeException t) {
                // 递减方法级别的并发数
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            // 针对有并发限制的通过notify进行唤醒
            if (max > 0) {
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}
  • 1、首先会去获得服务消费端每服务每方法最大可并行执行请求数。
  • 2、如果方法设置并发请求数就需要判断是否超并发数,超过并发数就等待直至超时。
  • 3、按照累加并发数、执行方法、递减并发数,最后进行唤醒炒作。
  • 4、 消费端设置actives时候会等待直至超时。


服务端并发限制 - ExecuteLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            executesLimit = count.getSemaphore(max);
            // 服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            // 如果需要获取过信号量就进行释放动作。
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }
}
  • 1、首先会去获得服务提供者每服务每方法最大可并行执行请求数。
  • 2、如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例。
  • 3、通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常。
  • 4、如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数。
  • 5、调用服务。
  • 6、调用结束后计数调用RpcStatus静态方法endCount,计数结束。
  • 7、释放信号量。
  • 8、需要注意的是,服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。


并发限制实现核心 - RpcStatus

public class RpcStatus {
    // service级别的并发限制全局变量
    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
    // method级别的并发限制全局变量
    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    // 记录活跃的记录
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();
    // 用于记录服务提供端的限制
    private volatile Semaphore executesLimit;
    private volatile int executesPermits;

    private RpcStatus() {
    }

    // 获取service级别的并发限制变量
    public static RpcStatus getStatus(URL url) {
        String uri = url.toIdentityString();
        RpcStatus status = SERVICE_STATISTICS.get(uri);
        if (status == null) {
            SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());
            status = SERVICE_STATISTICS.get(uri);
        }
        return status;
    }

    // 获取method级别的并发限制变量,根据service=>method顺序查找
    public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
        if (map == null) {
            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
            map = METHOD_STATISTICS.get(uri);
        }
        RpcStatus status = map.get(methodName);
        if (status == null) {
            map.putIfAbsent(methodName, new RpcStatus());
            status = map.get(methodName);
        }
        return status;
    }

    // 累加service 和 method的并发限制变量
    public static void beginCount(URL url, String methodName) {
        beginCount(getStatus(url));
        beginCount(getStatus(url, methodName));
    }
    // 并发限制变量的原子累加
    private static void beginCount(RpcStatus status) {
        status.active.incrementAndGet();
    }
    // 递减service 和 method的并发限制变量
    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }

    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }

    public Semaphore getSemaphore(int maxThreadNum) {
        if(maxThreadNum <= 0) {
            return null;
        }

        if (executesLimit == null || executesPermits != maxThreadNum) {
            synchronized (this) {
                if (executesLimit == null || executesPermits != maxThreadNum) {
                    executesLimit = new Semaphore(maxThreadNum);
                    executesPermits = maxThreadNum;
                }
            }
        }

        return executesLimit;
    }
}
  • service级别的并发限制全局变量 private static final ConcurrentMap SERVICE_STATISTICS。
  • method级别的并发限制全局变量 private static final ConcurrentMap> METHOD_STATISTICS。
  • 记录活跃的记录 private final AtomicInteger active。
上一篇:三部门共建国家智能交通综合测试基地


下一篇:Dubbo Consumer 共享连接说明