闹着玩的来源:前台发送消息,后台接收并处理后发给 Kafka,Kafka 消费者接到消息后再传给前台显示。由此联想到 WebSocket。
最终效果如图:
页面解释:
不填写内容的话,表单值默认为Topic、Greeting、Name
点击订阅,按钮变黑
Send Topic | 广播 | 前台显示前缀:T-You Send |
Subscribe Topic | 订阅广播 | 前台显示前缀:A-You Receive、B-You Receive |
Send Queue | 广播 | 前台显示前缀:Q-You Send |
Subscribe Queue | 订阅广播 | 前台显示前缀:C-You Receive、D-You Receive |
Subscribe Point | 订阅点对点 | 前台显示前缀:/user/110/queue/pushInfo,Receive |
Send Kafka | 点对点 | 前台显示前缀:Kafka Send |
Receive Kafka | 订阅点对点 | 前台显示前缀:Kafka Receive |
重要提示:欲接收消息,请先点击订阅
关键代码:
配置websocket
package com.example.demo.conf; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /**
* @program: boot-kafka
* @description:
* @author: 001977
* @create: 2018-07-11 11:30
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigurationWithSTOMP implements WebSocketMessageBrokerConfigurer {

    /** Endpoint path the browser opens its SockJS connection against. */
    private static final String STOMP_ENDPOINT = "/clientConnectThis";

    /**
     * Destination prefixes handled by the simple broker: "/topic" is used for
     * broadcast, "/user" for point-to-point delivery.
     */
    private static final String[] BROKER_PREFIXES = {"/topic", "/queue", "/user"};

    /** Prefix for destinations routed from the client to application handlers. */
    private static final String APPLICATION_PREFIX = "/app";

    /** Prefix for user destinations (server to a specific client). */
    private static final String USER_PREFIX = "/user";

    /**
     * Registers the STOMP endpoint with SockJS enabled and every origin allowed.
     *
     * @param registry registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(STOMP_ENDPOINT)
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * Enables the simple broker and sets the application and user destination prefixes.
     *
     * @param config broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker(BROKER_PREFIXES);
        config.setApplicationDestinationPrefixes(APPLICATION_PREFIX);
        config.setUserDestinationPrefix(USER_PREFIX);
    }
}
controller
package com.example.demo.controller; import com.example.demo.entity.Welcome;
import com.example.demo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView; /**
* @program: boot-kafka
* @description:
* @author: 001977
* @create: 2018-07-11 11:00
*/
@RestController
public class SimpleController {

    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Serves the demo page.
     *
     * @return a view reference named "stomp_home"
     */
    @RequestMapping("/")
    public ModelAndView stomp() {
        return new ModelAndView("stomp_home");
    }

    /**
     * Handles a subscription to "/firstConnection" by returning a fixed greeting.
     *
     * @return the fixed greeting object
     */
    @SubscribeMapping("/firstConnection")
    public Welcome thanks() {
        return new Welcome("...", "Thank You!");
    }

    /**
     * Logs the incoming message and returns it unchanged so that it is forwarded to
     * "/topic/webSocketTopic".
     *
     * <p>The argument is deliberately unannotated: on message-handling methods an
     * unannotated argument is resolved as the message payload, whereas the Spring MVC
     * {@code @RequestBody} annotation has no meaning here.
     *
     * @param welcome message payload sent by the client
     * @return the same payload
     */
    @MessageMapping("/sendMessageTopic")
    @SendTo("/topic/webSocketTopic")
    public Welcome sendToTopic(Welcome welcome) {
        System.out.println("Send-Topic-Msg:" + welcome);
        return welcome;
    }

    /**
     * Logs the incoming message and returns it unchanged so that it is forwarded to the
     * user destination "/queue/webSocketQueue".
     *
     * @param welcome message payload sent by the client
     * @return the same payload
     */
    @MessageMapping("/sendMessageQueue")
    @SendToUser("/queue/webSocketQueue")
    public Welcome sendToQueue(Welcome welcome) {
        System.out.println("Send-Queue-Msg:" + welcome);
        return welcome;
    }

    /**
     * Point-to-point demo: pushes a fixed message to user "110" on "/queue/pushInfo".
     * To simulate periodic server-side pushes, enable the {@code @Scheduled}
     * annotation below.
     */
    //@Scheduled(fixedRate = 1000L)
    public void send() {
        Welcome welcome = new Welcome("110", "Hello!");
        simpMessagingTemplate.convertAndSendToUser("110", "/queue/pushInfo", welcome);
        System.err.println(welcome);
    }

    /**
     * Hands the incoming message to {@link KafkaService#send} and reports the outcome
     * on the console.
     *
     * <p>NOTE(review): this handler has no {@code @SendTo}; per the Spring STOMP
     * documentation the returned value then goes to the default destination derived
     * from the mapping. Confirm that this is intended.
     *
     * @param welcome message payload sent by the client
     * @return the same payload
     */
    @MessageMapping("/sendKafka")
    public Welcome sendToKafka(Welcome welcome) {
        boolean sent = kafkaService.send(welcome);
        if (sent) {
            System.out.println("Send-Kafka-Msg:" + welcome);
        } else {
            // Presumably false means the message was not handed to Kafka; surface it
            // instead of dropping the result silently.
            System.err.println("Send-Kafka-Failed:" + welcome);
        }
        return welcome;
    }
}
前端JS
// Open a SockJS connection to the server endpoint and run a STOMP client over it.
var socket = new SockJS('/clientConnectThis');
var stompClient = Stomp.over(socket);

stompClient.connect(
    {},
    // Success: report the connection, then subscribe to the application
    // destination and display the greeting carried by each reply.
    function connectCallback(frame) {
        connectResult("Connect Success");
        stompClient.subscribe('/app/firstConnection', function (response) {
            var welcome = JSON.parse(response.body);
            receiveText("/app/firstConnection,Test Receive:" + welcome.greeting);
        });
    },
    // Failure: report that the connection is broken.
    function errorCallBack(error) {
        connectResult("Connect Break");
    }
);

// Send messages
/**
 * Reads topic, message and name from the form (falling back to "Topic",
 * "Greeting" and "Name" when a value equals the empty string), sends them as
 * JSON to "/app/sendMessageTopic" and echoes the payload in the message panel.
 */
function sendTopic() {
    // Same loose comparison as a plain `value == ""` check.
    function valueOrDefault(value, fallback) {
        return value == "" ? fallback : value;
    }

    var payload = JSON.stringify({
        "topic": valueOrDefault($('#topic').val(), "Topic"),
        "greeting": valueOrDefault($('#message').val(), "Greeting"),
        "name": valueOrDefault($('#name').val(), "Name")
    });

    stompClient.send("/app/sendMessageTopic", {}, payload);
    sendText("T-You Send:" + payload);
}
/**
 * Reads topic, message and name from the form (falling back to "Topic",
 * "Greeting" and "Name" when a value equals the empty string), sends them as
 * JSON to "/app/sendMessageQueue" and echoes the payload in the message panel.
 */
function sendQueue() {
    // Same loose comparison as a plain `value == ""` check.
    function valueOrDefault(value, fallback) {
        return value == "" ? fallback : value;
    }

    var payload = JSON.stringify({
        "topic": valueOrDefault($('#topic').val(), "Topic"),
        "greeting": valueOrDefault($('#message').val(), "Greeting"),
        "name": valueOrDefault($('#name').val(), "Name")
    });

    stompClient.send("/app/sendMessageQueue", {}, payload);
    sendText("Q-You Send:" + payload);
}

// Subscribe to messages
/**
 * Turns the clicked button black and registers two subscriptions on
 * "/topic/webSocketTopic"; each incoming message is displayed once per
 * subscription, prefixed "A-You Receive" and "B-You Receive" respectively.
 *
 * @param t the clicked element
 */
function subscribeTopic(t) {
    $(t).css({"backgroundColor": "#000"});

    ["A-You Receive:(name=", "B-You Receive:(name="].forEach(function (prefix) {
        stompClient.subscribe('/topic/webSocketTopic', function (response) {
            var msg = JSON.parse(response.body);
            receiveText(prefix + msg.name + ",greeting=" + msg.greeting + ",topic=" + msg.topic + ")");
        });
    });
}

// Subscribe to messages
/**
 * Turns the clicked button black and registers two subscriptions on
 * "/user/queue/webSocketQueue"; incoming messages are displayed with the
 * prefixes "C-You Receive" and "D-You Receive".
 *
 * @param t the clicked element
 */
function subscribeQueue(t) {
    $(t).css({
        "backgroundColor": "#000"
    });
    stompClient.subscribe('/user/queue/webSocketQueue', function (response) {
        var returnData = JSON.parse(response.body);
        receiveText("C-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")");
    });
    stompClient.subscribe('/user/queue/webSocketQueue', function (response) {
        var returnData = JSON.parse(response.body);
        receiveText("D-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")");
    });
}

/**
 * Turns the clicked button black and subscribes to the point-to-point
 * destination of user "110".
 *
 * @param t the clicked element
 */
function subscribePoint(t) {
    $(t).css({
        "backgroundColor": "#000"
    });
    // Destination pattern: /user/{userId}/**
    stompClient.subscribe('/user/110/queue/pushInfo', function (response) {
        var returnData = JSON.parse(response.body);
        receiveText("/user/110/queue/pushInfo,Receive:" + returnData.greeting);
    });
}

/**
 * Reads topic, message and name from the form (falling back to "Topic",
 * "Greeting" and "Name" when a value equals the empty string), sends them as
 * JSON to "/app/sendKafka" and echoes the payload in the message panel.
 */
function sendKafka() {
    var topic = $('#topic').val();
    var message = $('#message').val();
    var name = $('#name').val();
    if (topic == "") {
        topic = "Topic";
    }
    if (message == "") {
        message = "Greeting";
    }
    if (name == "") {
        name = "Name";
    }
    var messageJson = JSON.stringify({"topic": topic, "greeting": message, "name": name});
    stompClient.send("/app/sendKafka", {}, messageJson);
    sendText("Kafka Send:" + messageJson);
}

/**
 * Turns the clicked button black and subscribes to
 * "/user/kafka-user-id/queue/kafkaMsg", displaying each message received there.
 *
 * @param t the clicked element
 */
function kafkaReceive(t) {
    $(t).css({
        "backgroundColor": "#000"
    });
    stompClient.subscribe('/user/kafka-user-id/queue/kafkaMsg', function (response) {
        var returnData = JSON.parse(response.body);
        receiveText("Kafka Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")");
    });
}

/**
 * Appends a "sent" line to the message panel.
 * The text is set with .text() rather than concatenated into markup, so form
 * input is never interpreted as HTML.
 *
 * @param v text to display
 */
function sendText(v) {
    $('.container .right').append($('<div class="common-message send-text"></div>').text(v));
}

/**
 * Appends a "received" line to the message panel.
 * The text is set with .text() rather than concatenated into markup, so
 * message fields sent by other clients cannot inject HTML or script.
 *
 * @param v text to display
 */
function receiveText(v) {
    $('.container .right').append($('<div class="common-message receive-text"></div>').text(v));
}

/**
 * Appends a connection-status line to the container; the text is inserted as
 * plain text, not as markup.
 *
 * @param v status text to display
 */
function connectResult(v) {
    $('.container').append($('<div class="connect-text"></div>').text(v));
}
其余的见GitHub