Spring Integration 配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration flow that periodically picks up SmsMessage rows and hands
  each one to the SMS sender service.

  Flow, as wired below:
    1. A JPA inbound channel adapter runs its query every 60 s and puts the
       single result on "process.channel" (used here as a periodic trigger).
    2. "process.channel" is a queue channel holding at most one message.
    3. A polled chain reads from "process.channel", loads the pending
       SmsMessage entities, splits the result and calls the sender per item.

  The entity manager bean "em" is referenced but not defined in this file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jpa
           http://www.springframework.org/schema/integration/jpa/spring-integration-jpa-2.2.xsd">

    <!--
      The integration namespace used to be listed twice in xsi:schemaLocation
      (once with the 2.2 schema, once with the unversioned schema). Only the
      first, versioned entry is kept so the namespace has a single location.
    -->

    <!--
      Periodic trigger: polls every 60 s and sends the query result to
      "process.channel", waiting up to 60 s for room in the channel.
      NOTE(review): "SELECT sysdate FROM dual" looks like native Oracle SQL,
      while jpa-query is meant for JPQL; the native-query attribute may be the
      intended one. Confirm against the persistence setup before changing.
    -->
    <int-jpa:inbound-channel-adapter auto-startup="true"
                                     entity-manager="em"
                                     send-timeout="60000"
                                     channel="process.channel"
                                     expect-single-result="true"
                                     jpa-query="SELECT sysdate FROM dual">
        <int:poller fixed-delay="60000"/>
    </int-jpa:inbound-channel-adapter>

    <!-- Capacity 1: at most one pending trigger message is buffered. -->
    <int:channel id="process.channel">
        <int:queue capacity="1"/>
    </int:channel>

    <!--
      Processing chain, polled every 5 s from "process.channel".
      NOTE(review): receive-timeout="-1" presumably makes the poller wait for a
      message without a timeout; confirm against the poller documentation.
    -->
    <int:chain input-channel="process.channel">
        <!--
          Loads the SmsMessage entities to process, oldest request first.
          NOTE(review): "sp.tatus" looks like a typo for "sp.status"; confirm
          against the SmsMessage entity before changing the query.
        -->
        <int-jpa:retrieving-outbound-gateway entity-manager="em"
                                             jpa-query="SELECT sp FROM SmsMessage sp Where sp.tatus is null order by sp.requestOn,sp.id"/>
        <!-- Splits the gateway result via PaymentProcessSplitter.split. -->
        <int:splitter ref="process.processSplitter" method="split"/>
        <!-- Invokes SmsSenderService.send for each message produced by the splitter. -->
        <int:service-activator ref="process.smsSenderService" method="send"/>
        <int:poller fixed-delay="5000" receive-timeout="-1"/>
    </int:chain>

    <bean id="process.smsSenderService" class="com.yd.core.service.SmsSenderService"/>
    <bean id="process.processSplitter" class="com.yd.core.service.PaymentProcessSplitter"/>
</beans>
Job Worker
import org.springframework.context.ApplicationContext; import org.springframework.integration.MessageChannel; import org.springframework.integration.support.MessageBuilder; public class JobWorker implements Runnable { private static final int DEFAULT_WAIT_TIME = 3000; @Override public void run() { while (true) { try { LoggerUtil.getJobLogger().info("JobWorker, Ready for take job run request."); JobRunnerRequest jobRequest = JobManagerService.getJobManager().takeRequest(); while (jobRequest == null) { LoggerUtil.getJobLogger().warn("JobWorker, jobRequest is null, will try to get the job requet again."); Thread.sleep(DEFAULT_WAIT_TIME); jobRequest = JobManagerService.getJobManager().takeRequest(); } LoggerUtil.getJobLogger().info("JobWorker, Received a job run request."); MessageChannel channel = findChannel(jobRequest.getJobChannelId()); if (channel != null) { channel.send(MessageBuilder.withPayload(jobRequest.getJobMessagePayload()).build()); LoggerUtil.getJobLogger().info("JobWorker, Completed to sned message to job channel"); } } catch (Exception ex) { LoggerUtil.getJobLogger().warn("JobWorker, Completed to sned message to job channel"); } } } private MessageChannel findChannel(String jobChannelId) { ApplicationContext context = ApplicationContextProvider.getContext(); if (context == null) { LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the application context, to startup job %s", jobChannelId)); return null; } Object channel = context.getBean(jobChannelId); if (channel instanceof MessageChannel) { return (MessageChannel) channel; } else { LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId)); return null; } } }
JobManagerService
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Process-wide singleton that queues {@code JobRunnerRequest}s for a single
 * {@code JobWorker} thread, which is started when the singleton is first created.
 */
public final class JobManagerService {

    /** Guards one-time creation of the singleton. */
    private static final Object INSTANCE_LOCK = new Object();

    /** Lazily created singleton; volatile so the unsynchronized first check sees a fully built instance. */
    private static volatile JobManagerService jobManagerInstance;

    /** Pending requests, handed from producers to the worker thread. */
    private final BlockingQueue<JobRunnerRequest> jobRequestQueue = new LinkedBlockingQueue<JobRunnerRequest>();

    private JobManagerService() {
    }

    /** Starts the worker thread that consumes the queue; named so it is identifiable in thread dumps. */
    private void startupWorker() {
        new Thread(new JobWorker(), "JobWorker").start();
    }

    /**
     * Returns the singleton, creating it and starting its worker thread on first use.
     *
     * @return the shared {@code JobManagerService}
     */
    public static JobManagerService getJobManager() {
        JobManagerService instance = jobManagerInstance;
        if (instance == null) {
            synchronized (INSTANCE_LOCK) {
                instance = jobManagerInstance;
                if (instance == null) {
                    instance = new JobManagerService();
                    // Publish before starting the worker: the worker calls getJobManager() itself.
                    jobManagerInstance = instance;
                    instance.startupWorker();
                }
            }
        }
        return instance;
    }

    /**
     * Queues a request for the worker.
     *
     * <p>If the calling thread is interrupted the request is NOT queued; a warning
     * is logged and the thread's interrupt flag is restored.
     *
     * @param request the request to queue
     * @throws NullPointerException if {@code request} is {@code null} (from {@link BlockingQueue#put})
     */
    public void addRequest(JobRunnerRequest request) {
        try {
            jobRequestQueue.put(request);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still react to the interrupt.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn("JobManagerService, Interrupted while queuing a job request, the request was not queued.", e);
        }
    }

    /**
     * Blocks until a request is available and returns it.
     *
     * @return the next request, or {@code null} if the calling thread was interrupted
     *         while waiting (the interrupt flag is restored in that case)
     */
    public JobRunnerRequest takeRequest() {
        try {
            return jobRequestQueue.take();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can tell an interrupt from an ordinary null.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn("JobManagerService, Interrupted while waiting for a job request.", e);
            return null;
        }
    }
}
ApplicationContextProvider
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Static holder for the {@link ApplicationContext} handed to
 * {@link #setApplicationContext(ApplicationContext)}, so that code without
 * injection can reach it through {@link #getContext()}.
 *
 * <p>NOTE(review): this only works if an instance is registered as a bean so the
 * container calls {@code setApplicationContext}; that registration is not shown here.
 */
public class ApplicationContextProvider implements ApplicationContextAware {

    /** Most recently supplied context; {@code null} until one has been set. */
    private static volatile ApplicationContext sharedContext;

    /**
     * Stores the supplied context in the static holder.
     *
     * @param applicationContext the context to expose through {@link #getContext()}
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        // Same monitor a static synchronized method would use.
        synchronized (ApplicationContextProvider.class) {
            sharedContext = applicationContext;
        }
    }

    /**
     * Returns the stored context.
     *
     * @return the last context that was set, or {@code null} if none has been set yet
     */
    public static ApplicationContext getContext() {
        final ApplicationContext current = sharedContext;
        return current;
    }
}