java 多线程提高大数据量的读写效率

对于多线程来说,刚开始是比较蒙的,不了解其中的运行机制。
最近项目中需要用多线程解决一个加载缓慢的问题,特此写了一个例子,供大家参考,如有建议,请多指教,哈哈哈

那么,话不多说。
先说下需求:此接口供xxx公司调用,实现对数据库的读取和修改。而且是全量读取,不进行分页读取。(百万级别数据量)
那就要考虑下,如果将数据分批进行处理,看下哪里运行时间相对较长,将哪部分代码进行多线程处理。

注:程序员最重要的不是写代码,而是对问题的思考和拿出最好的解决方案,再写代码进行解决。

在此例中,将全量数据分批处理,每批5000条数据进行读取操作。
先上代码:

/**
 * 查询指定SN的IP值
 *
 * @return 结果
 */
@RequestMapping("/findIP")
@ResponseBody
public Map<String, Object> getIpFromSn(int flag) {

    Map<String, Object> map = new HashMap<String, Object>();
//      JSONObject json = null;
    try {
        //分批进行处理
        int total = this.routeInfoService.getRouteFindIPCount();
        int onet = 5000;
        int begt = 0;
        int Wtime = total / onet + 1;
        int totalCount = 1;
    	if (flag == 1) {
    		KEY_CHECK_DEAL_TYPE = "stop";
    	} else {
    		KEY_CHECK_DEAL_TYPE = "deal";
    	}
        for (int j = 1; j <= Wtime; j++) {
        	logger.info("当前的处理标识为{}"+KEY_CHECK_DEAL_TYPE);
        	if(KEY_CHECK_DEAL_TYPE.equals("deal")){
            begt = (j - 1) * onet;
            List<RouteCheckIp> infos = new ArrayList<RouteCheckIp>();
//              SimpleDateFormat begint = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//              Date beginti = begint.parse(begint.format(new Date()));
                //查询需要新增处理的SN号码
                List<RouteInfo> SnList = this.routeInfoService.selectRouteInfoSn(begt, onet);
                // 增加线程,处理循环中的每5000条数据
                int threadNum = THREAD_NUM;
                ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
                CountDownLatch countDownLatch = new CountDownLatch(threadNum);
                int perSize = SnList.size() / threadNum;
                for (int i = 0; i < threadNum; i++) {
                    MultiThread thread = new MultiThread();
                    thread.setRouteInfoList(SnList.subList(i * perSize, (i + 1) * perSize));
                    thread.setRedisReportUtil(redisReportUtil);
                    thread.setTotalCount(totalCount);
                    thread.setTotal(total);
                    thread.setRouteInfoService(routeInfoService);
                    thread.setJ(j);
                    thread.setCountDownLatch(countDownLatch);
                    executorService.submit(thread);
                }
                countDownLatch.await();
                executorService.shutdown();
                // 优化前
//                for (RouteInfo info : SnList) {
//                    String dateBySn = redisReportUtil.getString(CommonConstant.ROUTE_INFO_REPORT_REDIS_PRE + info.getRouteSn());
//                    if (!"".equals(dateBySn) && (dateBySn != null)) {
//                        RouteCheckIp rci = new RouteCheckIp();
//                        json = new JSONObject(dateBySn);
//                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                        String str = json.isNull("routerIp") ? "" : json.get("routerIp").toString();
//                        int inter = 0;
//                        if(str==null || str.length()<2){
//                            inter=3;
//                        }else if (checkIp("192",str) || checkIp("10",str) || checkIp("100",str) || checkIp("172",str)) {
//                            inter = 0;
//                        }else{
//                            inter=1;
//                        }
//                        /**
//                    	 * 2018/6/27 新增redis中除去deviceList字段的所有字段
//                    	 * 开始
//                    	 */
//                        rci.setWanUpSpeed(json.isNull("wanUpSpeed") ? "" : json.get("wanUpSpeed").toString());
//                        rci.setReport_time_local(json.isNull("report_time_local") ? "" : json.get("report_time_local").toString());
//                        rci.setWifiUsernum(json.isNull("wifiUsernum") ? "" : json.get("wifiUsernum").toString());
//                        rci.setSystemUptime(json.isNull("systemUptime") ? "" : json.get("systemUptime").toString());
//                        rci.setLan1(json.isNull("lan1") ? "" : json.get("lan1").toString());
//                        rci.setLan2(json.isNull("lan2") ? "" : json.get("lan2").toString());
//                        rci.setLan3(json.isNull("lan3") ? "" : json.get("lan3").toString());
//                        rci.setLan4(json.isNull("lan4") ? "" : json.get("lan4").toString());
//                        rci.setMac(json.isNull("mac") ? "" : json.get("mac").toString());
//                        rci.setFreeRam(json.isNull("freeRam") ? "" : json.get("freeRam").toString());
//                        rci.setPriDns(json.isNull("priDns") ? "" : json.get("priDns").toString());
//                        rci.setGatewayMask(json.isNull("gatewayMask") ? "" : json.get("gatewayMask").toString());
//                        rci.setWanDownSpeed(json.isNull("wanDownSpeed") ? "" : json.get("wanDownSpeed").toString());
//                        rci.setUserNumber(json.isNull("userNumber") ? "" : json.get("userNumber").toString());
//                        rci.setSecDns(json.isNull("secDns") ? "" : json.get("secDns").toString());
//                        rci.setLinkNumber(json.isNull("linkNumber") ? "" : json.get("linkNumber").toString());
//                        rci.setWan(json.isNull("wan") ? "" : json.get("wan").toString());
//                        rci.setCpu(json.isNull("cpu") ? "" : json.get("cpu").toString());
//                        rci.setOnlineUsernum(json.isNull("onlineUsernum") ? "" : json.get("onlineUsernum").toString());
//                        rci.setUseRam(json.isNull("useRam") ? "" : json.get("useRam").toString());
//                    	/**
//                    	 * 2018/6/27 新增redis中除去deviceList字段的所有字段
//                    	 * 结束
//                    	 */
//                        rci.setRouteIp(json.isNull("routerIp") ? "" : json.get("routerIp").toString());
//                        rci.setInternet(inter);
//                        rci.setRouteSn(info.getRouteSn());
//                        rci.setVersion(json.isNull("version") ? "" : json.get("version").toString());
//                        rci.setGatewayIP(json.isNull("gatewayIP") ? "" : json.get("gatewayIP").toString());
//                        rci.setAdmuser(json.isNull("admuser") ? "" : json.get("admuser").toString());
//                        rci.setAdmpass(json.isNull("admpass") ? "" : json.get("admpass").toString());
//                        rci.setPppoeuser(json.isNull("pppoeuser") ? "" : json.get("pppoeuser").toString());
//                        rci.setPPPoepass(json.isNull("PPPoepass") ? "" : json.get("PPPoepass").toString());
//                        rci.setGetIFtime(sdf.format(System.currentTimeMillis()));
//                        infos.add(rci);
//
//                    } else {
//                        RouteCheckIp rci = new RouteCheckIp();
//                        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                        rci.setRouteSn(info.getRouteSn().toString());
//                        rci.setInternet(2);
//                        rci.setGetIFtime(sdf1.format(System.currentTimeMillis()));
//                        infos.add(rci);
//                    }
//                    totalCount++;
//                }
//                this.routeInfoService.addRouteIpDate(infos);
//
//                SimpleDateFormat endt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                Date endti = endt.parse(endt.format(new Date()));
//                logger.info("总计数:{},第{}个5000批处理结束,花费时间为:{}毫秒", totalCount, j, endti.getTime() - beginti.getTime());
            }
            else{
            	System.out.println("获取到停止指令!即将停止!");
            	break;
            	}	
            }

            map.put("result", "success");
            map.put("msg", "操作成功!");
            return map;
        } catch (Exception e) {
            logger.error("数据库操作失败!", e);
            map.put("result", "failure");
            map.put("msg", "数据库操作失败!");
            return map;
        }
    }

1、查看数据库中相应的表中数据量,很大,处理起来很慢,而且是嵌套for循环,运行速度不用多说,肯定会慢很多。
2、此例中,查询到的数据量为300万,分600批次进行处理,每批次5000条处理。本人是创建了20个线程,进行处理。读者可以根据自己的需求修改线程数,看是否可以效率更高。
3、多线程处理代码

 // 增加线程,处理循环中的每5000条数据
 int threadNum = THREAD_NUM;
  ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
  CountDownLatch countDownLatch = new CountDownLatch(threadNum);
  int perSize = SnList.size() / threadNum;
  for (int i = 0; i < threadNum; i++) {
      MultiThread thread = new MultiThread();
      thread.setRouteInfoList(SnList.subList(i * perSize, (i + 1) * perSize));
      thread.setRedisReportUtil(redisReportUtil);
      thread.setTotalCount(totalCount);
      thread.setTotal(total);
      thread.setRouteInfoService(routeInfoService);
      thread.setJ(j);
      thread.setCountDownLatch(countDownLatch);
      executorService.submit(thread);
  }
  countDownLatch.await();
  executorService.shutdown();

4、写对应的Tread类

	package com.cmcc.iot.util;		
	import com.cmcc.iot.consts.CommonConstant;
	import com.cmcc.iot.routeInfo.domain.RouteCheckIp;
	import com.cmcc.iot.routeInfo.domain.RouteInfo;
	import com.cmcc.iot.routeInfo.service.RouteInfoService;
	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.json.JSONObject;
	
	import java.text.SimpleDateFormat;
	import java.util.*;
	import java.util.concurrent.CountDownLatch;
	
	/**
	 * @ClassName MultiThread
	 * @Description 线程类:缩短处理routeInfo的时间
	 * @Date 2019/7/12 17:32
	 **/
	public class MultiThread extends Thread {
	
	    private Logger logger = LoggerFactory.getLogger(getClass());
	
	    private RedisReportUtil redisReportUtil;
	
	    private RouteInfoService routeInfoService;
	
	    private int totalCount;
	
	    // 第一层for循环执行次数
	    int j;
	
	    private static final  int TOTAL_ACOUNT = 5000;
	
	    private List<RouteInfo> routeInfoList;
	
	    private CountDownLatch countDownLatch;
	
	    // 表数据总数
	    private int total;
	
	    public void setJ(int j) {
	        this.j = j;
	    }
	
	    public void setRedisReportUtil(RedisReportUtil redisReportUtil) {
	        this.redisReportUtil = redisReportUtil;
	    }
	
	    public void setRouteInfoList(List<RouteInfo> routeInfoList) {
	        this.routeInfoList = routeInfoList;
	    }
	
	    public void setCountDownLatch(CountDownLatch countDownLatch) {
	        this.countDownLatch = countDownLatch;
	    }
	
	    public void setRouteInfoService(RouteInfoService routeInfoService) {
	        this.routeInfoService = routeInfoService;
	    }
	
	    public void setTotalCount(int totalCount) {
	        this.totalCount = totalCount;
	    }
	
	    public void setTotal(int total) {
	        this.total = total;
	    }
	
	    @Override
	    public void run() {
	        try {
	            Thread.sleep(1000);
	            forRouteInfoList();
	        } catch (Exception e) {
	            e.printStackTrace();
	        } finally {
	            if (countDownLatch != null) {
	                countDownLatch.countDown();
	            }
	        }
	    }
	
	    /**
	     * @Description: for循环查询routeInfo并处理
	     * @return:
	     * @Date: 2019/7/12
	     */
	    public void forRouteInfoList(){
	        try{
	            Map<String, Object> map = new HashMap<String, Object>();
	            JSONObject json = null;
	            SimpleDateFormat begint = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	            Date beginti = begint.parse(begint.format(new Date()));
	            int totalCount = 0;
	            List<RouteInfo> SnList = this.routeInfoList;
	            List<RouteCheckIp> infos = new ArrayList<RouteCheckIp>();
	            for (RouteInfo info : SnList) {
	                String dateBySn = this.redisReportUtil.getString(CommonConstant.ROUTE_INFO_REPORT_REDIS_PRE + info.getRouteSn());
	                if (!"".equals(dateBySn) && (dateBySn != null)) {
	                    RouteCheckIp rci = new RouteCheckIp();
	                    json = new JSONObject(dateBySn);
	                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	                    String str = json.isNull("routerIp") ? "" : json.get("routerIp").toString();
	                    int inter = 0;
	                    if (str == null || str.length() < 2){
	                        inter = 3;
	                    } else if (checkIp("192",str) || checkIp("10",str) || checkIp("100",str) || checkIp("172",str)) {
	                        inter = 0;
	                    } else {
	                        inter = 1;
	                    }
	                    // 2018/6/27 新增redis中除去deviceList字段的所有字段 --- 开始
	                    rci.setWanUpSpeed(json.isNull("wanUpSpeed") ? "" : json.get("wanUpSpeed").toString());
	                    rci.setReport_time_local(json.isNull("report_time_local") ? "" : json.get("report_time_local").toString());
	                    rci.setWifiUsernum(json.isNull("wifiUsernum") ? "" : json.get("wifiUsernum").toString());
	                    rci.setSystemUptime(json.isNull("systemUptime") ? "" : json.get("systemUptime").toString());
	                    rci.setLan1(json.isNull("lan1") ? "" : json.get("lan1").toString());
	                    rci.setLan2(json.isNull("lan2") ? "" : json.get("lan2").toString());
	                    rci.setLan3(json.isNull("lan3") ? "" : json.get("lan3").toString());
	                    rci.setLan4(json.isNull("lan4") ? "" : json.get("lan4").toString());
	                    rci.setMac(json.isNull("mac") ? "" : json.get("mac").toString());
	                    rci.setFreeRam(json.isNull("freeRam") ? "" : json.get("freeRam").toString());
	                    rci.setPriDns(json.isNull("priDns") ? "" : json.get("priDns").toString());
	                    rci.setGatewayMask(json.isNull("gatewayMask") ? "" : json.get("gatewayMask").toString());
	                    rci.setWanDownSpeed(json.isNull("wanDownSpeed") ? "" : json.get("wanDownSpeed").toString());
	                    rci.setUserNumber(json.isNull("userNumber") ? "" : json.get("userNumber").toString());
	                    rci.setSecDns(json.isNull("secDns") ? "" : json.get("secDns").toString());
	                    rci.setLinkNumber(json.isNull("linkNumber") ? "" : json.get("linkNumber").toString());
	                    rci.setWan(json.isNull("wan") ? "" : json.get("wan").toString());
	                    rci.setCpu(json.isNull("cpu") ? "" : json.get("cpu").toString());
	                    rci.setOnlineUsernum(json.isNull("onlineUsernum") ? "" : json.get("onlineUsernum").toString());
	                    rci.setUseRam(json.isNull("useRam") ? "" : json.get("useRam").toString());
	                    // 2018/6/27 新增redis中除去deviceList字段的所有字段 --- 结束
	                    rci.setRouteIp(json.isNull("routerIp") ? "" : json.get("routerIp").toString());
	                    rci.setInternet(inter);
	                    rci.setRouteSn(info.getRouteSn());
	                    rci.setVersion(json.isNull("version") ? "" : json.get("version").toString());
	                    rci.setGatewayIP(json.isNull("gatewayIP") ? "" : json.get("gatewayIP").toString());
	                    rci.setAdmuser(json.isNull("admuser") ? "" : json.get("admuser").toString());
	                    rci.setAdmpass(json.isNull("admpass") ? "" : json.get("admpass").toString());
	                    rci.setPppoeuser(json.isNull("pppoeuser") ? "" : json.get("pppoeuser").toString());
	                    rci.setPPPoepass(json.isNull("PPPoepass") ? "" : json.get("PPPoepass").toString());
	                    rci.setGetIFtime(sdf.format(System.currentTimeMillis()));
	                    infos.add(rci);
	                } else {
	                    RouteCheckIp rci = new RouteCheckIp();
	                    SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	                    rci.setRouteSn(info.getRouteSn().toString());
	                    rci.setInternet(2);
	                    rci.setGetIFtime(sdf1.format(System.currentTimeMillis()));
	                    infos.add(rci);
	                }
	                this.totalCount++;
	            }
	            this.routeInfoService.addRouteIpDate(infos);
	
	            SimpleDateFormat endt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	            Date endti = endt.parse(endt.format(new Date()));
	            if (TOTAL_ACOUNT * j < this.total) {
	                if (j == 1){
	                    total = TOTAL_ACOUNT;
	                } else {
	                    total = TOTAL_ACOUNT * j;
	                }
	            } else {
	                total = SnList.size();
	            }
	            logger.info("总计数:{},第{}个5000批处理结束,花费时间为:{}毫秒" ,total, this.j, endti.getTime() - beginti.getTime());
	        } catch (Exception e) {
	            logger.error("数据库操作失败!", e);
	        }
	    }
	
	    /**
	     * 校验IP
	     * @param prefix ip前缀
	     * @param ipAddr 完整ip地址
	     * @return
	     */
	    public boolean checkIp(String prefix, String ipAddr){
	        StringBuilder reg = new StringBuilder(prefix);
	        reg.append("(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}");
	        return ipAddr.matches(reg.toString());
	    }
	}

5、当创建线程结束,执行线程中的run()方法,那我们将自己需要处理的逻辑代码放在run()方法中,即可。

@Override
public void run() {
    try {
        Thread.sleep(1000);
        forRouteInfoList(); // 自己的处理逻辑代码,此处本人是封装成一个方法,直接调用
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
    }
}

6、此例,调用接口从之前的6个小时,降低到50分钟,处理300万的数据和逻辑。

可见,多线程效率之高,对于数据量大级别的当然就要考虑用到多线程处理,节省代码运行时间,给用户以最好的体验。

希望通过这个示例,给当前在解决多线程怎么处理的程序猿们一个思路。
我是进阶的小强,大家一起2019年的爬坑历程。

上一篇:c# – 具有内部联接的SQL Server NULL值


下一篇:mysql – “is null”和“<=> NULL”之间的区别是什么