exchange持久化

broker服务重启之后,exchange任然保留,否则没有broker存在的时候会消失

channel.exchange_declare(exchange='ticket', exchange_type='direct', durable=True)

queue持久化

也就意味着queue的名称需要为固定值,否则没有意义了。服务重启后,该队列任然存在(持久化是将队列同步到硬盘)

不能同时开启exclusive=True,否则durable无法生效

# durable=True进行声明
channel.queue_declare(queue=_queue_name, durable=True)

消息持久化

队列持久化并不意味着里面的消息被持久话了,需要声明消息持久化(delivery_mode = 2)

channel.basic_publish(exchange=topic,
                      routing_key="",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

保证数据被正常消费

消费过程中出现异常退出,那么当前数据就不能被正常消费,所以需要auto_ack进行回调

# 消费者声明(默认就是False):auto_ack=False
channel.basic_consume(queue=_queue_name, on_message_callback=on_message_callback, auto_ack=False)

# 回调函数完成后,最后需要使用ch.basic_ack进行回调,代表消息消费完成
ch.basic_ack(delivery_tag=method.delivery_tag)

潜在风险

服务端异常退出(比如kill -9 ),未来得及持久化的数据会丢失

相关代码

服务端

#!/usr/bin/python
# -*- coding: utf-8 -*-
import pika

# 参数
host = '127.0.0.1'
port = 5672
user = 'test'
pwd = '123456'
topic = 'test'
message = 'this is test message!!!'

# 配置账号密码
credentials = pika.PlainCredentials(user, pwd)
# 配置参数
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    credentials=credentials,
    heartbeat=30,
    retry_delay=5,
    connection_attempts=5
)
# 建立连接
connection = pika.BlockingConnection(parameters)
# 定义一个虚拟的频道链接
channel = connection.channel()
# 声明连接的交换机(持久化)
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)
# 发布消息(持久化)
channel.basic_publish(exchange=topic,
                      routing_key="",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

消费端

注意事项:发布模式下需先开启消费端,否则数据无法路由到队列

#!/usr/bin/python
# -*- coding: utf-8 -*-
import pika

# 参数
host = '127.0.0.1'
port = 5672
user = 'test'
pwd = '123456'
topic = 'test'
queue_name = 'test_queue'

def callback(ch, method, properties, body):
	print(" [x] %r" % body)


# 配置账号密码
credentials = pika.PlainCredentials(user, pwd)
# 配置参数
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    credentials=credentials,
    heartbeat=30,
    retry_delay=5,
    connection_attempts=5
)
# 建立连接
connection = pika.BlockingConnection(parameters)
# 定义一个虚拟的频道链接
channel = connection.channel()
# 声明连接的交换机(持久化)
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)
# 声明连接的队列(持久化),不能同时开启exclusive=True
channel.queue_declare(queue=queue_name, durable=True)
# 绑定队列
channel.queue_bind(exchange=topic, queue=queue_name)
# 绑定回调函数,并且手动ack应答(回调函数完成以后再ack应答)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
# 开启进程监听消费者信息
channel.start_consuming()

注意事项

交换机存在,且durable=False时,下面声明会报错
channel.exchange_declare(exchange=topic, exchange_type='fanout', durable=True)

参考

  1. https://blog.csdn.net/weixin_39976575/article/details/111430377

  2. https://blog.csdn.net/weixin_34237941/article/details/112735971