测试环境:Centos6.x

安装

# 安装erlang(注意查看支持的系统版本)
# https://github.com/rabbitmq/erlang-rpm/releases
# 注意不要选错版本了,el6是centos6,el7是centos7
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.6/erlang-21.2.6-1.el6.x86_64.rpm
rpm -ivh erlang-21.2.6-1.el6.x86_64.rpm

# 安装rabbitmq依赖包
yum -y install epel-release.noarch
yum -y install socat

# 安装rabbitmq
# https://github.com/rabbitmq/rabbitmq-server/releases
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el6.noarch.rpm    
rpm -ivh rabbitmq-server-3.7.13-1.el6.noarch.rpm

开启管理后台

# 开启管理模块
# 查看支持的模块列表及安装状态
rabbitmq-plugins list
# 开启管理模块
rabbitmq-plugins enable rabbitmq_management
# 启动
/etc/init.d/rabbitmq-server start
# 访问管理端口
http://10.0.26.26:15672/

用户管理

# rabbitmqctl和web一样,拥有所有管理功能
# 默认用户(只能本地访问)
guest guest

# 添加用户(admin权限)
rabbitmqctl add_user zaza 123456
# 授权
rabbitmqctl set_user_tags zaza administrator
rabbitmqctl set_permissions -p / zaza ".*" ".*" ".*"
# 修改密码
# rabbitmqctl change_password <USERNAME> <NEWPASSWORD>

队列模式

RabbitMQ的五种队列模式与实例

# 发布者Routing模式(key完整匹配模式)
流程说明:
1、发布者声明交换机及类型
channel.exchange_declare(exchange='project_ticket', exchange_type='direct')
2、发布者声明持久化的队列(queue为队列名称,durable为持久化)
channel.queue_declare(queue=severity, durable=True)
3、发布者绑定队列
channel.queue_bind(exchange='project_ticket',
                   queue=queue,
                   routing_key=queue)
4、指定发布规则及routing_key
channel.basic_publish(
    exchange='project_ticket',                             
    routing_key=severity, # 这里指定routing_key,方便路由到指定的queue_name上
    body=message,                                     
    properties=pika.BasicProperties(delivery_mode=2,)) # 这里代表持久化
# 查看binds命令如下:
rabbitmqctl list_bindings -p /project

# 详细流程看python代码
# 执行流程(第一次执行的数据会丢失,没有客户端绑定)
./projects_direct.py # queue和routing_key名称一致

# 绑定
./projectc_direct.py 10.0.26.26 10.0.26.26
./projectc_direct.py 10.0.26.208 10.0.26.208
./projectc_direct.py 10.0.26.209 10.0.26.209
# 在队列10.0.26.210上绑定10.0.26.209和10.0.26.210关键字
./projectc_direct.py 10.0.26.210 10.0.26.209 10.0.26.210

#  # 发布者不指定绑定的话。第一次数据会出现下面的问题
#  # 上面的流程会创建持久化的交换机以及队列名称,但是因为客户端没有bind队列,
#  # 因此exchange不能找到routing_key对应的bind,会把所有的数据丢掉
#  x、消费者指定queue以及相关绑定
#  # 客户端启动一次后会进行相关绑定(也可以在服务端绑定),此时发布者发布的消息就会路由到相关的queue了
#  # 绑定如果是唯一的,建议在发布者上面进行绑定(这样的话在发布消息前进行绑定数据就不会丢失了)
#  # 消费者queue的可以绑定多个binding(多绑定在客户端进行操作,会比较灵活),来接收指定的routing_key消息
#  for binding in bindings:
#      channel.queue_bind(exchange='project_ticket',
#                         queue=queue_name,
#                         routing_key=binding)

# 绑定实际效果
(ops) [root@zaza-test mq]# rabbitmqctl list_bindings -p /project
Listing bindings for vhost /project...
source_name	source_kind	destination_name	destination_kind	routing_key	arguments
	exchange	amq.gen-DVvUAYD5UHfu0IqPsRHCxg	queue	amq.gen-DVvUAYD5UHfu0IqPsRHCxg	[]
	exchange	amq.gen-oRnZwTwVjNvMWGp_ij5aOQ	queue	amq.gen-oRnZwTwVjNvMWGp_ij5aOQ	[]
	exchange	amq.gen-BeMy_WBlOUs1Zn6dOqv4Qw	queue	amq.gen-BeMy_WBlOUs1Zn6dOqv4Qw	[]
	exchange	10.0.26.210	queue	10.0.26.210	[]
	exchange	amq.gen-tofXMdYTUBN52OorqW2Mow	queue	amq.gen-tofXMdYTUBN52OorqW2Mow	[]
	exchange	10.0.26.26	queue	10.0.26.26	[]
	exchange	10.0.26.209	queue	10.0.26.209	[]
	exchange	10.0.26.208	queue	10.0.26.208	[]
project_ticket	exchange	10.0.26.26	queue	10.0.26.26	[]
project_ticket	exchange	10.0.26.208	queue	10.0.26.208	[]
project_ticket	exchange	10.0.26.209	queue	10.0.26.209	[]
project_ticket	exchange	10.0.26.210	queue	10.0.26.209	[]
project_ticket	exchange	10.0.26.210	queue	10.0.26.210	[]

项目实战

# 以project项目为例
rabbitmqctl add_user project 123456
# 项目管理权限
rabbitmqctl set_user_tags project management
# 创建vhosts(类似于单独的数据库,防止项目间受影响)
rabbitmqctl add_vhost /project
# 设置权限
rabbitmqctl set_permissions -p /project project ".*" ".*" ".*"

| 交换机类型 | 路由 | 绑定 | 队列 |
| ------ | ------ | ------ |
| Exchanges | RouteKey | Binding | Queue |
| Fanout | 忽略 |  | |
| Direct | 匹配 |  | |
| Topic | 模糊匹配 |  |  |
| Headers |  |  |  |
Fanout Exchange --> 直接到绑定的Queue上,没有RouteKey 

添加自定义属性

比如:consumer(消费者)需要根据不同的publish(发布者)进行相关操作
publish_a: 只能操作a方法集
publish_b: 只能操作b方法集
# 1、生产者定义身份
channel.basic_publish(
    properties=pika.BasicProperties(
        headers={'remote_add': '10.0.0.1'}),
)
# 2、消费者获取信息
def callback(ch, method, properties, body):
    print(properties.headers.get("remote_add"))
    remote_add = properties.headers.get("remote_add")
	if "remote_add" == "ip_a":
		do_something_a
		return
	if "remote_add" == "ip_b":
		do_something_b
		return
	do_something_default

使用说明

# 一个connection(TCP连接),多个channel(channel1、channel2),channel本身实现自己的绑定
import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

import pika
# 密码参数
credentials = pika.PlainCredentials("zaza", "111111")
# 建立连接的参数
parameters = pika.ConnectionParameters(
    host="10.0.26.26",
    port=5672,
    virtual_host='/',
    credentials=credentials,
    heartbeat=0,
    retry_delay=5,
    connection_attempts=5
)
# 建立TCP连接(rabbitmqctl list_connections),每个客户端建立一个连接就行了
connection = pika.BlockingConnection(parameters)
# AMQP 0-9-1连接可被认为是"共享单个TCP连接的轻量级连接"的通道复用
# rabbitmqctl list_channels
channel1 = connection.channel()     # 建立channel1
channel2 = connection.channel()     # 建立channel2
# 每个channel代表一个会话任务(exchange参数用于区分不同项目的数据流)
channel1.exchange_declare(exchange="exchange_1", exchange_type='fanout')
channel1.queue_declare(queue="queue_1", exclusive=True)
channel1.queue_bind(exchange="exchange_1", queue="queue_1")
# 每个channel代表一个会话任务(exchange参数用于区分不同项目的数据流)
channel2.exchange_declare(exchange="exchange_2", exchange_type='fanout')
channel2.queue_declare(queue="queue_2", exclusive=True)
channel2.queue_bind(exchange="exchange_2", queue="queue_2")
# 不同channel可以有自己的任务,通过多进程实现并发
channel1.basic_consume(queue="queue_1", on_message_callback=callback, auto_ack=True)
channel1.start_consuming()
channel2.basic_consume(queue="queue_2", on_message_callback=callback, auto_ack=True)
channel2.start_consuming()

# 系统a用channel1、系统b用channel2? 这样可以独立并发执行?

术语说明

Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue,权限控制的最小丽都是Virtual Host;
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。

debug调试

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.DEBUG)

producer = KafkaProducer(bootstrap_servers='10.0.0.166:9092')

producer.send('topic', b'hola')

参考

  1. http://www.rabbitmq.com/getstarted.html

  2. https://www.jianshu.com/p/80eefec808e5

  3. https://www.cnblogs.com/zhangweizhong/p/5687457.html