利用CountDownLatch检查大数据抽取过程中数据是否完整

数据完整性

1.数据格式完整

2.数据大小完整

package com.dwz.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample6 {
    private static Random random = new Random(System.currentTimeMillis());
    
    static class Event {
        int id;

        public Event(int id) {
            this.id = id;
        }
    }
    
    interface Watcher {
        void done(Table table);
    }
    
    static class TaskBatch implements Watcher{
        private CountDownLatch countDownLatch;
        private TaskGroup taskGroup;
        
        public TaskBatch(int size, TaskGroup taskGroup) {
            this.countDownLatch = new CountDownLatch(size);
            this.taskGroup = taskGroup;
        }
        
        @Override
        public void done(Table table) {
            countDownLatch.countDown();
            if(countDownLatch.getCount() == 0) {
                System.out.println("The table " + table.tableName + " finished work, [" + table + "]");
                taskGroup.done(table);
            }
        }
        
    }
    
    static class TaskGroup implements Watcher{
        private CountDownLatch countDownLatch;
        private Event event;
        
        public TaskGroup(int size, Event event) {
            this.countDownLatch = new CountDownLatch(size);
            this.event = event;
        }

        @Override
        public void done(Table table) {
            countDownLatch.countDown();
            if(countDownLatch.getCount() == 0) {
                System.out.println("======All of table done in event : " + event.id);
            }
        }
        
    }
    
    static class Table {
        String tableName;
        long sourceRecordCount = 10;
        long targetCount;
        String sourceColumnSchema = "<table name = 'a'><column name = 'coll' type = 'varchar2'></table>";
        String targetColumnSchema = "";
        
        public Table(String tableName, long sourceRecordCount) {
            this.tableName = tableName;
            this.sourceRecordCount = sourceRecordCount;
        }
        
        @Override
        public String toString() {
            return "Table [tableName=" + tableName + ", sourceRecordCount=" + sourceRecordCount + ", targetCount="
                    + targetCount + ", sourceColumnSchema=" + sourceColumnSchema + ", targetColumnSchema="
                    + targetColumnSchema + "]";
        }
    }
    
    private static List<Table> capture(Event event) {
        List<Table> list = new ArrayList<Table>();
        for(int i = 0; i < 10; i++) {
            list.add(new Table("table-"+ event.id + "-" + i, i * 1000));
        }
        return list;
    }
    
    public static void main(String[] args) {
        Event[] events = {new Event(1), new Event(2)};
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        for(Event event : events) {
            List<Table> tables = capture(event);
            TaskGroup taskGroup = new TaskGroup(tables.size(), event);
            for(Table table : tables) {
                TaskBatch taskBatch = new TaskBatch(2, taskGroup);
                TrustSourceColumns columnsRunnable = new TrustSourceColumns(table, taskBatch);
                TrustSourceRecordCount recordCountRunnable = new TrustSourceRecordCount(table, taskBatch);
                
                executorService.submit(columnsRunnable);
                executorService.submit(recordCountRunnable);
            }
        }
        executorService.shutdown();
    }
    
    static class TrustSourceRecordCount implements Runnable {
        private final Table table;
        private final TaskBatch taskBatch;
        
        public TrustSourceRecordCount(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetCount = table.sourceRecordCount;
            taskBatch.done(table);
//            System.out.println("The table " + table.tableName + " target record count capture done and update the data.");
        }
    }
    
    static class TrustSourceColumns implements Runnable {
        private final Table table;
        private final TaskBatch taskBatch;
        
        public TrustSourceColumns(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetColumnSchema = table.sourceColumnSchema;
            taskBatch.done(table);
//            System.out.println("The table " + table.tableName + " target columns capture done and update the data.");
        }
    }
}

 

上一篇:CopyOnWriteArrayList(写入并复制) & CountDownLatch(闭锁)


下一篇:Java深入学习08:CountDownLatch应用