RabbitMQ-持久化
exchange持久化
broker服务重启之后,exchange任然保留,否则没有broker存在的时候会消失
channel.exchange_declare(exchange='ticket', exchange_type='direct', durable=True)
queue持久化
也就意味着queue的名称需要为固定值,否则没有意义了。服务重启后,该队列任然存在(持久化是将队列同步到硬盘)
不能同时开启exclusive=True,否则durable无法生效
# durable=True进行声明
channel.queue_declare(queue=_queue_name, durable=True)
消息持久化
队列持久化并不意味着里面的消息被持久话了,需要声明消息持久化(delivery_mode = 2)
channel.basic_publish(exchange=topic,
routing_key="",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
保证数据被正常消费
消费过程中出现异常退出,那么当前数据就不能被正常消费,所以需要auto_ack进行回调
# 消费者声明(默认就是False):auto_ack=False
channel.basic_consume(queue=_queue_name, on_message_callback=on_message_callback, auto_ack=False)
# 回调函数完成后,最后需要使用ch.basic_ack进行回调,代表消息消费完成
ch.basic_ack(delivery_tag=method.delivery_tag)
潜在风险
服务端异常退出(比如kill -9 ),未来得及持久化的数据会丢失
相关代码
服务端
#!/usr/bin/python
# -*- coding: utf-8 -*-
import pika
# 参数
host = '127.0.0.1'
port = 5672
user = 'test'
pwd = '123456'
topic = 'test'
message = 'this is test message!!!'
# 配置账号密码
credentials = pika.PlainCredentials(user, pwd)
# 配置参数
parameters = pika.ConnectionParameters(
host=host,
port=port,
credentials=credentials,
heartbeat=30,
retry_delay=5,
connection_attempts=5
)
# 建立连接
connection = pika.BlockingConnection(parameters)
# 定义一个虚拟的频道链接
channel = connection.channel()
# 声明连接的交换机(持久化)
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)
# 发布消息(持久化)
channel.basic_publish(exchange=topic,
routing_key="",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
消费端
注意事项:发布模式下需先开启消费端,否则数据无法路由到队列
#!/usr/bin/python
# -*- coding: utf-8 -*-
import pika
# 参数
host = '127.0.0.1'
port = 5672
user = 'test'
pwd = '123456'
topic = 'test'
queue_name = 'test_queue'
def callback(ch, method, properties, body):
print(" [x] %r" % body)
# 配置账号密码
credentials = pika.PlainCredentials(user, pwd)
# 配置参数
parameters = pika.ConnectionParameters(
host=host,
port=port,
credentials=credentials,
heartbeat=30,
retry_delay=5,
connection_attempts=5
)
# 建立连接
connection = pika.BlockingConnection(parameters)
# 定义一个虚拟的频道链接
channel = connection.channel()
# 声明连接的交换机(持久化)
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)
# 声明连接的队列(持久化),不能同时开启exclusive=True
channel.queue_declare(queue=queue_name, durable=True)
# 绑定队列
channel.queue_bind(exchange=topic, queue=queue_name)
# 绑定回调函数,并且手动ack应答(回调函数完成以后再ack应答)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
# 开启进程监听消费者信息
channel.start_consuming()
注意事项
交换机存在,且durable=False时,下面声明会报错
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)
参考
- 原文作者:zaza
- 原文链接:https://zazayaya.github.io/2021/03/30/rabbitmq-durable.html
- 说明:转载本站文章请标明出处,部分资源来源于网络,如有侵权请及时与我联系!