对于多线程来说,刚开始是比较蒙的,不了解其中的运行机制。
最近项目中需要用多线程解决一个加载缓慢的问题,特此写了一个例子,供大家参考,如有建议,请多指教,哈哈哈
那么,话不多说。
先说下需求:此接口供xxx公司调用,实现对数据库的读取和修改。而且是全量读取,不进行分页读取。(百万级别数据量)
那就要考虑下,如果将数据分批进行处理,看下哪里运行时间相对较长,将哪部分代码进行多线程处理。
注:程序员最重要的不是写代码,而是对问题的思考和拿出最好的解决方案,再写代码进行解决。
在此例中,将全量数据分批处理,每批5000条数据进行读取操作。
先上代码:
/**
* 查询指定SN的IP值
*
* @return 结果
*/
@RequestMapping("/findIP")
@ResponseBody
public Map<String, Object> getIpFromSn(int flag) {
Map<String, Object> map = new HashMap<String, Object>();
// JSONObject json = null;
try {
//分批进行处理
int total = this.routeInfoService.getRouteFindIPCount();
int onet = 5000;
int begt = 0;
int Wtime = total / onet + 1;
int totalCount = 1;
if (flag == 1) {
KEY_CHECK_DEAL_TYPE = "stop";
} else {
KEY_CHECK_DEAL_TYPE = "deal";
}
for (int j = 1; j <= Wtime; j++) {
logger.info("当前的处理标识为{}"+KEY_CHECK_DEAL_TYPE);
if(KEY_CHECK_DEAL_TYPE.equals("deal")){
begt = (j - 1) * onet;
List<RouteCheckIp> infos = new ArrayList<RouteCheckIp>();
// SimpleDateFormat begint = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date beginti = begint.parse(begint.format(new Date()));
//查询需要新增处理的SN号码
List<RouteInfo> SnList = this.routeInfoService.selectRouteInfoSn(begt, onet);
// 增加线程,处理循环中的每5000条数据
int threadNum = THREAD_NUM;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
int perSize = SnList.size() / threadNum;
for (int i = 0; i < threadNum; i++) {
MultiThread thread = new MultiThread();
thread.setRouteInfoList(SnList.subList(i * perSize, (i + 1) * perSize));
thread.setRedisReportUtil(redisReportUtil);
thread.setTotalCount(totalCount);
thread.setTotal(total);
thread.setRouteInfoService(routeInfoService);
thread.setJ(j);
thread.setCountDownLatch(countDownLatch);
executorService.submit(thread);
}
countDownLatch.await();
executorService.shutdown();
// 优化前
// for (RouteInfo info : SnList) {
// String dateBySn = redisReportUtil.getString(CommonConstant.ROUTE_INFO_REPORT_REDIS_PRE + info.getRouteSn());
// if (!"".equals(dateBySn) && (dateBySn != null)) {
// RouteCheckIp rci = new RouteCheckIp();
// json = new JSONObject(dateBySn);
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// String str = json.isNull("routerIp") ? "" : json.get("routerIp").toString();
// int inter = 0;
// if(str==null || str.length()<2){
// inter=3;
// }else if (checkIp("192",str) || checkIp("10",str) || checkIp("100",str) || checkIp("172",str)) {
// inter = 0;
// }else{
// inter=1;
// }
// /**
// * 2018/6/27 新增redis中除去deviceList字段的所有字段
// * 开始
// */
// rci.setWanUpSpeed(json.isNull("wanUpSpeed") ? "" : json.get("wanUpSpeed").toString());
// rci.setReport_time_local(json.isNull("report_time_local") ? "" : json.get("report_time_local").toString());
// rci.setWifiUsernum(json.isNull("wifiUsernum") ? "" : json.get("wifiUsernum").toString());
// rci.setSystemUptime(json.isNull("systemUptime") ? "" : json.get("systemUptime").toString());
// rci.setLan1(json.isNull("lan1") ? "" : json.get("lan1").toString());
// rci.setLan2(json.isNull("lan2") ? "" : json.get("lan2").toString());
// rci.setLan3(json.isNull("lan3") ? "" : json.get("lan3").toString());
// rci.setLan4(json.isNull("lan4") ? "" : json.get("lan4").toString());
// rci.setMac(json.isNull("mac") ? "" : json.get("mac").toString());
// rci.setFreeRam(json.isNull("freeRam") ? "" : json.get("freeRam").toString());
// rci.setPriDns(json.isNull("priDns") ? "" : json.get("priDns").toString());
// rci.setGatewayMask(json.isNull("gatewayMask") ? "" : json.get("gatewayMask").toString());
// rci.setWanDownSpeed(json.isNull("wanDownSpeed") ? "" : json.get("wanDownSpeed").toString());
// rci.setUserNumber(json.isNull("userNumber") ? "" : json.get("userNumber").toString());
// rci.setSecDns(json.isNull("secDns") ? "" : json.get("secDns").toString());
// rci.setLinkNumber(json.isNull("linkNumber") ? "" : json.get("linkNumber").toString());
// rci.setWan(json.isNull("wan") ? "" : json.get("wan").toString());
// rci.setCpu(json.isNull("cpu") ? "" : json.get("cpu").toString());
// rci.setOnlineUsernum(json.isNull("onlineUsernum") ? "" : json.get("onlineUsernum").toString());
// rci.setUseRam(json.isNull("useRam") ? "" : json.get("useRam").toString());
// /**
// * 2018/6/27 新增redis中除去deviceList字段的所有字段
// * 结束
// */
// rci.setRouteIp(json.isNull("routerIp") ? "" : json.get("routerIp").toString());
// rci.setInternet(inter);
// rci.setRouteSn(info.getRouteSn());
// rci.setVersion(json.isNull("version") ? "" : json.get("version").toString());
// rci.setGatewayIP(json.isNull("gatewayIP") ? "" : json.get("gatewayIP").toString());
// rci.setAdmuser(json.isNull("admuser") ? "" : json.get("admuser").toString());
// rci.setAdmpass(json.isNull("admpass") ? "" : json.get("admpass").toString());
// rci.setPppoeuser(json.isNull("pppoeuser") ? "" : json.get("pppoeuser").toString());
// rci.setPPPoepass(json.isNull("PPPoepass") ? "" : json.get("PPPoepass").toString());
// rci.setGetIFtime(sdf.format(System.currentTimeMillis()));
// infos.add(rci);
//
// } else {
// RouteCheckIp rci = new RouteCheckIp();
// SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// rci.setRouteSn(info.getRouteSn().toString());
// rci.setInternet(2);
// rci.setGetIFtime(sdf1.format(System.currentTimeMillis()));
// infos.add(rci);
// }
// totalCount++;
// }
// this.routeInfoService.addRouteIpDate(infos);
//
// SimpleDateFormat endt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date endti = endt.parse(endt.format(new Date()));
// logger.info("总计数:{},第{}个5000批处理结束,花费时间为:{}毫秒", totalCount, j, endti.getTime() - beginti.getTime());
}
else{
System.out.println("获取到停止指令!即将停止!");
break;
}
}
map.put("result", "success");
map.put("msg", "操作成功!");
return map;
} catch (Exception e) {
logger.error("数据库操作失败!", e);
map.put("result", "failure");
map.put("msg", "数据库操作失败!");
return map;
}
}
1、查看数据库中相应的表中数据量,很大,处理起来很慢,而且是嵌套for循环,运行速度不用多说,肯定会慢很多。
2、此例中,查询到的数据量为300万,分600批次进行处理,每批次5000条处理。本人是创建了20个线程,进行处理。读者可以根据自己的需求修改线程数,看是否可以效率更高。
3、多线程处理代码
// 增加线程,处理循环中的每5000条数据
int threadNum = THREAD_NUM;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
int perSize = SnList.size() / threadNum;
for (int i = 0; i < threadNum; i++) {
MultiThread thread = new MultiThread();
thread.setRouteInfoList(SnList.subList(i * perSize, (i + 1) * perSize));
thread.setRedisReportUtil(redisReportUtil);
thread.setTotalCount(totalCount);
thread.setTotal(total);
thread.setRouteInfoService(routeInfoService);
thread.setJ(j);
thread.setCountDownLatch(countDownLatch);
executorService.submit(thread);
}
countDownLatch.await();
executorService.shutdown();
4、写对应的Tread类
package com.cmcc.iot.util;
import com.cmcc.iot.consts.CommonConstant;
import com.cmcc.iot.routeInfo.domain.RouteCheckIp;
import com.cmcc.iot.routeInfo.domain.RouteInfo;
import com.cmcc.iot.routeInfo.service.RouteInfoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.json.JSONObject;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
/**
* @ClassName MultiThread
* @Description 线程类:缩短处理routeInfo的时间
* @Date 2019/7/12 17:32
**/
public class MultiThread extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisReportUtil redisReportUtil;
private RouteInfoService routeInfoService;
private int totalCount;
// 第一层for循环执行次数
int j;
private static final int TOTAL_ACOUNT = 5000;
private List<RouteInfo> routeInfoList;
private CountDownLatch countDownLatch;
// 表数据总数
private int total;
public void setJ(int j) {
this.j = j;
}
public void setRedisReportUtil(RedisReportUtil redisReportUtil) {
this.redisReportUtil = redisReportUtil;
}
public void setRouteInfoList(List<RouteInfo> routeInfoList) {
this.routeInfoList = routeInfoList;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void setRouteInfoService(RouteInfoService routeInfoService) {
this.routeInfoService = routeInfoService;
}
public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
}
public void setTotal(int total) {
this.total = total;
}
@Override
public void run() {
try {
Thread.sleep(1000);
forRouteInfoList();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
/**
* @Description: for循环查询routeInfo并处理
* @return:
* @Date: 2019/7/12
*/
public void forRouteInfoList(){
try{
Map<String, Object> map = new HashMap<String, Object>();
JSONObject json = null;
SimpleDateFormat begint = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date beginti = begint.parse(begint.format(new Date()));
int totalCount = 0;
List<RouteInfo> SnList = this.routeInfoList;
List<RouteCheckIp> infos = new ArrayList<RouteCheckIp>();
for (RouteInfo info : SnList) {
String dateBySn = this.redisReportUtil.getString(CommonConstant.ROUTE_INFO_REPORT_REDIS_PRE + info.getRouteSn());
if (!"".equals(dateBySn) && (dateBySn != null)) {
RouteCheckIp rci = new RouteCheckIp();
json = new JSONObject(dateBySn);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String str = json.isNull("routerIp") ? "" : json.get("routerIp").toString();
int inter = 0;
if (str == null || str.length() < 2){
inter = 3;
} else if (checkIp("192",str) || checkIp("10",str) || checkIp("100",str) || checkIp("172",str)) {
inter = 0;
} else {
inter = 1;
}
// 2018/6/27 新增redis中除去deviceList字段的所有字段 --- 开始
rci.setWanUpSpeed(json.isNull("wanUpSpeed") ? "" : json.get("wanUpSpeed").toString());
rci.setReport_time_local(json.isNull("report_time_local") ? "" : json.get("report_time_local").toString());
rci.setWifiUsernum(json.isNull("wifiUsernum") ? "" : json.get("wifiUsernum").toString());
rci.setSystemUptime(json.isNull("systemUptime") ? "" : json.get("systemUptime").toString());
rci.setLan1(json.isNull("lan1") ? "" : json.get("lan1").toString());
rci.setLan2(json.isNull("lan2") ? "" : json.get("lan2").toString());
rci.setLan3(json.isNull("lan3") ? "" : json.get("lan3").toString());
rci.setLan4(json.isNull("lan4") ? "" : json.get("lan4").toString());
rci.setMac(json.isNull("mac") ? "" : json.get("mac").toString());
rci.setFreeRam(json.isNull("freeRam") ? "" : json.get("freeRam").toString());
rci.setPriDns(json.isNull("priDns") ? "" : json.get("priDns").toString());
rci.setGatewayMask(json.isNull("gatewayMask") ? "" : json.get("gatewayMask").toString());
rci.setWanDownSpeed(json.isNull("wanDownSpeed") ? "" : json.get("wanDownSpeed").toString());
rci.setUserNumber(json.isNull("userNumber") ? "" : json.get("userNumber").toString());
rci.setSecDns(json.isNull("secDns") ? "" : json.get("secDns").toString());
rci.setLinkNumber(json.isNull("linkNumber") ? "" : json.get("linkNumber").toString());
rci.setWan(json.isNull("wan") ? "" : json.get("wan").toString());
rci.setCpu(json.isNull("cpu") ? "" : json.get("cpu").toString());
rci.setOnlineUsernum(json.isNull("onlineUsernum") ? "" : json.get("onlineUsernum").toString());
rci.setUseRam(json.isNull("useRam") ? "" : json.get("useRam").toString());
// 2018/6/27 新增redis中除去deviceList字段的所有字段 --- 结束
rci.setRouteIp(json.isNull("routerIp") ? "" : json.get("routerIp").toString());
rci.setInternet(inter);
rci.setRouteSn(info.getRouteSn());
rci.setVersion(json.isNull("version") ? "" : json.get("version").toString());
rci.setGatewayIP(json.isNull("gatewayIP") ? "" : json.get("gatewayIP").toString());
rci.setAdmuser(json.isNull("admuser") ? "" : json.get("admuser").toString());
rci.setAdmpass(json.isNull("admpass") ? "" : json.get("admpass").toString());
rci.setPppoeuser(json.isNull("pppoeuser") ? "" : json.get("pppoeuser").toString());
rci.setPPPoepass(json.isNull("PPPoepass") ? "" : json.get("PPPoepass").toString());
rci.setGetIFtime(sdf.format(System.currentTimeMillis()));
infos.add(rci);
} else {
RouteCheckIp rci = new RouteCheckIp();
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
rci.setRouteSn(info.getRouteSn().toString());
rci.setInternet(2);
rci.setGetIFtime(sdf1.format(System.currentTimeMillis()));
infos.add(rci);
}
this.totalCount++;
}
this.routeInfoService.addRouteIpDate(infos);
SimpleDateFormat endt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date endti = endt.parse(endt.format(new Date()));
if (TOTAL_ACOUNT * j < this.total) {
if (j == 1){
total = TOTAL_ACOUNT;
} else {
total = TOTAL_ACOUNT * j;
}
} else {
total = SnList.size();
}
logger.info("总计数:{},第{}个5000批处理结束,花费时间为:{}毫秒" ,total, this.j, endti.getTime() - beginti.getTime());
} catch (Exception e) {
logger.error("数据库操作失败!", e);
}
}
/**
* 校验IP
* @param prefix ip前缀
* @param ipAddr 完整ip地址
* @return
*/
public boolean checkIp(String prefix, String ipAddr){
StringBuilder reg = new StringBuilder(prefix);
reg.append("(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}");
return ipAddr.matches(reg.toString());
}
}
5、当创建线程结束,执行线程中的run()方法,那我们将自己需要处理的逻辑代码放在run()方法中,即可。
@Override
public void run() {
try {
Thread.sleep(1000);
forRouteInfoList(); // 自己的处理逻辑代码,此处本人是封装成一个方法,直接调用
} catch (Exception e) {
e.printStackTrace();
} finally {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
6、此例,调用接口从之前的6个小时,降低到50分钟,处理300万的数据和逻辑。
可见,多线程效率之高,对于数据量大级别的当然就要考虑用到多线程处理,节省代码运行时间,给用户以最好的体验。
希望通过这个示例,给当前在解决多线程怎么处理的程序猿们一个思路。
我是进阶的小强,大家一起2019年的爬坑历程。