目录
1 无锁机制简介
普通队列写入时需要通过锁机制避免并发,disruptor不用锁,使用CAS(Compare And Swap/Set)操作确保线程安全,这是一个CPU级别的指令,工作方式类似乐观锁。
2 RingBuffer简介
Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首尾相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。
RingBuffer中分离了读指针和写指针,从而使生产者和消费者互不干扰,两者可以完全并发执行,从而使性能达到数倍于传统基于互斥锁方式实现的消息队列模型。
RingBuffer保持数组元素永远有效,入队列直接覆盖旧的数据,相比普通数组队列,无需GC。
2.1 工作原理简介
disruptor的是基于事件实现的,那么就有了生产者(provider)和消费者(consumer)存在,生产者生产元素放入数组中,消费者从数组中消费元素,这个数组就是RingBuffer。每一个生产者和消费者内部都会有一个私有指针pri-sequence,表示当前操作的元素序号,同时RingBuffer内部也会有一个全局指针global-sequence
指向最后一个可以被消费的元素。这样当生产者需要放数据时,只需要获取global-sequence
的下一个位置,下一个位置如果还未被消费,那么就会进入等待策略,如果下一个位置已经被消费,那么就会直接覆盖当前位置的属性值。
当生产者需要向容器中存放数据时,只需要使用sequence%(数组长度-1)
就可以得到要添加的元素应该放在哪儿个位置上,这样就实现了数组的首尾相连。
disruptor初始化时需要指定容器大小,容器大小指定为2^n,计算时可以可以使用位运算:
如果容器大小是8,要放12号元素。12%8 = 12 &(8-1)=1100&0111=0100=4。
使用位运算可以提升效率。
3 disruptor实现流水异步入库
3.1 定义事件实体类
LogEvent作为队列RingBuffer中的元数据
import java.io.Serializable;
import java.util.Date;
public class LogEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String userId;// char(32)
private String rspCd;// char(2)
private String rspMsg;// varchar(128)
private Date transCrtTs;// timestamp(3)
private Date transCfmTs;// timestamp(3)
public LogEvent() {
this.userId = "";
this.rspCd = "";
this.rspMsg = "";
this.transCrtTs = new Date();
this.transCfmTs = new Date();
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
if (userId == null) {
return;
}
this.userId = userId;
}
public String getRspCd() {
return rspCd;
}
public void setRspCd(String rspCd) {
if (rspCd == null) {
return;
}
this.rspCd = rspCd;
}
public String getRspMsg() {
return rspMsg;
}
public void setRspMsg(String rspMsg) {
if (rspMsg == null) {
return;
}
this.rspMsg = rspMsg;
}
public Date getTransCrtTs() {
return transCrtTs;
}
public void setTransCrtTs(Date transCrtTs) {
if (transCrtTs == null) {
return;
}
this.transCrtTs = transCrtTs;
}
public Date getTransCfmTs() {
return transCfmTs;
}
public void setTransCfmTs(Date transCfmTs) {
if (transCfmTs == null) {
return;
}
this.transCfmTs = transCfmTs;
}
@Override
public String toString() {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("LogEvent{");
stringBuffer.append("userId=");
stringBuffer.append(userId);
stringBuffer.append(", rspCd=");
stringBuffer.append(rspCd);
stringBuffer.append(", rspMsg=");
stringBuffer.append(rspMsg);
stringBuffer.append(", transCrtTs=");
stringBuffer.append(transCrtTs);
stringBuffer.append(", transCfmTs=");
stringBuffer.append(transCfmTs);
return stringBuffer.toString();
}
}
3.2 定义事件服务类
LogEventService中,初始化队列RingBuffer,为生产者提供接口。
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.beanutils.BeanUtils;
import java.util.concurrent.Executors;
public class LogEventService {
private static LogEventService instance;
private static int RING_BUFFER_SIZE = 1024 * 1024;
private RingBuffer<LogEvent> ringBuffer;
private LogEventHandler logEventHandler;
/**
* 构造函数,调用初始化队列
*/
public LogEventService() {
initRingBuffer();
}
/**
* 创建service实例
* @return LogEventService
*/
public static LogEventService getInstance() {
if (instance == null) {
instance = new LogEventService();
}
return instance;
}
/**
* 初始化队列
*/
private void initRingBuffer() {
try {
logEventHandler = new LogEventHandler();
Disruptor<LogEvent> disruptor = new Disruptor<>(EVENT_FACTORY, RING_BUFFER_SIZE, Executors.defaultThreadFactory());
disruptor.handleEventsWith(logEventHandler);
ringBuffer = disruptor.start();
} catch (Exception e) {
}
}
/**
* 生产者入队列接口
* @param log LogEvent
*/
public void publish(LogEvent log) {
long sequence = ringBuffer.next();
try {
LogEvent ringValue = ringBuffer.get(sequence);
BeanUtils.copyProperties(ringValue, log);//复制对象中的所有属性
} catch (Exception e) {
} finally {
ringBuffer.publish(sequence);
}
}
/**
* 初始化填充队列,提前分配内存,降低GC
*/
public final EventFactory<LogEvent> EVENT_FACTORY =
new EventFactory<LogEvent>() {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
};
}
3.3 定义消费者
3.3.1 单任务处理
import com.lmax.disruptor.EventHandler;
import com.dto.LogEvent;
import com.task.LogTask;
public class LogEventHandler implements EventHandler<LogEvent> {
/**
* 消费队列
* @param log 队列中的任务
* @param sequence 当前消费到的队列位置
* @param endOfBatch 是否为RingBuffer内存片中的最后一块
*/
@Override
public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
LogTask logTask = new LogTask();
logTask.process(log);//调用相关服务
}
}
3.3.2 批处理
使用批处理方式,消费队列中的对象,调用相关服务
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import java.util.List;
import com.dto.LogEvent;
import com.task.LogTask;
public class LogEventHandler implements EventHandler<LogEvent> {
private final static int DB_BATCH_SIZE = 100;
private final static int RING_BATCH_SIZE = 1024;
private List<Object> cache = Lists.newArrayList();
/**
* 消费队列,批处理
* @param log 队列中的任务
* @param sequence 当前消费到的队列位置
* @param endOfBatch 是否为RingBuffer内存片中的最后一块
*/
@Override
public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
cache.add(log);
LogTask logTask = new LogTask();
if ((sequence + 1) % DB_BATCH_SIZE == 0) {
logTask.process(cache);//调用相关服务
cache.clear();
}
if (endOfBatch) {
if ((sequence + 1) % RING_BATCH_SIZE != 0) {
logTask.process(cache);//调用相关服务
cache.clear();
}
}
}
}
3.4 运行
创建2个生产线程,测试生产和消费过程。
import java.util.Date;
import com.dto.LogEvent;
public class DisruptorTest {
private final static int THREAD_NUM = 2;//生产者线程数
private final static int TASK_NUM = 10000;//每个生产者生产任务的数量
public static void main(String[] args) {
for (int i = 0; i < THREAD_NUM; i++) {
new DisruptorThread().start();//创建并启动生产者线程
}
}
/**
* 生产者单线程执行任务
*/
private static class DisruptorThread extends Thread {
@Override
public void run() {
for (int i = 0; i < TASK_NUM; i++) {
LogEvent log = initLogEvent();
LogEventService.getInstance().publish(log);
}
}
}
/**
* 创建log对象
*
* @return LogEvent
*/
private static LogEvent initLogEvent() {
LogEvent log = new LogEvent();
log.setUserId("123456789");
log.setRspCd("00");
log.setRspMsg("成功");
log.setTransCrtTs(new Date());
log.setTransCfmTs(new Date());
return log;
}
}
4 pom依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.1</version>
</dependency>
5 参考
Disruptorhttp://www.manongjc.com/detail/22-eslcjjgowuksoks.htmlJava多线程之Disruptor入门https://www.jb51.net/article/211039.htm利用disruptor DB批量存储https://blog.csdn.net/hanbaoqi99/article/details/78954915