使用Go和RabbitMQ实现分布式事务


RabbitMQ 是一个开源的消息代理和队列服务器,它允许应用程序通过共享服务或消息队列进行异步通信。在这篇文章中,我们将探讨如何在 Go 应用程序中使用 RabbitMQ 来实现分布式事务,着重讲解如何进行连接配置。

1. 安装 RabbitMQ 客户端
Go 的 RabbitMQ 客户端库是 amqp,你可以使用 go get 命令来安装:
go get github.com/streadway/amqp2. 连接到 RabbitMQ 服务器
要连接到 RabbitMQ 服务器,我们需要创建一个 amqp.Connection 对象。在创建这个对象时,需要提供一个连接字符串(URL),它包含了 RabbitMQ 服务器的地址、端口、用户名和密码。
下面是一个创建连接的示例:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil { log.Fatalf("failed to connect to RabbitMQ: %v", err)}defer conn.Close()
在这个示例中,我们使用的是默认的用户名(guest)和密码(guest),并假设 RabbitMQ 服务器运行在本机的默认端口(5672)上。在实际应用中,需要根据实际环境进行修改。
3. 创建一个 Channel
在 RabbitMQ 中,所有的操作都是在 Channel(信道)中进行的。因此,在发送或接收消息前,我们需要先创建一个 Channel:
ch, err := conn.Channel()if err != nil { log.Fatalf("failed to open a channel: %v", err)}defer ch.Close()4. 声明一个 Queue
在 RabbitMQ 中,消息是存储在 Queue(队列)中的。因此,我们需要声明一个 Queue,以便生产者可以发送消息到这个 Queue,消费者可以从这个 Queue 中接收消息:
q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments)if err != nil { log.Fatalf("failed to declare a queue: %v", err)}5. 发送和接收消息
下面是一个发送消息的示例:
body := "Hello World!"err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })if err != nil { log.Fatalf("failed to publish a message: %v", err)}
下面是一个接收消息的示例:
msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args)if err != nil { log.Fatalf("failed to register a consumer: %v", err)}forever := make(chan bool)go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) d.Ack(false) }}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever完整代码
下面是使用Go和RabbitMQ实现分布式事务的示例代码:
服务器端(Producer)
package mainimport ( "github.com/streadway/amqp" "log")func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") body := "Hello World!" err = ch.Publish( "", q.Name, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body)}
2. 客户端(Consumer)
package mainimport ( "github.com/streadway/amqp" "log" "time")func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, "", false, false, false, false, nil, ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) time.Sleep(2 * time.Second) //模拟耗时操作 d.Ack(false) log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever}RabbitMQ 的 Quality of Service(服务质量)
ch.Qos 方法是用来设置 RabbitMQ 的 Quality of Service(服务质量)参数的。该函数的原型如下:
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
prefetchCount:这是一个消息预取数设置。当你设置为1时,意味着在一个消费者处理完一个消息并且对该消息进行了确认前,不会分派新的消息给消费者。也就是说,消费者在同一时间只会处理一条消息。这样可以实现更公平的消息分发,防止某些消费者一直忙于处理消息,而其他消费者则什么也没做。
prefetchSize:这是预取大小设置,单位为字节。如果设置为非零值,服务器将会试图保证在为消费者分派新消息之前,至少会有这么多字节的消息已经在消费者的网络缓冲区中。然而,这个设置在 RabbitMQ 的当前实现中并没有实际效果,因为它并没有实现对这个参数的支持。所以,通常我们将它设置为0。
global:这是一个标志位,用来指明上述设置是只对当前的 Channel 有效(如果设置为 false),还是对整个 Connection 有效(如果设置为 true)。
例如,以下代码设置了预取计数为1,这样在同一时间,每个消费者最多只会处理一条消息:
err := ch.Qos( 1, // prefetch count 0, // prefetch size false, // global)if err != nil { log.Fatal(err)}总结
在这篇文章中,我们了解了如何在 Go 程序中使用 RabbitMQ 来实现分布式事务,包括如何安装 RabbitMQ 客户端库、如何连接到 RabbitMQ 服务器、如何创建 Channel 和 Queue,以及如何发送和接收消息。我们还详细讲解了如何进行连接配置。希望这篇文章对你有所帮助!
到顶部