Java多线程下载同一个文件的实现,断点续传(Range)

如何实现多线程来下载一个文件

多线程在一些场景下仍然可以起到加速下载的作用,例如从github下载文件时,速度有时只有100kb/s甚至更低,但是github的下载服务是支持断点续传的。所谓的断点续传,就是服务端支持只返回文件中被请求的指定部分(即HTTP Range请求)。

举个栗子,master.zip 总共10000字节,我们只需要后半部分。Range中的字节位置从0开始计数,并且起止位置都包含在内,所以后半部分对应第5000-9999字节,
我们可以通过设置请求头的方式来告诉服务端我们需要哪部分

Range: bytes=5000-9999

所涉及知识

  • 线程池
  • io流
  • HttpURLConnection

获取HttpURLConnection的方法(getHttpURLConnection)

/**
 * Creates a configured, not-yet-connected HTTP(S) connection for {@code fileUrl}.
 *
 * <p>Timeouts are set to 20 seconds, {@code Accept-Encoding: identity} is always sent,
 * and every entry of {@code requestProperty} is applied as a request header.
 *
 * <p>SECURITY NOTE(review): for HTTPS URLs the connection uses the project's
 * {@code X509TrustUtiil} trust manager and a hostname verifier that accepts every host,
 * exactly as before. The difference is that these settings are now applied to the
 * returned connection only, instead of being installed as JVM-wide defaults on every
 * call. If other code relied on the JVM-wide defaults, it must configure them itself.
 *
 * @param fileUrl         absolute {@code http://} or {@code https://} URL
 * @param requestProperty extra request headers, may be {@code null}
 * @return the configured connection, or {@code null} if it could not be created
 */
public HttpURLConnection getHttpURLConnection(String fileUrl, HashMap<String, String> requestProperty) {
        HttpURLConnection urlCon = null;
        try {
            URL url = new URL(fileUrl);
            urlCon = (HttpURLConnection) url.openConnection();
            // Only HTTPS connections carry TLS settings; plain HTTP URLs used to fail
            // here with a ClassCastException that was silently swallowed.
            if (urlCon instanceof HttpsURLConnection) {
                HttpsURLConnection httpsCon = (HttpsURLConnection) urlCon;
                SSLContext sslcontext = SSLContext.getInstance("SSL", "SunJSSE");
                sslcontext.init(null, new TrustManager[]{new X509TrustUtiil()}, new java.security.SecureRandom());
                HostnameVerifier ignoreHostnameVerifier = new HostnameVerifier() {
                    @Override
                    public boolean verify(String s, SSLSession sslsession) {
                        log.warn("Hostname is not matched for cert.");
                        return true;
                    }
                };
                // Per-connection settings: do not touch the process-wide defaults.
                httpsCon.setHostnameVerifier(ignoreHostnameVerifier);
                httpsCon.setSSLSocketFactory(sslcontext.getSocketFactory());
            }
            urlCon.setConnectTimeout(20000);
            urlCon.setReadTimeout(20000);
            // Common header: ask for the raw bytes so offsets match the file on the server.
            urlCon.setRequestProperty("Accept-Encoding", "identity");
            // Caller-supplied headers (e.g. Range).
            if (null != requestProperty) {
                Set<String> strings = requestProperty.keySet();
                for (String key : strings) {
                    urlCon.setRequestProperty(key, requestProperty.get(key));
                }
            }
            return urlCon;
        } catch (Exception e) {
            // Callers treat null as "could not connect"; record why instead of hiding it.
            log.warn("Failed to create connection for " + fileUrl + ": " + e);
            if (null != urlCon) {
                urlCon.disconnect();
            }
            return null;
        }
        // No 'return' in a finally block: it would discard any Throwable in flight.

    }

多线程调用逻辑

/**
 * Downloads {@code url} into {@code fileLocal} by splitting the file into contiguous
 * parts and fetching each part on the thread pool.
 *
 * <p>The local file is pre-sized to {@code contentLength}. While workers run, the
 * overall progress (0-100) is logged and stored in the HTTP session attribute
 * {@code "downProcess"} about once per second. If the task exceeds
 * {@code mTaskTimeout} (measured from {@code mStartTime}), the shared
 * {@code cancelled} flag is raised so workers stop.
 *
 * @param contentLength total size of the remote file in bytes, must be positive
 * @param fileLocal     path of the local target file
 * @param request       request whose session receives the progress attribute
 * @param url           remote file URL
 * @throws IllegalArgumentException if {@code contentLength} is not positive
 * @throws Exception                if the download was cancelled or any part failed
 */
private void multiThreadDownload(long contentLength, String fileLocal, HttpServletRequest request,String url) throws Exception {
        if (contentLength <= 0) {
            // A non-positive size cannot be partitioned and would divide by zero below.
            throw new IllegalArgumentException("contentLength must be positive, got " + contentLength);
        }
        log.info("准备多线程下载任务,线程数->" + THREAD_NUMBER + " 任务总大小->" + contentLength);
        long point = contentLength / THREAD_NUMBER + 1;
        // Small files need fewer parts than THREAD_NUMBER; unused trailing parts used to
        // get a start offset beyond the file and a zero or negative size.
        int partCount = (int) Math.min((long) THREAD_NUMBER, (contentLength + point - 1) / point);

        // Pre-size the local file so every worker can write at its own offset.
        try (RandomAccessFile file = new RandomAccessFile(fileLocal, "rw")) {
            file.setLength(contentLength);
        }

        CountDownLatch countDownLatch = new CountDownLatch(partCount);
        ArrayList<MyRunable> myRunables = new ArrayList<>();
        for (int i = 0; i < partCount; i++) {
            long start = i * point;
            long curPoint = Math.min(point, contentLength - start);
            // Each worker owns its own file handle and closes it when it finishes.
            RandomAccessFile f = new RandomAccessFile(fileLocal, "rw");
            boolean submitted = false;
            try {
                f.seek(start); // position this handle at the start of its part
                MyRunable myRunable = new MyRunable(start, curPoint, f, countDownLatch,url);
                mThreadPoolExecutor.execute(myRunable); // run the part on the pool
                myRunables.add(myRunable); // kept for progress reporting below
                submitted = true;
            } finally {
                if (!submitted) {
                    // Nobody will close this handle or count the latch down: stop the
                    // already-running workers and release the file before propagating.
                    cancelled = true;
                    f.close();
                }
            }
        }

        boolean finished = false;
        while (!finished) {
            finished = countDownLatch.getCount() == 0;
            // Cancel once if the task exceeds the configured maximum duration.
            if (!finished && !cancelled && (System.currentTimeMillis() - mStartTime) > mTaskTimeout) {
                log.info("超时取消任务");
                cancelled = true;
            }
            long sum = 0;
            for (MyRunable t : myRunables) {
                sum += t.length;
            }
            long process = Math.min(100, (sum * 100) / contentLength);
            log.info("当前任务下载进度->" + process);
            request.getSession().setAttribute("downProcess", String.valueOf(process));
            if (!finished) {
                Thread.sleep(1000);
            }
        }
        if(cancelled) {
            throw new Exception("multi-thread download cancelled or failed: " + url);
        }
        log.info("多线程下载任务完成");
    }
    /**
     * Worker that downloads one contiguous part of the remote file with an HTTP Range
     * request and writes it through its own {@link RandomAccessFile} handle, which the
     * creator must already have positioned at {@code start}.
     *
     * <p>On any failure the enclosing class's {@code cancelled} flag is set so that
     * sibling workers stop. The latch is always counted down and the file handle is
     * always closed, whether the part succeeded or not.
     */
    class MyRunable implements Runnable {
        // Offset of the first byte of this part within the file.
        private final long startPos;
        // Number of bytes this worker is responsible for.
        private final long currentPartSize;
        // File handle this worker writes to; closed when run() finishes.
        private final RandomAccessFile currentPart;
        // Bytes downloaded so far. Volatile because the coordinating thread polls it.
        public volatile long length = 0;
        private final CountDownLatch countDownLatch;
        private final String mUrl;
        private int mReconnectCount = 0;

        public MyRunable(long start, long currentPartSize, RandomAccessFile currentPart, CountDownLatch countDownLatch,String url){
            this.startPos = start;
            this.currentPartSize = currentPartSize;
            this.currentPart = currentPart;
            this.countDownLatch = countDownLatch;
            this.mUrl = url;
        }

        /**
         * Downloads the part.
         *
         * @throws NullPointerException kept for backward compatibility: thrown when the
         *         part fails for any reason, with the real failure attached as the cause
         */
        @Override
        public void run() throws NullPointerException{
            System.out.println(Thread.currentThread().getName() + "执行任务,起点->" + startPos + "负责大小->" + currentPartSize);
            InputStream in = null;
            HttpURLConnection urlCon = null;
            try {
                if (currentPartSize <= 0) {
                    // Empty part: nothing to request. An inverted Range would be invalid.
                    return;
                }
                HashMap<String, String> property = new HashMap<>();

                // HTTP byte ranges are inclusive on both ends, so the last byte of this
                // part is startPos + currentPartSize - 1.
                property.put("Range","bytes=" + startPos + "-" + (startPos + currentPartSize - 1));
                log.info(property.get("Range"));
                urlCon = getHttpURLConnection(mUrl, property);
                // Retry up to two more times while no connection could be created.
                while (null == urlCon && mReconnectCount < 2) {
                    mReconnectCount++;
                    urlCon = getHttpURLConnection(mUrl, property);
                }
                if( null == urlCon ) throw new Exception("多线程任务连接失败");

                int code = urlCon.getResponseCode();
                System.out.println(code);
                // A server that honours the Range header answers 206 (Partial Content).
                // A 200 means it ignored the header and is sending the whole file from
                // byte 0, which is only usable by the part that starts at offset 0.
                boolean partial = code == HttpURLConnection.HTTP_PARTIAL;
                boolean wholeFileFromStart = code == HttpURLConnection.HTTP_OK && startPos == 0;
                if (!partial && !wholeFileFromStart) {
                    throw new Exception("文件读取失败, code=" + code);
                }
                log.info("连接完成");

                // Stream the response body into the file.
                in = urlCon.getInputStream();
                byte[] buffer = new byte[1024 * 10];
                while (!cancelled && length < currentPartSize) {
                    // Never read past this part, so nothing is written into the
                    // region owned by the next worker.
                    int want = (int) Math.min((long) buffer.length, currentPartSize - length);
                    int count = in.read(buffer, 0, want);
                    if (count < 0) {
                        break;
                    }
                    currentPart.write(buffer, 0, count);
                    this.length += count;
                }
                if (!cancelled && length < currentPartSize) {
                    // The stream ended early; the rest of this part would stay zero-filled.
                    throw new IOException("part at " + startPos + " truncated: got " + length + " of " + currentPartSize + " bytes");
                }
                log.info((cancelled==true ? "该任务取消" : "流读取完毕") + Thread.currentThread().getId());
            } catch (Exception e) {
                log.warn("part at " + startPos + " failed: " + e);
                cancelled = true;
                NullPointerException failure = new NullPointerException("单线程下载异常,终止当前任务");
                failure.initCause(e); // keep the real reason visible in the stack trace
                throw failure;

            } finally {
                // Release each resource independently so one failing close() cannot
                // skip the others or the latch count-down.
                try {
                    if (null != in) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if ( null != urlCon ) {
                    urlCon.disconnect();
                }
                try {
                    currentPart.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    // Always reached, otherwise the coordinator would wait forever.
                    countDownLatch.countDown();
                    log.info(Thread.currentThread().getName() + "执行完毕!");
                }
            }


        }
    }

记录下遇到的问题

  • 由于一开始不明白原理,在网上找了个栗子没仔细看,栗子是每次获取完整的文件流,通过inputStream.skip(long) 方法跳过不要的内容。后来发现这个skip方法并不能直接跳到需要的对应位置:被跳过的数据仍然要从网络上读取后丢弃,线程会一直阻塞到读到对应的位置才继续执行,这样用多线程只会多做很多无用功,根本没有用到服务端支持下载指定部分的特性。
  • 采用断点续传的方式,也就是服务端只返回指定的部分。服务端给出的响应code是206!!
  • 添加Range: bytes=start-end 返回第start到第end字节的部分(位置从0开始计数,start和end两端都包含在内)。bytes=start- 返回从start到文件结尾的部分
  • Accept-Ranges 可以通过这个响应头判断服务端是否支持断点续传(值为bytes表示支持,值为none或没有该响应头则不保证支持)
上一篇:Educational Codeforces Round 121 (Rated for Div. 2)思路分享


下一篇:QT 程序调用MATLAB的执行文件