首先,xxl-mq 是大神 xuxueli 开发的一个消息中间件框架。
与 Spring Boot 的整合过程:
1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the xxl-mq Spring Boot sample application (packaged as an executable jar). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-mq-samples</artifactId>
        <version>1.3.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>xxl-mq-samples-springboot</artifactId>
    <packaging>jar</packaging>

    <!--
        Imports Spring Boot's managed dependency versions so the starters below need no explicit version.
        NOTE(review): spring-boot.version is not defined in this file; presumably it comes from the
        parent POM. Confirm before building this module on its own.
    -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- starter-web: spring-webmvc + autoconfigure + logback + yaml + tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- starter-test: junit + spring-test + mockito -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- freemarker-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

        <!-- xxl-mq-client: kept on the same version as the parent project.
             Uses project.parent.version because the bare parent.version expression is deprecated. -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-mq-client</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Repackages the jar produced by the build into an executable Spring Boot jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
2 properties
### web
server.port=8081
server.context-path=/

### resources
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### xxl-mq, admin conf: the address where xxl-mq-admin is deployed
xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin

### xxl-mq, access token (left empty in this sample)
xxl.mq.accessToken=
index.html
<script src="${request.contextPath}/static/jquery/jquery.min.js"></script>
<body>

<!-- Each button posts its _type attribute to /produce; the reply is shown in #console. -->
<input type="button" class="send" _type="0" value="并行消费" />
<br><br>
<input type="button" class="send" _type="1" value="串行消费" />
<br><br>
<input type="button" class="send" _type="2" value="广播消息" />
<br><br>
<input type="button" class="send" _type="3" value="延时消息:5分钟后执行" />
<br><br>
<input type="button" class="send" _type="4" value="性能测试:批量发送10000条消息" />

<hr>
<div id="console"></div>

<script>
    /**
     * Minimal date formatter.
     *
     * Supported tokens: y (year), M (month), d (day of month), H and h (hour, both 0-23),
     * m (minute), s (second), q (quarter), S (millisecond).
     * A token written once inserts the raw number; a repeated token is zero-padded to two digits.
     * Only the first occurrence of each token is replaced.
     *
     * "H+" is required because the caller below formats with "HH"; without it the
     * literal text "HH" would be printed instead of the hour.
     */
    Date.prototype.format = function (fmt) {
        var fields = {
            "M+": this.getMonth() + 1,                    // month
            "d+": this.getDate(),                         // day of month
            "H+": this.getHours(),                        // hour (0-23)
            "h+": this.getHours(),                        // hour, kept as 0-23 for existing callers
            "m+": this.getMinutes(),                      // minute
            "s+": this.getSeconds(),                      // second
            "q+": Math.floor((this.getMonth() + 3) / 3),  // quarter
            "S": this.getMilliseconds()                   // millisecond
        };

        // Year: "yyyy" gives the full year, shorter runs keep only the trailing digits.
        var year = String(this.getFullYear());
        fmt = fmt.replace(/y+/, function (token) {
            return year.slice(4 - token.length);
        });

        for (var key in fields) {
            if (!fields.hasOwnProperty(key)) {
                continue;
            }
            var value = String(fields[key]);
            // The callback runs synchronously, so reading the loop variable "value" is safe.
            fmt = fmt.replace(new RegExp(key), function (token) {
                return token.length === 1 ? value : ("00" + value).slice(value.length);
            });
        }
        return fmt;
    };

    $(function () {
        $(".send").click(function () {
            var _type = $(this).attr("_type");

            $.post(
                '${request.contextPath}/produce',
                {'type': _type},
                function (data, status) {
                    // Newest entry goes on top of the console.
                    var temp = "<br>" + new Date().format("yyyy-MM-dd HH:mm:ss") + ": ";
                    temp += ("SUCCESS" == data) ? '成功发送一条消息!' : data;
                    $("#console").prepend(temp);
                });
        });
    });
</script>

</body>
需要在 jQuery 下面引入:
日志管理配置:
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration for the sample: every event at "info" and above goes to the console and to a file. -->
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>

    <!-- Target log file; also used as the prefix of the rolled file names below. -->
    <property name="log.path" value="/data/applogs/xxl-mq/xxl-mq-samples-springboot.log"/>

    <!-- Console output. -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- File output with a time-based rolling policy; rolled files carry a yyyy-MM-dd date and a .zip suffix. -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n </pattern>
        </encoder>
    </appender>

    <!-- Root logger: both appenders, threshold "info". -->
    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>
配置XxlMqConf.java:
package com.xxl.mq.sample.springboot.conf; import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component public class XxlMqConf { // ---------------------- param ---------------------- @Value("${xxl.mq.admin.address}") private String adminAddress; @Value("${xxl.mq.accessToken}") private String accessToken; @Bean public XxlMqSpringClientFactory getXxlMqConsumer(){ XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory(); xxlMqSpringClientFactory.setAdminAddress(adminAddress); xxlMqSpringClientFactory.setAccessToken(accessToken); return xxlMqSpringClientFactory; } }
controller 根据传入的参数进行设置:
import com.xxl.mq.client.message.XxlMqMessage; import com.xxl.mq.client.producer.XxlMqProducer; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.Calendar; import java.util.Date; /** * index controller * @author xuxueli 2015-12-19 16:13:16 */ @Controller public class IndexController { @RequestMapping("/") public String index(){ return "index"; } @RequestMapping("/produce") @ResponseBody public String produce(int type){ String topic = "topic_1"; String data = "时间戳:" + System.currentTimeMillis(); if (type == 0) { /** * 并行消费 */ XxlMqProducer.produce(new XxlMqMessage(topic, data)); } else if (type == 1) { /** * 串行消费 */ XxlMqProducer.produce(new XxlMqMessage(topic, data, 1L)); } else if (type == 2) { /** * 广播消费 */ XxlMqProducer.broadcast(new XxlMqMessage(topic, data)); } else if (type == 3) { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); Date effectTime = calendar.getTime(); /** * 延时消息 */ XxlMqProducer.produce(new XxlMqMessage(topic, data, effectTime)); } else if (type == 4) { int msgNum = 10000; long start = System.currentTimeMillis(); for (int i = 0; i < msgNum; i++) { XxlMqProducer.produce(new XxlMqMessage("topic_1", "No:"+i)); } long end = System.currentTimeMillis(); return "Cost = " + (end-start); } else { return "Type Error."; } return "SUCCESS"; } @ExceptionHandler({Exception.class}) public String exception(Exception e) { e.printStackTrace(); return e.getMessage(); } }
对应的消费者:
import com.xxl.mq.client.consumer.IMqConsumer; import com.xxl.mq.client.consumer.MqResult; import com.xxl.mq.client.consumer.annotation.MqConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * Created by xuxueli on 16/8/28. */ @MqConsumer(topic = "topic_2") @Service public class Demo2MqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(Demo2MqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[Demo2MqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; } }
import com.xxl.mq.client.consumer.IMqConsumer; import com.xxl.mq.client.consumer.MqResult; import com.xxl.mq.client.consumer.annotation.MqConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * Created by xuxueli on 16/8/28. */ @MqConsumer(topic = "topic_1") @Service public class DemoAMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoAMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; } }
import com.xxl.mq.client.consumer.IMqConsumer; import com.xxl.mq.client.consumer.MqResult; import com.xxl.mq.client.consumer.annotation.MqConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * Created by xuxueli on 16/8/28. */ @MqConsumer(topic = "topic_1") @Service public class DemoBMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoBMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoBMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; } }
import com.xxl.mq.client.consumer.IMqConsumer; import com.xxl.mq.client.consumer.MqResult; import com.xxl.mq.client.consumer.annotation.MqConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** * Created by xuxueli on 16/8/28. */ @MqConsumer(topic = "topic_1", group = MqConsumer.EMPTY_GROUP) @Service public class DemoCMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoCMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoCMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; } }