环境搭建:
点击查看代码
<dependencies>
<!-- RabbitMQ Java client library used by the examples in this document. -->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
<!-- Lombok; presumably the source of the @Slf4j annotation that several examples use for their `log` field. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- SLF4J "simple" artifact; presumably the logging backend for the log.info(...) calls in the examples. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
点击查看代码
/**
 * Static helper that builds the {@link ConnectionFactory} shared by all examples.
 *
 * <p>The class only exposes static members, so it is final and cannot be instantiated.
 */
public final class Util {

    /** Prevents instantiation; every member of this class is static. */
    private Util() {
    }

    /**
     * Creates a new, pre-configured connection factory.
     *
     * <p>The host name {@code "your-host"} and the {@code admin}/{@code admin} credentials are
     * hard-coded here; {@code "your-host"} looks like a placeholder to be replaced with the
     * address of a real broker before running the examples.
     *
     * @return a fresh {@link ConnectionFactory} instance on every call
     */
    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("your-host");
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory;
    }
}
一 Hello world
生产者
/**
 * "Hello world" producer: declares the {@code hello} queue, publishes one text message to it
 * using the empty exchange name with the queue name as routing key, and then exits.
 */
public class Demo01 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Publishes a single {@code "hello world"} message.
     *
     * <p>Connection or I/O failures are reported on standard error; they are not rethrown.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ConnectionFactory factory = Util.getConnectionFactory();
        // Both the connection and the channel are closed when the try block ends,
        // matching how the other producers in this document manage their channel.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello world";
            // Encode explicitly as UTF-8: the matching consumer decodes the body as UTF-8,
            // whereas a bare getBytes() would depend on the platform default charset.
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent ‘" + message + "‘");
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}
消费者
/**
 * "Hello world" consumer: declares the {@code hello} queue and prints the body of every
 * delivery it receives. The connection is left open so deliveries keep arriving until the
 * process is stopped.
 */
@Slf4j
public class Demo01 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Starts consuming from the {@code hello} queue.
     *
     * @param argv ignored
     * @throws Exception if connecting, declaring the queue, or registering the consumer fails
     */
    public static void main(String[] argv) throws Exception {
        Connection conn = Util.getConnectionFactory().newConnection();
        Channel ch = conn.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        log.info(" [*] Waiting for messages. To exit press CTRL+C");

        // Decode each delivery body as UTF-8 text and echo it to standard output.
        DeliverCallback onDelivery = (tag, incoming) -> {
            byte[] payload = incoming.getBody();
            String text = new String(payload, "UTF-8");
            System.out.println(" [x] Received ‘" + text + "‘");
        };

        // Second argument `true`: consume without sending explicit acknowledgements.
        ch.basicConsume(QUEUE_NAME, true, onDelivery, tag -> { });
    }
}
二 work queue
生产者
@Slf4j
public class Demo01 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory factory = Util.getConnectionFactory();
try (Connection connection = factory.newConnection();
Scanner scanner = new Scanner(System.in);) {
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
while (true) {
String message = scanner.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
log.info(" [x] Sent ‘" + message + "‘");
}
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
消费者
@Slf4j
public class Demo01 {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = Util.getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
log.info(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
log.info(" [x] Received ‘" + message + "‘");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.info(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == ‘.‘) Thread.sleep(1000);
}
}
}
三 publish subscribe
生产者
/**
 * Publish/subscribe producer: reads lines from standard input and publishes each one to the
 * {@code logs} exchange, declared with type {@code "fanout"}, using an empty routing key.
 */
@Slf4j
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Publishes one message per input line until standard input is exhausted.
     *
     * @param argv ignored
     * @throws Exception if connecting, declaring the exchange, or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = Util.getConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
             Scanner scanner = new Scanner(System.in)) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // hasNextLine() ends the loop cleanly at end of input; an unconditional
            // nextLine() would throw NoSuchElementException there.
            while (scanner.hasNextLine()) {
                String message = scanner.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                log.info(" [x] Sent ‘" + message + "‘");
            }
        }
    }
}
消费者
/**
 * Publish/subscribe consumer: binds a freshly declared queue to the {@code logs} exchange
 * (type {@code "fanout"}) and logs the body of every delivery it receives.
 */
@Slf4j
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Starts consuming from the exchange. The connection is left open so deliveries keep
     * arriving until the process is stopped.
     *
     * @param argv ignored
     * @throws Exception if connecting, declaring, binding, or registering the consumer fails
     */
    public static void main(String[] argv) throws Exception {
        Connection conn = Util.getConnectionFactory().newConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Declare a queue without arguments and use whatever name is reported back for it.
        String boundQueue = ch.queueDeclare().getQueue();
        ch.queueBind(boundQueue, EXCHANGE_NAME, "");

        log.info(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback onDelivery = (tag, incoming) -> {
            byte[] payload = incoming.getBody();
            String text = new String(payload, "UTF-8");
            log.info(" [x] Received ‘" + text + "‘");
        };

        ch.basicConsume(boundQueue, true, onDelivery, tag -> { });
    }
}
四 routing
生产者
/**
 * Routing producer: repeatedly reads a numeric severity code and then a message line from
 * standard input, and publishes the message to the {@code direct_logs} exchange (type
 * {@code "direct"}) with the severity name as routing key.
 */
@Slf4j
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Publishes one message per (severity code, message line) pair until standard input is
     * exhausted. Tokens that are not valid integers where a severity code is expected are
     * logged and skipped instead of aborting the program.
     *
     * @param argv ignored
     * @throws Exception if connecting, declaring the exchange, or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = Util.getConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
             Scanner scanner = new Scanner(System.in)) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // hasNext() ends the loop cleanly at end of input instead of letting
            // nextInt() throw NoSuchElementException.
            while (scanner.hasNext()) {
                if (!scanner.hasNextInt()) {
                    // nextInt() would throw InputMismatchException here; skip the bad token.
                    log.warn("Ignoring non-numeric severity code: " + scanner.next());
                    continue;
                }
                String severity = getSeverity(scanner.nextInt());
                // Discard the remainder of the severity line so the next read starts
                // at the message line.
                if (scanner.hasNextLine()) {
                    scanner.nextLine();
                }
                log.info(severity);
                if (!scanner.hasNextLine()) {
                    // Input ended after the severity code; there is no message to publish.
                    break;
                }
                String message = scanner.nextLine();
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘");
            }
        }
    }

    /**
     * Maps a numeric code to a severity name.
     *
     * @param i the code entered by the user
     * @return {@code "error"} for 1, {@code "warning"} for 2, {@code "debug"} for 4,
     *         and {@code "info"} for every other value
     */
    private static String getSeverity(int i) {
        switch (i) {
            case 4: return "debug";
            case 2: return "warning";
            case 1: return "error";
            default: return "info";
        }
    }
}
消费者
/**
 * Routing consumer: binds a freshly declared queue to the {@code direct_logs} exchange (type
 * {@code "direct"}) once per severity given on the command line and prints every delivery
 * together with its routing key.
 */
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Starts consuming messages for the requested severities. The connection is left open so
     * deliveries keep arriving until the process is stopped.
     *
     * @param args one or more severities to bind; the program prints a usage message and
     *             exits with status 1 when none is given
     * @throws Exception if connecting, declaring, binding, or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        // Validate before touching the broker: reading args[0] unchecked would throw
        // ArrayIndexOutOfBoundsException after a connection had already been opened.
        if (args.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [severity]...");
            System.exit(1);
        }
        ConnectionFactory factory = Util.getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        // Bind once per requested severity; with a single argument this is exactly the
        // former args[0] binding.
        for (String severity : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received ‘" +
                    delivery.getEnvelope().getRoutingKey() + "‘:‘" + message + "‘");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
五 topic
生产者
/**
 * Topic producer: repeatedly reads a routing key line and a message line from standard input
 * and publishes the message to the {@code topic_logs} exchange (type {@code "topic"}) with
 * that routing key.
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Publishes one message per (routing key, message) pair of input lines until standard
     * input is exhausted.
     *
     * @param argv ignored
     * @throws Exception if connecting, declaring the exchange, or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = Util.getConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
             Scanner scanner = new Scanner(System.in)) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // hasNextLine() ends the loop cleanly at end of input; an unconditional
            // nextLine() would throw NoSuchElementException there.
            while (scanner.hasNextLine()) {
                String routingKey = scanner.nextLine();
                if (!scanner.hasNextLine()) {
                    // Input ended after the routing key; there is no message to publish.
                    break;
                }
                String message = scanner.nextLine();
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");
            }
        }
    }
}
消费者
/**
 * Topic consumer: binds a freshly declared queue to the {@code topic_logs} exchange (type
 * {@code "topic"}) once per binding key given on the command line and prints every delivery
 * together with its routing key.
 */
public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Starts consuming messages for the requested binding keys. The connection is left open so
     * deliveries keep arriving until the process is stopped.
     *
     * @param argv one or more binding keys; the program prints a usage message and exits with
     *             status 1 when none is given
     * @throws Exception if connecting, declaring, binding, or registering the consumer fails
     */
    public static void main(String[] argv) throws Exception {
        // Validate before touching the broker, so a bad invocation neither opens a
        // connection nor declares an exchange and queue that are immediately abandoned.
        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }
        ConnectionFactory factory = Util.getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received ‘" +
                    delivery.getEnvelope().getRoutingKey() + "‘:‘" + message + "‘");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
RabbitMQ 的常用使用模式