1 消息队列介绍
1 先进先出”的一种数据机构--》消息队列(mq)
2 MQ解决什么问题
-应用解耦
-流量消峰
-消息分发(发布订阅)
-异步消息(celery:本质是对消息队列的封装)
3 主流消息队列产品
-Kafka(互联网公司,数据量大用的多,吞吐量高,数据安全性低一些)
-rabbitmq(吞吐量低一些,安全性,准确性高)
1 源码安装
-https://zhuanlan.zhihu.com/p/375157411
2 yum 安装(rpm安装)
# 安装配置epel源
yum install epel-release -y
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server
# 启动
systemctl start rabbitmq-server
# 查看是否启动
ps aux |grep rabbitmq
3 docker安装
-拉一个镜像,启动起来即可
# 安装好Docker,执行下面命令
docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
# 浏览器访问:
http://10.0.0.103:15672
# 输入用户名:admin 密码:admin ,进入到管理控制台
3 基本使用(生产者消费者模型)
1 使用python操作rabbitmq
2 pip3 install pika
3.1 生产者
import pika
?
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘))
?
# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘,credentials=credentials))
?
?
channel = connection.channel()
?
# 声明一个队列
channel.queue_declare(queue=‘hello‘)
?
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘llnb‘)
print("Sent ‘llnb‘")
connection.close()
3.2 消费者
import pika
?
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘,credentials=credentials))
channel = connection.channel()
?
# 声明一个队列(创建一个队列),如果生产者还没放数据,这个队列根本不存,这句话保证代码不出错
channel.queue_declare(queue=‘hello‘)
?
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
# 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
# ch.basic_ack(delivery_tag=method.delivery_tag)
?
channel.basic_consume(queue=‘hello‘,on_message_callback=callback,auto_ack=False)
?
channel.start_consuming()
4 消息安全之ack
### 生产者
import pika
?
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘))
?
# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘,credentials=credentials))
?
?
channel = connection.channel()
?
# 声明一个队列
channel.queue_declare(queue=‘hello‘)
?
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘llnb‘)
print("Sent ‘llnb‘")
connection.close()
?
?
### 消费者
import pika
?
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(‘101.133.225.166‘,credentials=credentials))
channel = connection.channel()
?
# 声明一个队列(创建一个队列),如果生产者还没放数据,这个队列根本不存,这句话保证代码不出错
channel.queue_declare(queue=‘hello‘)
?
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
?
?
# 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
# 保证数据安全
ch.basic_ack(delivery_tag=method.delivery_tag) # 回复确认,rabbitmq的server就把该消息删除
?
?
# 只要收到消息,立马回复,rabbitmq的server就把消息删除
channel.basic_consume(queue=‘hello‘,on_message_callback=callback,auto_ack=False)
?
channel.start_consuming()