CloseableHttpClient未设置Socket超时导致线程池堵塞

 

线上使用 CompletableFuture.supplyAsync  来多线程下载影像,使用了默认的ForkJoinPool线程池。

项目运行一段时间后下载影像方法全部报错:

java.util.concurrent.TimeoutException

下载方法如下:

public File download(String url, String filename) throws IOException {
    File file = new File(parent, filename);
    try (CloseableHttpClient http = HttpClientBuilder.create().build();
         FileOutputStream fos = new FileOutputStream(file);
         CloseableHttpResponse response = http.execute(new HttpGet(url));) {
        int sc = response.getStatusLine().getStatusCode();
        if (sc == HttpStatus.SC_OK) {
            HttpEntity entity = response.getEntity();
            entity.writeTo(fos);
        } else {
            ...
        }
    }
    return file;
}

 

排查代码发现出现在 CompletableFuture.get() 步骤超时

CompletableFuture<List<File>> sequence = sequence(
        imgs.stream().map(p -> CompletableFuture.supplyAsync(() -> {
            ...
        }).exceptionally(e -> {
            ...
        })).collect(toList()));
try {
    return sequence.get(300L, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    ...
}

考虑到当影像量庞大时候下载速度问题,超时时间设置为5分钟之久。

这么久的超时时间设置,每次方法调用出现都出现TimeoutException问题,猜测是线程堵塞。

 

项目部署在docker容器,进入bash 。

jps -l // 获取项目进程ID

 

 由于使用了CompletableFuture 异步操作的默认线程池ForkJoinPool 

 使用jstack 获取项目堆栈日志。

jstack 1(项目进程ID) | grep ForkJoinPool -A 10

 获取到的日志部分如下所示。

"ForkJoinPool.commonPool-worker-1" #521 daemon prio=5 os_prio=0 tid=0x00007f4b901be800 nid=0x20e runnable [0x00007f4b5b693000]                                                                                                                                                                                 
                                                                                                                                                                                                                                                                                                               
   java.lang.Thread.State: RUNNABLE                                                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                               
        at java.net.SocketInputStream.socketRead0(Native Method)                                                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                                                               
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)                                                                                                                                                                                                                                   
                                                                                                                                                                                                                                                                                                               
        at java.net.SocketInputStream.read(SocketInputStream.java:171)                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                                               
        at java.net.SocketInputStream.read(SocketInputStream.java:141)                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198)                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.entity.BasicHttpEntity.writeTo(BasicHttpEntity.java:116)                                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                               
        at org.apache.http.impl.execchain.ResponseEntityProxy.writeTo(ResponseEntityProxy.java:101)                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                               
        at net.ebaolife.tpa.common.util.XXX.download(XXX.java:57)                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                               
        at net.ebaolife.tpa.common.util.XXX$BatchOp.lambda$null$0(XXX.java:93)                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                                               
        at net.ebaolife.tpa.common.util.XXX$BatchOp$$Lambda$57/1027039641.get(Unknown Source)                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)                                                                                                                                                                                                                 
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)                                                                                                                                                                                                                                 
                                                                                                                                                                                                                                                                                                               
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 

几个ForkJoinPool的线程全部处于RUNNABLE 状态,而且都阻塞在java.net.SocketInputStream#socketRead0 方法上。

查阅资料发现该方法在请求远程服务时候不设置超时时间,由于网络原因很容易阻塞线程,具体原因不做详述。

发现问题后解决即可,设置上socket超时时间:

public File download(String url, String filename) throws IOException {
    File file = new File(parent, filename);
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(TIME_OUT)
            .setSocketTimeout(TIME_OUT).build();
    HttpGet httpGet = new HttpGet(url);
    httpGet.setConfig(requestConfig);
    try (CloseableHttpClient http = HttpClientBuilder.create().build();
         FileOutputStream fos = new FileOutputStream(file);
         CloseableHttpResponse response = http.execute(httpGet)) {
        int sc = response.getStatusLine().getStatusCode();
        if (sc == HttpStatus.SC_OK) {
            HttpEntity entity = response.getEntity();
            entity.writeTo(fos);
        } else {
           ...
    }
    return file;
}

线程池阻塞导致的问题还是很大的,特此记录,使用 CloseableHttpClient 的时候一定注意设置 socket超时时间 以及在使用完后及时关闭各个流,代码中使用到了try resource的方法来优雅关闭流。

一般类只要实现了 Closeable 接口即可使用这类方式。

public abstract class CloseableHttpClient implements HttpClient, Closeable {
    ...
}


public interface Closeable extends AutoCloseable {

    /**
     * Closes this stream and releases any system resources associated
     * with it. If the stream is already closed then invoking this
     * method has no effect.
     *
     * <p> As noted in {@link AutoCloseable#close()}, cases where the
     * close may fail require careful attention. It is strongly advised
     * to relinquish the underlying resources and to internally
     * <em>mark</em> the {@code Closeable} as closed, prior to throwing
     * the {@code IOException}.
     *
     * @throws IOException if an I/O error occurs
     */
    public void close() throws IOException;
}

 

上一篇:Docker安装RocketMQ以及使用


下一篇:消息队列 RocketMQ