1.在项目的pom中引入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2.配置消息通道
public interface Demo {
/**
* 发消息的通道名称
*/
String DEMO_OUTPUT = "demo_output";
/**
* 消息的订阅通道名称
*/
String DEMO_INPUT = "demo_input";
/**
* 发消息的通道
*
* @return
*/
@Output(DEMO_OUTPUT)
MessageChannel sendDemoMessage();
/**
* 收消息的通道
*
* @return
*/
@Input(DEMO_INPUT)
SubscribableChannel recieveDemoMessage();
}
- 使带注释组件的结合Input和Output根据作为值给注释传递接口的列表到代理
@EnableBinding(value = {Demo.class})
4.链接kafka配置
spring.cloud.stream.bindings.demo_input.destination=demo
spring.cloud.stream.bindings.demo_input.group=demo
spring.cloud.stream.bindings.demo_output.destination=demo
spring.cloud.stream.bindings.demo_output.group=demo
spring.cloud.stream.default-binder=kafka
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
5.发送消息
@Resource(name = Demo.DEMO_OUTPUT)
private MessageChannel sendDemoMessageChannel;
@Test
public void Demo() {
boolean isSendSuccess = sendDemoMessageChannel.
send(MessageBuilder.withPayload("OK").build());
System.out.println(isSendSuccess);
}
6.接收消息
@StreamListener(Demo. DEMO_INPUT)
public void insertQuotationK(Message<String> message) {
if (StringUtils.isEmpty(message.getPayload())) {
System.out.println("receiver data is empty !");
System.out.println(400 + "failed");
}
System.out.println("kafka收到"+message.getPayload());
}
7.结束咯,如果出现异常,请留言。