disruptor无锁队列实现流水记录

目录

1 无锁机制简介

2 RingBuffer简介

2.1 工作原理简介

3 disruptor实现流水异步入库

3.1 定义事件实体类

3.2 定义事件服务类

3.3 定义消费者

3.3.1 单任务处理

3.3.2 批处理

3.4 运行

4 pom依赖

5 参考


1 无锁机制简介

普通队列写入时需要通过锁机制避免并发,disruptor不用锁,使用CAS(Compare And Swap/Set)操作确保线程安全,这是一个CPU级别的指令,工作方式类似乐观锁。

2 RingBuffer简介

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首尾相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

RingBuffer中分离了读指针和写指针,从而使生产者和消费者互不干扰,两者可以完全并发执行,从而使性能达到数倍于传统基于互斥锁方式实现的消息队列模型。

RingBuffer保持数组元素永远有效,入队列直接覆盖旧的数据,相比普通数组队列,无需GC。

2.1 工作原理简介

disruptor的是基于事件实现的,那么就有了生产者(provider)和消费者(consumer)存在,生产者生产元素放入数组中,消费者从数组中消费元素,这个数组就是RingBuffer。每一个生产者和消费者内部都会有一个私有指针pri-sequence,表示当前操作的元素序号,同时RingBuffer内部也会有一个全局指针global-sequence指向最后一个可以被消费的元素。这样当生产者需要放数据时,只需要获取global-sequence的下一个位置,下一个位置如果还未被消费,那么就会进入等待策略,如果下一个位置已经被消费,那么就会直接覆盖当前位置的属性值。

当生产者需要向容器中存放数据时,只需要使用sequence%(数组长度-1)就可以得到要添加的元素应该放在哪儿个位置上,这样就实现了数组的首尾相连。

disruptor初始化时需要指定容器大小,容器大小指定为2^n,计算时可以可以使用位运算:

如果容器大小是8,要放12号元素。12%8 = 12 &(8-1)=1100&0111=0100=4。

使用位运算可以提升效率。

3 disruptor实现流水异步入库

3.1 定义事件实体类

LogEvent作为队列RingBuffer中的元数据

import java.io.Serializable;
import java.util.Date;

public class LogEvent implements Serializable {

	private static final long serialVersionUID = 1L;

	private String userId;// char(32)
	private String rspCd;// char(2)
	private String rspMsg;// varchar(128)
	private Date transCrtTs;// timestamp(3)
	private Date transCfmTs;// timestamp(3)

	public LogEvent() {
		this.userId = "";
		this.rspCd = "";
		this.rspMsg = "";
		this.transCrtTs = new Date();
		this.transCfmTs = new Date();
	}

	public String getUserId() {
		return userId;
	}

	public void setUserId(String userId) {
		if (userId == null) {
			return;
		}
		this.userId = userId;
	}

	public String getRspCd() {
		return rspCd;
	}

	public void setRspCd(String rspCd) {
		if (rspCd == null) {
			return;
		}
		this.rspCd = rspCd;
	}

	public String getRspMsg() {
		return rspMsg;
	}

	public void setRspMsg(String rspMsg) {
		if (rspMsg == null) {
			return;
		}
		this.rspMsg = rspMsg;
	}

	public Date getTransCrtTs() {
		return transCrtTs;
	}

	public void setTransCrtTs(Date transCrtTs) {
		if (transCrtTs == null) {
			return;
		}
		this.transCrtTs = transCrtTs;
	}

	public Date getTransCfmTs() {
		return transCfmTs;
	}

	public void setTransCfmTs(Date transCfmTs) {
		if (transCfmTs == null) {
			return;
		}
		this.transCfmTs = transCfmTs;
	}

	@Override
	public String toString() {
		StringBuffer stringBuffer = new StringBuffer();
		stringBuffer.append("LogEvent{");
		stringBuffer.append("userId=");
		stringBuffer.append(userId);
		stringBuffer.append(", rspCd=");
		stringBuffer.append(rspCd);
		stringBuffer.append(", rspMsg=");
		stringBuffer.append(rspMsg);
		stringBuffer.append(", transCrtTs=");
		stringBuffer.append(transCrtTs);
		stringBuffer.append(", transCfmTs=");
		stringBuffer.append(transCfmTs);
		return stringBuffer.toString();
	}
}

3.2 定义事件服务类

LogEventService中,初始化队列RingBuffer,为生产者提供接口。

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.beanutils.BeanUtils;

import java.util.concurrent.Executors;

public class LogEventService {
    private static LogEventService instance;
    private static int RING_BUFFER_SIZE = 1024 * 1024;
    private RingBuffer<LogEvent> ringBuffer;
    private LogEventHandler logEventHandler;

    /**
     * 构造函数,调用初始化队列
     */
    public LogEventService() {
        initRingBuffer();
    }

    /**
     * 创建service实例
     * @return LogEventService
     */
    public static LogEventService getInstance() {
        if (instance == null) {
            instance = new LogEventService();
        }
        return instance;
    }

    /**
     * 初始化队列
     */
    private void initRingBuffer() {
        try {
            logEventHandler = new LogEventHandler();
            Disruptor<LogEvent> disruptor = new Disruptor<>(EVENT_FACTORY, RING_BUFFER_SIZE, Executors.defaultThreadFactory());
            disruptor.handleEventsWith(logEventHandler);
            ringBuffer = disruptor.start();
        } catch (Exception e) {

        }
    }

    /**
     * 生产者入队列接口
     * @param log LogEvent
     */
    public void publish(LogEvent log) {
        long sequence = ringBuffer.next();
        try {
            LogEvent ringValue = ringBuffer.get(sequence);
            BeanUtils.copyProperties(ringValue, log);//复制对象中的所有属性
        } catch (Exception e) {

        } finally {
            ringBuffer.publish(sequence);
        }
    }

    /**
     * 初始化填充队列,提前分配内存,降低GC
     */
    public final EventFactory<LogEvent> EVENT_FACTORY =
            new EventFactory<LogEvent>() {
                @Override
                public LogEvent newInstance() {
                    return new LogEvent();
                }
            };

}

3.3 定义消费者

3.3.1 单任务处理

import com.lmax.disruptor.EventHandler;

import com.dto.LogEvent;
import com.task.LogTask;

public class LogEventHandler implements EventHandler<LogEvent> {
    
    /**
     * 消费队列
     * @param log 队列中的任务
     * @param sequence 当前消费到的队列位置
     * @param endOfBatch 是否为RingBuffer内存片中的最后一块
     */
    @Override
    public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
        LogTask logTask = new LogTask();        
        logTask.process(log);//调用相关服务
    }
}

3.3.2 批处理

使用批处理方式,消费队列中的对象,调用相关服务

import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import java.util.List;

import com.dto.LogEvent;
import com.task.LogTask;

public class LogEventHandler implements EventHandler<LogEvent> {

    private final static int DB_BATCH_SIZE = 100;
    private final static int RING_BATCH_SIZE = 1024;
    private List<Object> cache = Lists.newArrayList();

    /**
     * 消费队列,批处理
     * @param log 队列中的任务
     * @param sequence 当前消费到的队列位置
     * @param endOfBatch 是否为RingBuffer内存片中的最后一块
     */
    @Override
    public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
        cache.add(log);
        LogTask logTask = new LogTask();
        if ((sequence + 1) % DB_BATCH_SIZE == 0) {
            logTask.process(cache);//调用相关服务
            cache.clear();
        }
        if (endOfBatch) {
            if ((sequence + 1) % RING_BATCH_SIZE != 0) {
                logTask.process(cache);//调用相关服务
                cache.clear();
            }
        }
    }
}

3.4 运行

创建2个生产线程,测试生产和消费过程。

import java.util.Date;

import com.dto.LogEvent;

public class DisruptorTest {

    private final static int THREAD_NUM = 2;//生产者线程数
    private final static int TASK_NUM = 10000;//每个生产者生产任务的数量

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_NUM; i++) {
            new DisruptorThread().start();//创建并启动生产者线程
        }
    }

    /**
     * 生产者单线程执行任务
     */
    private static class DisruptorThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < TASK_NUM; i++) {
                LogEvent log = initLogEvent();
                LogEventService.getInstance().publish(log);
            }
        }
    }

    /**
     * 创建log对象
     *
     * @return LogEvent
     */
    private static LogEvent initLogEvent() {
        LogEvent log = new LogEvent();
        log.setUserId("123456789");
        log.setRspCd("00");
        log.setRspMsg("成功");
        log.setTransCrtTs(new Date());
        log.setTransCfmTs(new Date());
        return log;
    }
}

4 pom依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.1</version>
</dependency>

5 参考

Disruptordisruptor无锁队列实现流水记录http://www.manongjc.com/detail/22-eslcjjgowuksoks.htmlJava多线程之Disruptor入门disruptor无锁队列实现流水记录https://www.jb51.net/article/211039.htm利用disruptor DB批量存储disruptor无锁队列实现流水记录https://blog.csdn.net/hanbaoqi99/article/details/78954915

上一篇:Java版人脸跟踪三部曲之三:编码实战


下一篇:我是怎么把业务代码越写越复杂的 _ MVP - MVVM - Clean Architecture