Sending asynchronous HTTP requests in Java with Spring Boot

I am working on an application that needs to test 1000 proxy servers continuously. The application is based on Spring Boot.

My current approach is a method annotated with @Async that takes a proxy server and returns the result.

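I am often getting OutOfMemory errors and the processing is very slow. I assume that is because each async method is executed in a separate thread which blocks on I/O?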

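Everywhere I read about async in Java, people mix up parallel execution in threads with non-blocking I/O. In the Python world, there is an async library that executes I/O requests in a single thread. While one method is waiting for a response from the server, it starts executing another method.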

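I think I need something like that in my case, because Spring's @Async is not suitable for me. Can someone help clear up my confusion and suggest how I should approach this problem?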

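I want to check 100 proxies at the same time without putting excessive load on the system.
I have read about Apache Async HTTP Client, but I don't know whether it is suitable.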

Here is the thread pool configuration I am using:

    public Executor proxyTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2 - 1);
        executor.setMaxPoolSize(100);
        executor.setDaemon(true);
        return executor;
    }

Answer:

I am often getting OutOfMemory error and the processing is very slow.
I assume that is because each async method is executed in a separate
thread which blocks on I/O?

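I will explain the OOME in the second point.
The slowness is indeed related to the I/O performed while processing each request/response.
The problem comes from the number of threads that effectively run in parallel.
With your current configuration, the maximum pool size is never reached (I explain why below).
Suppose that corePoolSize == 10 in your case. That means 10 threads run in parallel. Suppose each thread takes about 3 seconds to test a site.
That means you test one site every 0.3 seconds on average, so testing 1000 sites takes 300 seconds.
That is slow, and a large part of that time is waiting time: the I/O needed to send the request to, and receive the response from, the site currently being tested.
To increase the overall speed, you should probably run many more threads in parallel than your core count suggests. With frequent scheduling between threads, I/O waiting time becomes much less of a problem: while some threads are paused waiting on I/O, others can make progress.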
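The arithmetic above can be written down as a tiny helper. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, and it assumes every check takes the same time and that threads are waiting on I/O rather than competing for CPU.

```java
// Rough estimate only: assumes a constant time per check and I/O-bound threads.
class ThroughputEstimate {

    /** Total wall-clock seconds to test `sites` sites with `threads` workers. */
    static double totalSeconds(int sites, int threads, double secondsPerSite) {
        // Number of "waves" needed when `threads` sites are tested at once.
        int waves = (sites + threads - 1) / threads;
        return waves * secondsPerSite;
    }

    public static void main(String[] args) {
        System.out.println(totalSeconds(1000, 10, 3.0));  // 10 threads  -> 300.0
        System.out.println(totalSeconds(1000, 100, 3.0)); // 100 threads -> 30.0
    }
}
```

Under these assumptions, going from 10 to 100 concurrent checks cuts the estimate from 300 to 30 seconds. Real timings will vary with timeouts and slow proxies.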

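Fixing the pool configuration as described below should handle the OOME issue and probably improve the execution time considerably, but there is no guarantee that it will be fast.
To get there, you should probably handle the multithreading logic more finely and rely on APIs/libraries with non-blocking I/O.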
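As one possible illustration of the non-blocking route, here is a minimal sketch using the JDK's own java.net.http.HttpClient (Java 11+) instead of @Async. It is not what the question uses. The class name, TEST_URI, the timeouts and the "HTTP 200 means the proxy works" rule are all assumptions made for the example, and it assumes plain HTTP proxies.

```java
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

class ProxyChecker {

    // Hypothetical probe target; replace with a URL you control.
    static final URI TEST_URI = URI.create("http://example.com/");

    /** Non-blocking check of one HTTP proxy; completes with true on HTTP 200. */
    static CompletableFuture<Boolean> checkProxy(InetSocketAddress proxy) {
        HttpClient client = HttpClient.newBuilder()
                .proxy(ProxySelector.of(proxy))
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(TEST_URI)
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                .thenApply(response -> response.statusCode() == 200)
                .exceptionally(error -> false); // timeout, refused connection, ...
    }

    /** Applies `check` to every item with at most `maxInFlight` checks pending. */
    static <T> List<CompletableFuture<Boolean>> checkAll(
            List<T> items, Function<T, CompletableFuture<Boolean>> check, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<Boolean>> results = new ArrayList<>();
        for (T item : items) {
            permits.acquireUninterruptibly(); // waits while maxInFlight checks are pending
            CompletableFuture<Boolean> result = check.apply(item);
            result.whenComplete((ok, error) -> permits.release());
            results.add(result);
        }
        return results;
    }
}
```

A call such as `ProxyChecker.checkAll(proxies, ProxyChecker::checkProxy, 100)` would keep at most 100 checks in flight without dedicating an application thread to each one. Building one HttpClient per proxy keeps the sketch short, but each client has its own internal resources, so this is a starting point rather than a tuned solution.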

Some information from the official documentation should be helpful.
This part explains the overall logic when a task is submitted (emphasis mine):

The configuration of the thread pool should also be considered in
light of the executor’s queue capacity. For the full description of
the relationship between pool size and queue capacity, see the
documentation for ThreadPoolExecutor. The main idea is that, when a
task is submitted, the executor first tries to use a free thread if
the number of active threads is currently less than the core size. If
the core size has been reached, the task is added to the queue, as
long as its capacity has not yet been reached. Only then, if the
queue’s capacity has been reached, does the executor create a new
thread beyond the core size. If the max size has also been reached,
then the executor rejects the task.
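The order described in this quote can be observed directly. The sketch below uses the JDK's java.util.concurrent.ThreadPoolExecutor, which is what Spring's ThreadPoolTaskExecutor delegates to. The sizes (core 2, max 4, queue 2) and the class name are arbitrary choices for the demonstration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

    /** Returns {pool size after 2, 4 and 6 tasks, 1 if the 7th task was rejected}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keeps the worker busy, like a slow I/O call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            submit(pool, blocked, 2);  // tasks 1-2: core threads are created
            observed[0] = pool.getPoolSize();
            submit(pool, blocked, 2);  // tasks 3-4: core is busy, tasks are queued
            observed[1] = pool.getPoolSize();
            submit(pool, blocked, 2);  // tasks 5-6: queue is full, extra threads start
            observed[2] = pool.getPoolSize();
            try {
                pool.execute(blocked); // task 7: queue and pool are both full
            } catch (RejectedExecutionException e) {
                observed[3] = 1;
            }
        } finally {
            release.countDown();       // let every blocked task finish
            pool.shutdown();
        }
        return observed;
    }

    static void submit(ThreadPoolExecutor pool, Runnable task, int times) {
        for (int i = 0; i < times; i++) {
            pool.execute(task);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 2, 4, 1]
    }
}
```

The pool stays at the core size while the queue still has room, and only grows toward the max size once the queue is full.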

This part explains the effect of the queue size (emphasis still mine):

By default, the queue is unbounded, but this is rarely the desired
configuration, because it can lead to OutOfMemoryErrors if enough
tasks are added to that queue while all pool threads are busy.
Furthermore, if the queue is unbounded, the max size has no effect at
all. Since the executor always tries the queue before creating a new
thread beyond the core size, a queue must have a finite capacity for
the thread pool to grow beyond the core size (this is why a fixed-size
pool is the only sensible case when using an unbounded queue).
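The same effect can be reproduced with a plain java.util.concurrent.ThreadPoolExecutor and an unbounded LinkedBlockingQueue. This is a sketch with arbitrary numbers and a made-up class name; the point is only that the pool never grows past the core size while tasks pile up in the queue.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    /** Returns {pool size, queued tasks} after submitting `tasks` blocking tasks. */
    static int[] run(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // unbounded
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // simulates a thread stuck waiting on I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(blocked);
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let every task finish so the JVM can exit
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Core 3, max 100, 1000 tasks: the max size is never used.
        System.out.println(Arrays.toString(run(3, 100, 1000))); // prints [3, 997]
    }
}
```

With 1000 tasks and a core size of 3, only 3 threads ever run and the other 997 tasks sit in memory, even though the max size is 100.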

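Long story short: you did not set the queue size, which is unbounded by default (Integer.MAX_VALUE). So you fill the queue with hundreds of tasks that will only be taken off it much later. Those tasks consume a lot of memory, which is where the OOME comes from.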

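Besides, as stated in the documentation, this setting has no effect with an unbounded queue, because a new thread is created only when the queue is full: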

    executor.setMaxPoolSize(100);

It makes more sense to set both values consistently:

    public Executor proxyTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2 - 1);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setDaemon(true);
        return executor;
    }
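One consequence of a bounded queue, per the documentation quoted earlier, is that tasks are rejected once both the queue and the max pool size are reached. If 1000 calls are fired at once, that limit can be hit. A common way to cope is a CallerRunsPolicy rejection handler, which ThreadPoolTaskExecutor accepts through setRejectedExecutionHandler. The sketch below shows the behaviour with a plain ThreadPoolExecutor; the tiny sizes, the 10 ms sleep and the class name are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {

    /** Submits `tasks` short tasks to a tiny bounded pool; returns how many ran. */
    static int run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                // When pool and queue are full, the submitting thread runs the task
                // itself, which slows submission down instead of throwing.
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                sleepQuietly(10); // stand-in for a slow proxy check
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(20)); // every task runs, none is rejected
    }
}
```

Whether running overflow tasks on the caller thread is acceptable depends on who the caller is; for a scheduler or request thread it may be better to submit in smaller batches instead.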

Alternatively, use a fixed-size pool, with the same value for the core size and the max size:

Rather than only a single size, an executor’s thread pool can have
different values for the core and the max size. If you provide a
single value, the executor has a fixed-size thread pool (the core and
max sizes are the same).

    public Executor proxyTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(100);
        executor.setDaemon(true);
        return executor;
    }

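Also note that invoking the async service 1000 times without any pause seems harmful in terms of memory, since the executor cannot process all the calls straight away. You should probably split these invocations into smaller batches (2, 3 or more) and call Thread.sleep() between them.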
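A minimal sketch of that batching idea follows. Instead of a fixed Thread.sleep(), it waits for each batch to finish before submitting the next one, which is a variation on the suggestion above rather than the same thing. It assumes the async call returns a CompletableFuture (a return type @Async methods can use); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class BatchedSubmitter {

    /** Splits `items` into consecutive chunks of at most `size` elements. */
    static <T> List<List<T>> chunks(List<T> items, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            result.add(items.subList(start, Math.min(start + size, items.size())));
        }
        return result;
    }

    /** Submits one chunk at a time and waits for it before starting the next. */
    static <T, R> List<R> runInBatches(
            List<T> items, int size, Function<T, CompletableFuture<R>> asyncCall) {
        List<R> results = new ArrayList<>();
        for (List<T> chunk : chunks(items, size)) {
            List<CompletableFuture<R>> pending = new ArrayList<>();
            for (T item : chunk) {
                pending.add(asyncCall.apply(item)); // at most `size` calls outstanding
            }
            for (CompletableFuture<R> future : pending) {
                results.add(future.join()); // blocks until this result is ready
            }
        }
        return results;
    }
}
```

With 1000 proxies and a batch size of 100, at most 100 tasks are ever queued or running, so the executor's queue stays small. Note that join() rethrows a failed call as a CompletionException, so the async method should handle its own errors or the caller should catch them.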
