Author: liuchao
email: mirschao@gmail.com
github: https://github.com/mirschao
gitee: https://gitee.com/mirschao
RabbitMQ 是一个开源的消息中间件, 用于在分布式应用程序之间传递消息、数据和事件。它实现了 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)标准, 为应用程序提供了一种可靠的异步通信机制。
RabbitMQ稳定性消息队列第一章 RabbitMQ部署及配置1.1 二进制源码部署RabbitMQ1.2 使用Python模拟后端程序异步处理队列数据第二章 RabbitMQ集群模式2.1 仲裁队列的部署及配置2.2 测试集群队列的可用性
在部署 RabbitMQ 时, 有一些重要的注意事项需要考虑, 以确保系统的稳定性、安全性和性能。以下是部署 RabbitMQ 时需要注意的关键点:
版本兼容性: 不同版本之间可能会有一些功能差异和改进, 所以确保使用相同或兼容的版本以避免问题
硬件和资源: RabbitMQ 在处理消息时需要一定的计算和内存资源。根据预期的消息负载, 选择配置足够的内存、CPU 和磁盘空间
网络配置: 确保 RabbitMQ 节点之间的网络连接畅通。适当地配置网络防火墙, 以允许 RabbitMQ 使用的端口的通信
节点命名: 确保每个 RabbitMQ 节点都有唯一的节点名称和主机名, 这是集群通信的重要部分
数据备份和恢复: 设置定期的数据备份, 以便在发生故障时能够迅速恢复。了解如何进行手动恢复和数据迁移
集群配置: 如果要构建集群, 确保节点之间的配置(如 Cookie、集群节点列表等)一致, 以确保正常的集群通信
高可用性: 使用镜像队列或者其他机制来确保消息的高可用性和冗余, 避免单点故障
安全性: 设置强密码并限制用户的访问权限。仅允许必要的端口对外开放, 以减少潜在的攻击面
文档和社区: 始终查阅官方文档和社区资源。这将帮助你避免常见错误, 并获得有关最佳实践和故障排除的建议
总之, RabbitMQ 部署需要综合考虑性能、可用性、安全性和扩展性等方面的因素。仔细规划和实施, 将有助于确保 RabbitMQ 在生产环境中稳定运行
x#> 下载好对应包后, 直接进行安装即可(如果缺少依赖可以到 https://pkgs.org寻找包即可, 或者尝试yum安装)$ yum -y install erlang-25.3.2.5-1.el7.x86_64.rpm$ tar xf rabbitmq-server-generic-unix-3.12.3.tar.xz -C /usr/local
#> 安装完成后设置$ vim /etc/profile.d/rabbitmq.shexport RABBITMQ_HOME='/usr/local/rabbitmq_server-3.12.3'export PATH=$PATH:$RABBITMQ_HOME/sbin$ source /etc/profile
#> 安装web管理端$ rabbitmq-plugins enable rabbitmq_management
#> 创建日志存储目录$ mkdir -p /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmq
#> 修改配置文件并启动服务$ egrep -v "(^#|^$)" ${RABBITMQ_HOME}/etc/rabbitmq/rabbitmq.conflisteners.tcp.local = 127.0.0.1:5672default_user = admindefault_pass = Qfcloud120!!management.tcp.port = 15672management.tcp.ip = 0.0.0.0log.dir = /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmqlog.file = rabbit.loglog.file.level = info
#> 启动rabbitmq$ rabbitmq-server [-detached 可以后台启动]
用异步编程模型时, 可以使用 aioamqp 模块来编写异步的 RabbitMQ 生产者和消费者程序。aioamqp 是一个适用于asyncio的RabbitMQ 客户端库, 使得在异步环境下与 RabbitMQ 进行交互更加方便
确保你已经安装了 aioamqp 模块
xxxxxxxxxx$ pip install aioamqp异步消费者(Consumer)逻辑
xxxxxxxxxximport asyncioimport aioamqp
async def callback(channel, body, envelope, properties): print(f" [x] Received {body}") await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
async def receive_messages(): transport, protocol = await aioamqp.connect() channel = await protocol.channel()
await channel.queue_declare(queue_name='hello') await channel.basic_consume(callback, queue_name='hello')
print(' [*] Waiting for messages. To exit press CTRL+C') while True: await asyncio.sleep(1) # Keep the event loop alive
loop = asyncio.get_event_loop()loop.run_until_complete(receive_messages())
异步生产者(Producer)逻辑:
xxxxxxxxxximport asyncioimport aioamqp
async def send_message(): transport, protocol = await aioamqp.connect() channel = await protocol.channel()
await channel.queue_declare(queue_name='hello') message = "Hello, RabbitMQ (async)!" await channel.basic_publish( payload=message.encode(), exchange_name='', routing_key='hello' ) print(f" [x] Sent '{message}'")
await protocol.close() transport.close()
loop = asyncio.get_event_loop()loop.run_until_complete(send_message())
这两个示例演示了一个简单的生产者和消费者模型。生产者将一条消息发送到名为 "hello" 的队列, 而消费者则监听该队列, 并在收到消息时打印出来
仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行
仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入放大会造成成倍的磁盘占用
仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位
仲裁队列的算法是基于Raft共识算法的一个变种,提供更好的消息吞吐量。仲裁队列包含一个主副本和多个从副本,当生产者向主副本发送一条消息,主副本会将消息同步给从副本,超过半数的副本保存消息后,主副本才会向生产者发送确认。这意味着少部分比较慢的从副本不会影响整个队列的性能。同样地,主副本的选举也需要超过半数的副本同意,这会避免出现网络分区时,队列存在2个主副本。由此可见,仲裁队列相对于可用性更看重一致性
RabbitMQ使用集群部署时,如果其中一个节点故障下线,待它消除故障重新上线后,它保存的数据不会丢失,主副本会直接从从副本中断的地方开始复制消息。复制的过程是非阻塞的,整个队列不会因为新的副本加入而受到影响
xxxxxxxxxx#> 设定每台机器的主机名确保唯一性$ hostnamectl set-hostname rabbit-[abc]
#> 设定 /etc/hosts 文件对主机名进行解析$ vim /etc/hosts192.168.19.21 rabbit-a192.168.19.22 rabbit-b192.168.19.23 rabbit-c
#> 修改rabbitMQ的配置文件$ egrep -v "(^#|^$)" ${RABBITMQ_HOME}/etc/rabbitmq/rabbitmq.conflisteners.tcp.local = 127.0.0.1:5672default_vhost = /default_user = admindefault_pass = Qfcloud120!!default_permissions.configure = .*default_permissions.read = .*default_permissions.write = .*
management.tcp.port = 15672management.tcp.ip = 0.0.0.0
log.dir = /usr/local/rabbitmq_server-3.12.3/var/log/rabbitmqlog.file = rabbit.loglog.file.level = info
mqtt.durable_queue_type = quorum
#> 开启web管理端$ rabbitmq-plugins enable rabbitmq_management
#> 同步各个节点的erlang.cookie内容$ cat $HOME/.erlang.cookieCDIRTTUGRJHXATQLKAYI
#> 开启rabbit-a机器中的rabbitMQ服务$ rabbitmq-server -detached
#> 另外两台机器使用rabbitmqctl组建集群$ rabbitmqctl stop_app$ rabbitmqctl join_cluster --ram rabbit@rabbit-a$ rabbitmqctl start_app
xxxxxxxxxx#> xiaofei.pyimport asyncioimport aioamqp
async def callback(channel, body, envelope, properties): print(f" [x] Received {body}") await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
async def receive_messages(): transport, protocol = await aioamqp.connect( host='192.168.19.22', port=5672, login='admin', password='Qfcloud120!!' ) channel = await protocol.channel()
await channel.queue_declare(queue_name='hello') await channel.basic_consume(callback, queue_name='hello')
print(' [*] Waiting for messages. To exit press CTRL+C') while True: await asyncio.sleep(1) # Keep the event loop alive
loop = asyncio.get_event_loop()loop.run_until_complete(receive_messages())
#> shenchan.pyimport asyncioimport aioamqp
async def send_message(): transport, protocol = await aioamqp.connect( host='192.168.19.23', port=5672, login='admin', password='Qfcloud120!!' ) channel = await protocol.channel()
await channel.queue_declare(queue_name='hello') message = "Hello, RabbitMQ (abcnms)!" await channel.basic_publish( payload=message.encode(), exchange_name='', routing_key='hello' ) print(f" [x] Sent '{message}'")
await protocol.close() transport.close()
loop = asyncio.get_event_loop()loop.run_until_complete(send_message())