RabbitMQ
一、rabbitmq 概念理解
1、概念
MQ(Message Queue,消息队列)是一种用于在分布式系统中实现消息传递和异步通信的技术。它充当了发送方和接收方之间的中间人,用于在应用程序或服务之间传递消息。MQ 允许系统中的不同组件彼此独立运行,而无需直接通信或相互依赖,从而提高系统的可扩展性、可靠性和灵活性。
概念剖析:
- **消息:**消息是应用程序之间传递的数据单元,可以是文本、JSON、XML 或其他格式。
- **队列:**队列是一个用于存储消息的容器,消息按照先进先出(FIFO)的顺序存储。
- **生产者:**生产者是发送消息到消息队列的应用程序或服务。它生成并推送消息到队列中。
- **消费者:**消费者是从消息队列中接收和处理消息的应用程序或服务。它会从队列中取出消息并执行相应的操作。
- **中间件:**消息队列通常由消息中间件实现,如 RabbitMQ、Kafka、ActiveMQ、Redis等。中间件负责管理消息的存储、传递和路由。
2、MQ工作原理
MQ的工作方式通常包含以下步骤:
- **消息生产:**生产者将消息发送到消息队列。
- **消息存储:**消息队列中间件接收并存储消息,直到有消费者来取走它们。
- **消息消费:**消费者从队列中取出消息并处理。
- **消息确认:**消费者处理完消息后,通常会向队列发送确认,通知中间件消息已成功处理,可以将其删除或标记为已处理。
3、应用场景
- 异步处理
- 应用解耦
- 流量削峰
4、介绍
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
特点:
- 基于
AMQP协议来实现。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 - 可靠性:采用一系列机制来确保消息的可靠性,如持久化、传输确认和发布确认等方法。能够保证百分之百的不丢失。
- 高可用性:队列可以在集群的多台机器上进行镜像设置,即使其中的某些节点出现故障,队列仍然可用。
- **可扩展性:**RabbitMQ支持构建集群,多个节点可以组成一个集群。
- **灵活的路由:**消息在进入队列之前会通过交换器进行路由,使得消息能够按照特定的规则进行分发。
常用概念理解:
-
Producer: 生产者是消息的发送者。它将消息发送到 RabbitMQ 中,消息被发送到一个 Exchange(交换器)中。生产者并不知道消息将被发送到哪个队列(Queue),只负责将消息发送到指定的交换器。
-
Exchange: 交换器是用来接收生产者发送的消息并根据一定的规则(Binding)将消息路由到一个或多个队列中。交换器有不同的类型,每种类型的路由行为不同:
-
- Direct Exchange:根据消息的 Routing Key(路由键)精确地将消息路由到指定的队列中。
- Topic Exchange:根据 Routing Key 的模式匹配,将消息路由到符合条件的队列中,支持模糊匹配。
- Fanout Exchange:将消息广播到所有绑定到该交换器的队列中,忽略 Routing Key。
- Headers Exchange:根据消息的 Headers 属性进行路由。
-
Queue: 队列是 RabbitMQ 中存储消息的容器。消息进入队列后,会被消费者(Consumer)从中取出并处理。一个队列可以绑定到多个交换器上,且一个队列可以有多个消费者,但每条消息只能被一个消费者消费。
-
Consumer: 消费者是从队列中接收消息并处理的应用程序或服务。消费者可以主动拉取消息,也可以通过订阅的方式被动接收消息。
-
Message: 消息是生产者发送给 RabbitMQ 的数据载体,通常包括两个部分:
-
- Payload:消息的实际数据。
- Attributes:消息的元数据,比如 Routing Key、Headers、优先级等。
-
Ack: 消费者处理完消息后,可以发送确认(ACK)给 RabbitMQ,以表示消息已成功处理。未确认的消息会被重新投递。
-
Routing Key: 路由键是生产者发送消息时指定的,用于匹配交换器和队列的绑定规则。不同类型的交换器对路由键的处理方式不同。
二、实战
1、rabbitmq安装
services:
rabbitmq:
image: registry.cn-hangzhou.aliyuncs.com/serialt/rabbitmq:3.13.7
container_name: rabbitmq
restart: always
ports:
- "15672:15672"
- "5672:5672"
volumes:
- /data/rabbitmq:/data
environment:
- RABBITMQ_USERNAME=serialt
- RABBITMQ_PASSWORD=msg.pass
- RABBITMQ_VHOST=*
2、工作模式
1)Simple 模式
单发单收,消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除
2)工作队列 Work Queue
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
3)发布订阅Pub/Sub模式
在这种模型中,多了一个 Exchange 角色,而且过程略有变化: P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X (交换机)。 C:消费者,消息的接收者,会一直等待消息到来。 Queue:消息队列,接收消息、缓存消息。 Exchange:交换机(X) ,一方面,接收生产者发送的消息。另一方面,如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列。
- Direct:全值匹配,把消息交给符合指定
routing key的队列。 - Topic:通配符,与Direct类型类似,但Direct类型要求
routing key完全相等,而Topic类型是对routing key进行模糊匹配,比Direct灵活。 - Headers:根据Message的一些头部信息来分发过滤Message,用的比较少。
3、消息持久化
1)队列持久化:在声明队列时,将 durable 参数设置为 true,使得队列在 RabbitMQ 重启后仍然存在。
2)消息持久化:在发送消息时,将 delivery_mode 设置为 2,这样消息会被写入磁盘,即使 RabbitMQ 崩溃或重启,消息也不会丢失。
4、消费端:
消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack,有两种确认方式:自动确认和手动确认。
**自动确认:**当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
**手动确认:**在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false),手动确认,如果出现异常,则调用d.Reject(true)让其自动重新发送消息。
示例代码
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/streadway/amqp"
)
const (
// 注意:根据你的环境修改此 URL
rabbitMQURL = "amqp://guest:guest@localhost:5661/"
queueName = "q1"
exchangeName = "e1"
exchangeType = "direct" // 可根据实际需求改为 "topic"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func connectToRabbitMQ() (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(rabbitMQURL)
if err != nil {
return nil, nil, fmt.Errorf("无法连接到 RabbitMQ: %w", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("无法打开通道: %w", err)
}
return conn, ch, nil
}
// 声明交换机(durable=false 以匹配可能已存在的配置)
func declareExchange(ch *amqp.Channel) error {
return ch.ExchangeDeclare(
exchangeName, // name
exchangeType, // type
true, // durable ← 关键:设为 false 避免冲突
false, // delete when unused
false, // internal
false, // no-wait
nil, // arguments
)
}
// 声明队列(durable=false,与已有队列一致)
func declareQueue(ch *amqp.Channel) error {
_, err := ch.QueueDeclare(
queueName,
false, // durable ← 必须为 false,否则报 406 错误
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
return err
}
// 绑定队列到交换机
func bindQueue(ch *amqp.Channel) error {
return ch.QueueBind(
queueName, // queue
queueName, // routing key(你当前用队列名作为 routing key)
exchangeName, // exchange
false, // no-wait
nil, // arguments
)
}
// Producer:发送消息
func Producer() {
conn, ch, err := connectToRabbitMQ()
failOnError(err, "连接失败")
defer conn.Close()
defer ch.Close()
// err = declareExchange(ch)
// failOnError(err, "声明交换机失败")
// err = declareQueue(ch)
// failOnError(err, "声明队列失败")
// err = bindQueue(ch)
// failOnError(err, "绑定队列失败")
body := "data hellocccc! " + time.Now().Format("2006-01-02 15:04:05")
err = ch.Publish(
exchangeName,
queueName, // routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "发布消息失败")
log.Printf(" [x] 已发送消息: %s", body)
}
// Consumer:接收消息
func Consumer() {
conn, ch, err := connectToRabbitMQ()
failOnError(err, "连接失败")
defer conn.Close()
// 不关闭 channel,因为消费者长期运行
err = declareExchange(ch)
failOnError(err, "声明交换机失败")
err = declareQueue(ch)
failOnError(err, "声明队列失败")
err = bindQueue(ch)
failOnError(err, "绑定队列失败")
msgs, err := ch.Consume(
queueName,
"",
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
failOnError(err, "注册消费者失败")
log.Println(" [*] 等待消息... (按 CTRL+C 退出)")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] 收到消息: %s", string(d.Body))
time.Sleep(1 * time.Second) // 模拟处理
}
}()
<-forever
}
func main() {
if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr, "用法: %s [producer|consumer]\n", os.Args[0])
os.Exit(1)
}
switch os.Args[1] {
case "p":
Producer()
case "c":
Consumer()
default:
fmt.Fprintf(os.Stderr, "未知命令: %s\n", os.Args[1])
fmt.Fprintf(os.Stderr, "用法: %s [producer|consumer]\n", os.Args[0])
os.Exit(1)
}
}