Author: liuchao
email: mirschao@gmail.com
github: https://github.com/mirschao
gitee: https://gitee.com/mirschao
RabbitMQ 是一个开源的消息中间件, 用于在分布式应用程序之间传递消息、数据和事件。它实现了 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)标准, 为应用程序提供了一种可靠的异步通信机制。
RabbitMQ稳定性消息队列第一章 RabbitMQ部署及配置1.1 二进制源码部署RabbitMQ1.2 使用Python模拟后端程序异步处理队列数据第二章 RabbitMQ集群模式2.1 仲裁队列的部署及配置2.2 测试集群队列的可用性
在部署 RabbitMQ 时, 有一些重要的注意事项需要考虑, 以确保系统的稳定性、安全性和性能。以下是部署 RabbitMQ 时需要注意的关键点:
版本兼容性: 不同版本之间可能会有一些功能差异和改进, 所以确保使用相同或兼容的版本以避免问题
硬件和资源: RabbitMQ 在处理消息时需要一定的计算和内存资源。根据预期的消息负载, 选择配置足够的内存、CPU 和磁盘空间
网络配置: 确保 RabbitMQ 节点之间的网络连接畅通。适当地配置网络防火墙, 以允许 RabbitMQ 使用的端口的通信
节点命名: 确保每个 RabbitMQ 节点都有唯一的节点名称和主机名, 这是集群通信的重要部分
数据备份和恢复: 设置定期的数据备份, 以便在发生故障时能够迅速恢复。了解如何进行手动恢复和数据迁移
集群配置: 如果要构建集群, 确保节点之间的配置(如 Cookie、集群节点列表等)一致, 以确保正常的集群通信
高可用性: 使用镜像队列或者其他机制来确保消息的高可用性和冗余, 避免单点故障
安全性: 设置强密码并限制用户的访问权限。仅允许必要的端口对外开放, 以减少潜在的攻击面
文档和社区: 始终查阅官方文档和社区资源。这将帮助你避免常见错误, 并获得有关最佳实践和故障排除的建议
总之, RabbitMQ 部署需要综合考虑性能、可用性、安全性和扩展性等方面的因素。仔细规划和实施, 将有助于确保 RabbitMQ 在生产环境中稳定运行
x#> 下载好对应包后, 直接进行安装即可(如果缺少依赖可以到 https://pkgs.org寻找包即可, 或者尝试yum安装)
$ yum -y install erlang-25.3.2.5-1.el7.x86_64.rpm
$ tar xf rabbitmq-server-generic-unix-3.12.3.tar.xz -C /usr/local
#> 安装完成后设置
$ vim /etc/profile.d/rabbitmq.sh
export RABBITMQ_HOME='/usr/local/rabbitmq_server-3.12.3'
export PATH=$PATH:$RABBITMQ_HOME/sbin
$ source /etc/profile
#> 安装web管理端
$ rabbitmq-plugins enable rabbitmq_management
#> 创建日志存储目录
$ mkdir -p /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmq
#> 修改配置文件并启动服务
$ egrep -v "(^#|^$)" ${RABBITMQ_HOME}/etc/rabbitmq/rabbitmq.conf
listeners.tcp.local = 127.0.0.1:5672
default_user = admin
default_pass = Qfcloud120!!
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
log.dir = /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmq
log.file = rabbit.log
log.file.level = info
#> 启动rabbitmq
$ rabbitmq-server [-detached 可以后台启动]
用异步编程模型时, 可以使用 aioamqp
模块来编写异步的 RabbitMQ 生产者和消费者程序。aioamqp
是一个适用于asyncio的RabbitMQ 客户端库, 使得在异步环境下与 RabbitMQ 进行交互更加方便
确保你已经安装了 aioamqp
模块
xxxxxxxxxx
$ pip install aioamqp
异步消费者(Consumer)逻辑
xxxxxxxxxx
import asyncio
import aioamqp
async def callback(channel, body, envelope, properties):
print(f" [x] Received {body}")
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
async def receive_messages():
transport, protocol = await aioamqp.connect()
channel = await protocol.channel()
await channel.queue_declare(queue_name='hello')
await channel.basic_consume(callback, queue_name='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
while True:
await asyncio.sleep(1) # Keep the event loop alive
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_messages())
异步生产者(Producer)逻辑:
xxxxxxxxxx
import asyncio
import aioamqp
async def send_message():
transport, protocol = await aioamqp.connect()
channel = await protocol.channel()
await channel.queue_declare(queue_name='hello')
message = "Hello, RabbitMQ (async)!"
await channel.basic_publish(
payload=message.encode(),
exchange_name='',
routing_key='hello'
)
print(f" [x] Sent '{message}'")
await protocol.close()
transport.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(send_message())
这两个示例演示了一个简单的生产者和消费者模型。生产者将一条消息发送到名为 "hello" 的队列, 而消费者则监听该队列, 并在收到消息时打印出来
仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行
仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入放大会造成成倍的磁盘占用
仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位
仲裁队列的算法是基于Raft共识算法的一个变种,提供更好的消息吞吐量。仲裁队列包含一个主副本和多个从副本,当生产者向主副本发送一条消息,主副本会将消息同步给从副本,超过半数的副本保存消息后,主副本才会向生产者发送确认。这意味着少部分比较慢的从副本不会影响整个队列的性能。同样地,主副本的选举也需要超过半数的副本同意,这会避免出现网络分区时,队列存在2个主副本。由此可见,仲裁队列相对于可用性更看重一致性
RabbitMQ使用集群部署时,如果其中一个节点故障下线,待它消除故障重新上线后,它保存的数据不会丢失,主副本会直接从从副本中断的地方开始复制消息。复制的过程是非阻塞的,整个队列不会因为新的副本加入而受到影响
xxxxxxxxxx
#> 设定每台机器的主机名确保唯一性
$ hostnamectl set-hostname rabbit-[abc]
#> 设定 /etc/hosts 文件对主机名进行解析
$ vim /etc/hosts
192.168.19.21 rabbit-a
192.168.19.22 rabbit-b
192.168.19.23 rabbit-c
#> 修改rabbitMQ的配置文件
$ egrep -v "(^#|^$)" ${RABBITMQ_HOME}/etc/rabbitmq/rabbitmq.conf
listeners.tcp.local = 127.0.0.1:5672
default_vhost = /
default_user = admin
default_pass = Qfcloud120!!
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
log.dir = /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmq
log.file = rabbit.log
log.file.level = info
mqtt.durable_queue_type = quorum
#> 开启web管理端
$ rabbitmq-plugins enable rabbitmq_management
#> 同步各个节点的erlang.cookie内容
$ cat $HOME/.erlang.cookie
CDIRTTUGRJHXATQLKAYI
#> 开启rabbit-a机器中的rabbitMQ服务
$ rabbitmq-server -detached
#> 另外两台机器使用rabbitmqctl组建集群
$ rabbitmqctl stop_app
$ rabbitmqctl join_cluster --ram rabbit@rabbit-a
$ rabbitmqctl start_app
xxxxxxxxxx
#> xiaofei.py
import asyncio
import aioamqp
async def callback(channel, body, envelope, properties):
print(f" [x] Received {body}")
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
async def receive_messages():
transport, protocol = await aioamqp.connect(
host='192.168.19.22', port=5672, login='admin', password='Qfcloud120!!'
)
channel = await protocol.channel()
await channel.queue_declare(queue_name='hello')
await channel.basic_consume(callback, queue_name='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
while True:
await asyncio.sleep(1) # Keep the event loop alive
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_messages())
#> shenchan.py
import asyncio
import aioamqp
async def send_message():
transport, protocol = await aioamqp.connect(
host='192.168.19.23', port=5672, login='admin', password='Qfcloud120!!'
)
channel = await protocol.channel()
await channel.queue_declare(queue_name='hello')
message = "Hello, RabbitMQ (abcnms)!"
await channel.basic_publish(
payload=message.encode(),
exchange_name='',
routing_key='hello'
)
print(f" [x] Sent '{message}'")
await protocol.close()
transport.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(send_message())