如何实现多线程来下载一个文件
多线程现在一些场景还是可以起到加速下载的作用的,例如github下载文件的时候有时候只有100kb/s或者更低,但是github下载服务是支持断点续传的。所谓的断点续传就是服务端支持返回所需文件的指定部分。
举个栗子,master.zip 总10000字节数,我们只需要5001-10000的部分,
我们可以通过设置请求头的方式来告诉服务端我们需要哪部分
Range: bytes=5001-10000
所涉及知识
- 线程池
- io流
- HttpURLConnection
获取getHttpURLConnection 方法
public HttpURLConnection getHttpURLConnection(String fileUrl, HashMap<String, String> requestProperty) {
HttpsURLConnection urlCon = null;
try {
SSLContext sslcontext = SSLContext.getInstance("SSL", "SunJSSE");
sslcontext.init(null, new TrustManager[]{new X509TrustUtiil()}, new java.security.SecureRandom());
URL url = new URL(fileUrl);
HostnameVerifier ignoreHostnameVerifier = new HostnameVerifier() {
@Override
public boolean verify(String s, SSLSession sslsession) {
log.warn("Hostname is not matched for cert.");
return true;
}
};
HttpsURLConnection.setDefaultHostnameVerifier(ignoreHostnameVerifier);
HttpsURLConnection.setDefaultSSLSocketFactory(sslcontext.getSocketFactory());
urlCon = (HttpsURLConnection) url.openConnection();
urlCon.setConnectTimeout(20000);
urlCon.setReadTimeout(20000);
//通用的参数
urlCon.setRequestProperty("Accept-Encoding", "identity");
//设置请求参数
if (null != requestProperty) {
Set<String> strings = requestProperty.keySet();
for (String key : strings) {
urlCon.setRequestProperty(key, requestProperty.get(key));
}
}
return urlCon;
} catch (Exception e) {
if( null != urlCon ) {
urlCon.disconnect();
urlCon = null;
}
}finally {
return urlCon;
}
}
多线程调用逻辑
private void multiThreadDownload(long contentLength, String fileLocal, HttpServletRequest request,String url) throws Exception {
log.info("准备多线程下载任务,线程数->" + THREAD_NUMBER + " 任务总大小->" + contentLength);
long point = contentLength / THREAD_NUMBER + 1;
RandomAccessFile file = new RandomAccessFile(fileLocal, "rw");
//设置本地文件的大小
file.setLength(contentLength);
file.close();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUMBER);
ArrayList<MyRunable> myRunables = new ArrayList<>();
for (int i = 0; i < THREAD_NUMBER; i++) {
RandomAccessFile f = new RandomAccessFile(fileLocal, "rw");
long start = i * point;
f.seek(start); //将操作文件的位置指向start位置
long end = start + point;
long curPoint = end > contentLength ? contentLength - start : point;
MyRunable myRunable = new MyRunable(start, curPoint, f, countDownLatch,url);
myRunables.add(myRunable); //记录Runable,稍后会统计进度用到
mThreadPoolExecutor.execute(myRunable); //将任务提交到线程池运行
}
long sum = 0;
while (countDownLatch.getCount()!=0) {
//在规定的最大超时时间内没完成就取消任务
if( (System.currentTimeMillis() - mStartTime) > mTaskTimeout ) {
log.info("超时取消任务");
cancelled = true;
}
sum = 0;
for (MyRunable t : myRunables) {
sum += t.length;
}
Thread.sleep(1000);
long process = (sum * 100) / contentLength;
log.info("当前任务下载进度->" + process);
request.getSession().setAttribute("downProcess", String.valueOf(process));
}
countDownLatch.await();
if(cancelled) {
throw new Exception();
}
log.info("多线程下载任务完成");
}
class MyRunable implements Runnable {
//当前线程的下载位置
private long startPos;
//定义当前线程负责下载的文件大小
private long currentPartSize;
//当前线程需要下载的文件块
private RandomAccessFile currentPart;
//定义该线程已下载的字节数
public long length = 0;
private CountDownLatch countDownLatch;
private String mUrl;
private int mReconnectCount = 0;
public MyRunable(long start, long currentPartSize, RandomAccessFile currentPart, CountDownLatch countDownLatch,String url){
this.startPos = start;
this.currentPartSize = currentPartSize;
this.currentPart = currentPart;
this.countDownLatch = countDownLatch;
this.mUrl = url;
}
@Override
public void run() throws NullPointerException{
System.out.println(Thread.currentThread().getName() + "执行任务,起点->" + startPos + "负责大小->" + currentPartSize);
SSLContext sslcontext = null;
InputStream in = null;
HttpURLConnection urlCon = null;
try {
HashMap<String, String> property = new HashMap<>();
property.put("Range","bytes=" + startPos + "-" + (startPos + currentPartSize));
log.info(property.get("Range"));
urlCon = getHttpURLConnection(mUrl, property);
if( null == urlCon ) {
while(mReconnectCount < 2 && null != urlCon) {
urlCon = getHttpURLConnection(mUrl, property);
mReconnectCount++;
}
if( null == urlCon ) throw new Exception("多线程任务连接失败");
}
int code = urlCon.getResponseCode();
System.out.println(code);
//这里需要注意一下,断开续传的连接方式服务端传递回来的状态码是206。我们自己写服务端的时候也需要注意。
if (code != HttpURLConnection.HTTP_OK && code != 206) {
throw new Exception("文件读取失败");
}
log.info("连接完成");
// 读文件流
in = urlCon.getInputStream();
byte[] buffer = new byte[1024 * 10];
int count = 0;
while (!cancelled && length < currentPartSize && (count = in.read(buffer)) > 0) {
currentPart.write(buffer, 0, count);
this.length += count;
}
log.info((cancelled==true ? "该任务取消" : "流读取完毕") + Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
cancelled = true;
throw new NullPointerException("单线程下载异常,终止当前任务");
} finally {
try {
if ( null != urlCon ) {
urlCon.disconnect();
urlCon = null;
}
currentPart.close();
in.close();
} catch (IOException e) {
e.printStackTrace();
}
countDownLatch.countDown();
log.info(Thread.currentThread().getName() + "执行完毕!");
}
}
}
记录下遇到的问题
- 由于一开始不明白原理,在网上找了个栗子没仔细看,栗子是每次获取完整的文件流,通过inputStream.skip(int); 方法跳过不要的内容。后来发现这个skip方法并不能直接跳到需要的对应位置,只是会阻塞到这里等到读到对应的位置上,然后再继续执行,这样用多线程只会多做很多无用功。根本没有用到支持下载指定部分的特性。
- 采用断点续传的方式,也就是服务端只返回指定的部分。服务端给出的响应code是206!!
- 添加Range: bytes=start-end 返回start-end之间的部分。bytes=start- 返回start- 文件结尾
- Accept-Ranges 可以通过这个头属性判断是否支持断点续传