内容概况:
异步执行配置相关:
asyncExecutorActivate:这个属性是激活作业执行器,它的默认参数是false,只有设为true,activiti启动的时候才会开启线程池去扫描定时操作的任务
asyncExecutorXXX:这些属性的操作都是基于asyncExecutor这样一个前缀,后面有各种类型的属性配置,(其实里面的属性配置大多都是与线程池、队列相关的配置)
(很重要的配置)asyncExecutor:asyncExecutor的bean的配置,它本身是一个接口,用它也可以配置自定义的线程池。
如何自定义一个线程池?
在设置了核心线程数的情况下,比如设为了5,那么在开启了五个线程之后,有新的任务来了之后,回去检测核心线程数是否都在执行任务中,如果是,那么新的请求就会放在队列里去等待,
等到核心线程中有一个线程执行完了自己的任务,那么排在队列最前面的这个请求,回去获取这个线程,然后去执行。那么当队列一直有新的请求加入时,负载过大,超过了队列大小的时候,最大线程数就会起作用了,新建线程至最大线程数,来一起执行超过容量的新请求。 当三个都满了以后,就会拒绝服务、请求。
作业执行器的配置:
这里要说明的是R5/P1DT1H, / 后面的指时间间隔,1D表示一天,是24个小时,而1H表示一小时加起来就是25个小时。R5是指执行次数为5,这个属性的含义是:在流程启动时开始计时,25小时后第一次执行这个指定事件, 并且每隔25小时执行一次,总共执行次数为5次(ps:包含第一次执行在内)。
流程定义文件my-process_job.bpmn20.xml的改变:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration"
class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
<!-- 给引擎设置自定义的commandInvoker -->
<!--<property name="commandInvoker" ref="commandInvoker" />-->
<!-- 若为true则开启记录事件、节点的状态,完成后将完成状态插入数据库,若为false则关闭,不记录 -->
<property name="enableDatabaseEventLogging" value="true"/> <!--打开异步激活器激活异步,如果不配置线程池就会使用它的默认线程池-->
<property name="asyncExecutorActivate" value="true"/>
<!--如果使用我们自己定义的线程池,需要先定义一个执行器-->
<property name="asyncExecutor" ref="asyncExecutor" />
<!-- 配置事件监听器 -->
<property name="eventListeners">
<list>
<bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/>
</list>
</property>
</bean>
<!-- 执行器默认使用DefaultAsyncJobExecutor -->
<bean id="asyncExecutor"
class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
<!-- 需要配置一个服务 基于spring去配置它 -->
<property name="executorService" ref="executorService"/>
</bean>
<bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
<property name="threadNamePrefix" value="activiti-job-"/>
<property name="corePoolSize" value="10"/>
<property name="maxPoolSize" value="20"/>
<property name="queueCapacity" value="100"/>
<!-- 设置当线程池满了时候的拒绝策略,这里是使用的默认策略,抛出异常 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
</property>
</bean>
<bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
设置一个监听器JobEventListener,以便测试观察:
public class JobEventListener implements ActivitiEventListener {
private static final Logger LOGGER = LoggerFactory.getLogger(JobEventListener.class); //简单的完成一下监听器的效果
@Override
public void onEvent(ActivitiEvent event) {
ActivitiEventType eventType = event.getType();
String name = eventType.name(); if (name.startsWith("TIMER") || name.startsWith("JOB")){
LOGGER.info("监听到job事件 {} \t {}",eventType,event.getProcessInstanceId());
}
} @Override
public boolean isFailOnException() {
return false;
}
}
编写activiti配置文件activiti_job.cfg.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration"
class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
<!-- 给引擎设置自定义的commandInvoker -->
<!--<property name="commandInvoker" ref="commandInvoker" />-->
<!-- 若为true则开启记录事件、节点的状态,完成后将完成状态插入数据库,若为false则关闭,不记录 -->
<property name="enableDatabaseEventLogging" value="true"/> <!--打开异步激活器激活异步,如果不配置线程池就会使用它的默认线程池-->
<property name="asyncExecutorActivate" value="true"/>
<!--如果使用我们自己定义的线程池,需要先定义一个执行器-->
<property name="asyncExecutor" ref="asyncExecutor" />
<!-- 配置事件监听器 -->
<property name="eventListeners">
<list>
<bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/>
</list>
</property>
</bean>
<!-- 执行器默认使用DefaultAsyncJobExecutor -->
<bean id="asyncExecutor"
class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
<!-- 需要配置一个服务 基于spring去配置它 -->
<property name="executorService" ref="executorService"/>
</bean>
<bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
<property name="threadNamePrefix" value="activiti-job-"/>
<property name="corePoolSize" value="10"/>
<property name="maxPoolSize" value="20"/>
<property name="queueCapacity" value="100"/>
<!-- 设置当线程池满了时候的拒绝策略,这里是使用的默认策略,抛出异常 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
</property>
</bean>
<bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
编写测试类configJobTest :
public class configJobTest {
private static final Logger LOGGER = LoggerFactory.getLogger(configTest.class); @Rule
public ActivitiRule activitiRule = new ActivitiRule("activiti_job.cfg.xml");//传入自定义的mdc配置文件 @Test
@Deployment(resources = {"my-process_job.bpmn20.xml"})//流程定义文件
public void test() throws InterruptedException {
//这里流程定义文件里设置了定时任务在一定事件内启动五次,所以不需要自行启动了。这里启动的代码就可以不要了。
//这里记录一下时间,看下流程每次启动时间与结束时间
LOGGER.info("start"); //在流程定义文件初始化以后,就开始定时启动了。
//那么我应该要查询一下在这段时间内有多少定时任务去执行
List<Job> jobList = activitiRule
.getManagementService()
.createTimerJobQuery()
.listPage(0, 100);
for (Job job : jobList) {
LOGGER.info("定时任务 {} ,默认重复次数 {}",job,job.getRetries());
}
LOGGER.info("jobList.size = {}",jobList.size());
//因为主线程很快就能执行完,而定时任务还没有执行,所以让线程等待一下
Thread.sleep(1000*10);
LOGGER.info("end"); }
}
测试结果如下: