BlockingQueue 实现定时推送数据

 

package com...BlockingQueue;

import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import wfc.service.database.RecordSet;
import wfc.service.database.SQL;
import java.util.concurrent.*;


@Component
@EnableScheduling//可以在启动类上注解也可以在当前文件
public class BlockingQueues {

    private static ExecutorService service = Executors.newFixedThreadPool(10);
    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
//    private static RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();拒绝策略
//    private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, queue, handler);

    @Scheduled(cron = "0 0/1 * * * ?")//1分钟
    public void query(){
        doSting();
    }

    private static void doSting(){

        new Thread(new Runnable() {
            boolean flag = true;
            @Override
            public void run() {

                System.out.println("开始扫描未推送的办件到BlockingQueue....");
                String type = "0";
                String insertSql = "select * from DANGAN_FJ where TYPE = ? ";
                Object[] insertObject = new Object[] {type};
                RecordSet rs = SQL.execute(insertSql,insertObject);
                while (rs.next()){
                    String ST_FJ_ID = rs.getString("ST_FJ_ID");
                    try {
                        queue.put(ST_FJ_ID);
                    } catch (InterruptedException e) {
                        System.out.println("放入队列出异常...:"+e);
                        e.printStackTrace();
                    }
                }
                System.out.println("BlockingQueue队列里的办件--->:"+queue);

                while (flag) {
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                String ST_FJ_ID = queue.take();
                                String updateSql = "update dangan_fj set type =-1 where st_fj_id = ? ";
                                Object[] updateObject = new Object[]{ST_FJ_ID};
                                RecordSet updateRs = SQL.execute(updateSql, updateObject);
                                int number = updateRs.TOTAL_RECORD_COUNT;
                                //影响行数
                                System.out.println("办件过期更改影响行数:  " + number + "   办件编号为:" + ST_FJ_ID+"  处理线程:"+Thread.currentThread().getName());

                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                    if(queue.isEmpty()){
                        flag =false;
                    }
                }
            }
        },"").start();

    }


}

 

上一篇:POJ 3187 Backward Digit Sums


下一篇:POJ.3666 Making the Grade(DP,构造 )