首页 » 网站建设 » rabbitmqphp主动断开技巧_RabbitMQ 若何实现数据100不损失

rabbitmqphp主动断开技巧_RabbitMQ 若何实现数据100不损失

duote123 2024-12-09 0

扫一扫用手机浏览

文章目录 [+]

如图所示,在每个步骤中都可能会发生丢失的情形,因而我们须要有方法来保障系统的可靠性。
(磁盘破坏等极度情形就不在我们的考虑范围啦……)

一、生产端投递可靠性

生产端投递丢失的缘故原由有很多,例如在传输过程中发生网络故障导致丢失,或者投递到RabbitMQ时RabbitMQ挂了,从而引发丢失,此类情形我们根本无法预估并知晓。
因而,我们可以利用RabbitMQ的一些机制来处理

rabbitmqphp主动断开技巧_RabbitMQ 若何实现数据100不损失

1.事务机制

事务处理方案所说能够保障的强同等性,然则以造成的性能损耗较大,面对大量推送时性能问题严重,故而我们可以考虑轻量级办理方案:confirm确认机制

rabbitmqphp主动断开技巧_RabbitMQ 若何实现数据100不损失
(图片来自网络侵删)
2.confirm确认机制

生产端投递的投递到RabbitMQ后,RabbitMQ将发送一个确认给莅临盆端,让生产端知晓我已收到,否则这条就可能丢失了,须要生产端再次发起投递。

开启确认机制:

err = channel.Confirm(false)

以confirm模式发送示例:

package mainimport ( "fmt" "github.com/streadway/amqp" "rmq/db/rmq")var ( channel amqp.Channel err error queue amqp.Queue conn amqp.Connection)func main() { conn, err = rmq.GetConn() defer conn.Close() channel, err = conn.Channel() if err != nil { fmt.Printf("error: %s \n", err.Error()) return } defer channel.Close() err = channel.Confirm(false) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1)) defer confirmOne(confirms) err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte("confirm message"), }) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } fmt.Println("发送成功")}func confirmOne(confirms <-chan amqp.Confirmation) { if confirmed := <-confirms; confirmed.Ack { fmt.Printf("带标志的投递确认: %d", confirmed.DeliveryTag) } else { fmt.Printf("投递确认: %d", confirmed.DeliveryTag) }}

采取此种方案就可以让生产端感知到是否投递到RabbitMQ中。

二、持久化

RabbitMQ收到后是暂存到内存当中,此时若RabbitMQ挂了,重启做事将会导致数据丢失,以是我们应该将干系数据持久化到硬盘中,这样RabbitMQ重启后依然可以到硬盘中取数据规复。
达到RabbitMQ后先到exchange交流机,然后路由到queue行列步队,末了发送给消费端。

因而须要对exchange、queue、message都进行持久化:exchange持久化:

// 第三个参数true表示这个exchange持久化channel.ExchangeDeclare("exchange", "direct", true, false, false, true, nil)

queue持久化:

// 第二个参数true标识这个queue持久化channel.QueueDeclare("confirm:message", true, false, false, false, nil)

message持久化:

err = ch.Publish("exchange","queue name",false,false,amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久化 ContentType: "text/plain", Body: []byte(msgBody), })

把稳:如果须要持久化,Queue也是须要设定为持久化才有效。
这样,如果RabbitMQ收到后挂掉了,重启后会自行规复。
思考一下,基于以上几种机制,我们还是不能完备担保可靠性投递到RabbitMQ中,比如极度情形,RabbitMQ收到还没来得及将持久化到硬盘时,RabbitMQ挂掉了,此时依然是丢失了;或者RabbitMQ在发送确认给生产真个过程中,由于网络故障导致生产端没有收到确认,导致生产端不知道RabbitMQ是否收到,依然不好处理接下来的业务。

因而我们须要在上述机制的根本上,考虑一些补偿机制,以应对部分极度情形,比如入库。

三、入库

我们可以考虑将要发送的保存到数据库中,标注一个状态字段status=0,标识生产端将发送给RabbitMQ但还没收到确认回答。
在生产端收到RabbitMQ确认回答后,将status设为1,表示RabbitMQ已收到。
考虑到前面提到的极度情形,我们可以在生产端开设一个定时器,定时检索表,将status=0并且超过固定期限后还没收到确认的内容取出重发(此时消费端要考虑重复情形,提前做好幂等性设置),并设定重发最大次数,超限做单独的分外处理。

到此,就可以可靠性投递到RabbitMQ中,生产端也可以正常感知了。

四、消费端信息不丢失

正常情形下,以下三种情形会导致丢失

在RabbitMQ将发出后,消费端还没有吸收到前发生网络故障,消费端与RabbitMQ断开连接,此时会丢失;在RabbitMQ将发出后,消费端还没有吸收到前消费端挂了,此时会丢失;消费端准备吸收到后,但在处理过程中发生非常或宕机,会丢失。
综合上述三种情形,都是由于RabbitMQ的自动ack机制,即RabbitMQ默认在发出后就立即将这条删除,而不管消费端是否吸收到,是否处理完,导致消费端丢失时,RabbitMQ也没有该了。
因此就须要将自动ack机制改为手动ack机制。
消费端手动确认:

// 吸收messages,err := r.channel.Consume( r.QueueName, // 行列步队名称 "", // 区分消费者 false, // 设置关闭自动应答,每条必须手动ack false, false, false, nil)if err != nil{ fmt.Println(err)}// 消费channel := make(chan bool)go func(){ for k := range messages{ // true 表示确认所有未确认 // false 表示确认当前 k.Ack(false) }}()// 壅塞主程序<-channel

当autoAck设置为false时,对付RabbitMQ做事端而言,行列步队中的就分为两类:

等待投递给消费真个已经投递给消费端,等待消费端发回确认旗子暗记的

如果RabbitMQ一贯没有吸收到消费端的确认旗子暗记,且消费端已经断开链接或宕机,此时RabbitMQ会将此重新放入行列步队,等待下次投递。
故而消费端也须要做好幂等性设置,确保重复处理机制。
综合以上方案,即可担保生产端-RabbitMQ-消费端全链路数据不丢失啦!

相关文章

phpcmdbheight技巧_MagicodesIE 22宣告

导入导出通用库,支持DTO导入导出以及动态导出,支持Excel、Word、PDF、CSV和HTML。已加入ncc开源组织.Magi...

网站建设 2024-12-16 阅读0 评论0

大数据计划,未来社会发展的基石

在信息爆炸的时代,大数据已经渗透到了我们生活的方方面面。我国政府高度重视大数据发展,将其作为国家战略予以推进。本文将从大数据计划的...

网站建设 2024-12-16 阅读0 评论0