环境 CentOS7 + Python3.5 yum -y install epel-release erlang socat
cd /usr/local/src
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.-.el7.noarch.rpm 插件列表查询: rabbitmq-plugins list
开启管理功能: rabbitmq-plugins enable rabbitmq_management 增加访问用户,默认用户guest只能本地访问。
rabbitmqctl add_user admin 设置用户角色: rabbitmqctl set_user_tags admin administrator
权限设置:rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}
conf: 一个正则表达式match哪些配置资源能够被该用户访问。
write: 一个正则表达式match哪些配置资源能够被该用户读。
read: 一个正则表达式match哪些配置资源能够被该用户访问。 权限设置示例: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 列出所有用户: rabbitmqctl list_users
删除用户: rabbitmqctl delete_user {username}
修改用户密码: rabbitmqctl change_password {username} {newpassword} 启动: rabbitmq-server -detached
关闭: rabbitmqctl stop
服务器状态: rabbitmqctl status
集群状态: rabbitmqctl cluster_status
添加vhost: rabbitmqctl add_vhost {name}
删除vhost: rabbitmqctl delete_vhost {name}
生产者文件 producer.py
import pika, sys credentials = pika.PlainCredentials("admin", "")
conn_params = pika.ConnectionParameters("192.168.205.128", virtual_host="/", credentials=credentials)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()
channel.confirm_delivery() # 将信道设置为confirm模式 msg = sys.argv[1] # 将第一个参数作为消息内容
msg_props = pika.BasicProperties()
msg_props.content_type = "text/plain"
ack = channel.basic_publish(body=msg, exchange="hello-exchange", properties=msg_props, routing_key="hola") # 发布信息
if ack is True:
print ("put message to rabbitmq successed!")
else:
print ("put message to rabbitmq failed") channel.close()
消费者文件 consumer.py
import pika credentials = pika.PlainCredentials("admin", "")
conn_params = pika.ConnectionParameters("192.168.205.128", virtual_host="/", credentials=credentials)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel() # 获取信道
channel.exchange_declare("hello-exchange", "direct", passive=False, durable=True, auto_delete=False) # 声明交换器
channel.queue_declare(queue="hello-queue") #声明队列
channel.queue_bind(queue="hello-queue", exchange="hello-exchange", routing_key="hola") # 使用路由键绑定队列和交换器 def msg_consumer(channel, method, header, body): # 处理消息
channel.basic_ack(delivery_tag=method.delivery_tag) # 消息确认
print ("%d : '%s' " %(method.delivery_tag, body) )
if body == b'quit': # 这里的 body 是 bytes 类型
channel.basic_cancel(consumer_tag="hello-consumer") # 停止消费并退出
channel.stop_consuming()
return # 订阅消费者
channel.basic_consume( msg_consumer, queue="hello-queue", consumer_tag="hello-consumer")
channel.start_consuming() # 开始消费