首页 » 网站推广 » phpactivemq重连技巧_ActiveMQ实现消息失落败重发机制和两种模式PTP和PubSub

phpactivemq重连技巧_ActiveMQ实现消息失落败重发机制和两种模式PTP和PubSub

访客 2024-11-19 0

扫一扫用手机浏览

文章目录 [+]

异步接管,减少软件多系统集成的耦合度。
可靠吸收,确保在中间件可靠保存,多个也可以组成原子事务。

然而ActiveMQ默认的配置性能偏低,须要优化,但是配置文件繁芜,ActiveMQ本身不供应管理工具,主页上的文档看上去比较全面,但是缺少一种有效的组织办法,文档只有片段,很难由浅入深进行理解。

phpactivemq重连技巧_ActiveMQ实现消息失落败重发机制和两种模式PTP和PubSub

实操:直接上代码添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- activeMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>添加配置

# ActiveMQ activemq: url: failover:(tcp://127.0.0.1:61616) username: admin password: admin defaultQueueName: dt.queue办法一:点对点模式自定义ActiveMQ的配置类

package io.dt.xmq.config;import javax.jms.Queue;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import org.apache.activemq.RedeliveryPolicy;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.annotation.EnableJms;import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import org.springframework.jms.core.JmsTemplate;/ <p> Title: ActiveMQConfig </p> <p> Description: 自定义ActiveMQ的配置 </p> activemq切实其实认机制便是文档中说的ack机制有:<br> AUTO_ACKNOWLEDGE = 1 自动确认 <br> CLIENT_ACKNOWLEDGE = 2 客户端手动确认<br> DUPS_OK_ACKNOWLEDGE = 3 自动批量确认<br> SESSION_TRANSACTED = 0 事务提交并确认<br> INDIVIDUAL_ACKNOWLEDGE = 4 单条确认 activemq 独占<br> @author dt.xiao @date 2023年9月30日 /@EnableJms@Configurationpublic class ActiveMQConfig {@Value("${activemq.defaultQueueName}")private String defaultQueueName;@Value("${activemq.url}")private String url;@Value("${activemq.username}")private String username;@Value("${activemq.password}")private String password;//配置默认的目的地@Beanpublic Queue queue() {return new ActiveMQQueue(defaultQueueName);}/ 重发机制RedeliveryPolicy 以下情形会导致重发: 1.在利用事务的Session中,调用rollback()方法; 2.在利用事务的Session中,调用commit()方法之前就关闭了Session; 3.在Session中利用CLIENT_ACKNOWLEDGE签收模式或者INDIVIDUAL_ACKNOWLEDGE模式,并且调用了recover()方法。
/@Beanpublic RedeliveryPolicy rediliveryPolicy() {RedeliveryPolicy rp = new RedeliveryPolicy();// 是否在每次考试测验重新发送失落败后,增长这个等待韶光rp.setUseExponentialBackOff(true);// 重发次数,默认为6次 这里设置为10次rp.setMaximumRedeliveries(10);// 重发韶光间隔,默认为1秒rp.setInitialRedeliveryDelay(1);// 第一次失落败后重新发送之前等待500毫秒,第二次失落败再等待500 2毫秒,这里的2便是valuerp.setBackOffMultiplier(2);// 是否避免碰撞rp.setUseCollisionAvoidance(false);// 设置重发最大拖延韶光-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效rp.setMaximumRedeliveryDelay(-1);return rp;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username,password, url);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);return activeMQConnectionFactory;} @Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){JmsTemplate jmsTemplate=new JmsTemplate();//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setDeliveryMode(2);jmsTemplate.setConnectionFactory(activeMQConnectionFactory);//此处可不设置默认,在发送时也可设置行列步队jmsTemplate.setDefaultDestination(queue); return jmsTemplate;} //定义一个监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(activeMQConnectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔韶光factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);return factory;} }
生产者

package io.dt.xmq.producer;import javax.jms.Destination;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Component;/ <p>Title: Producer</p> <p>Description: 生产者</p> @author dt.xiao @date 2023年9月30日 /@Componentpublic class Producer {private final static Logger log = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;/ <p>Title: sendMessage</p> <p>Description: 发送到指定的Destination中</p> @param dest @param message /public void sendMessage(Destination dest, final String message){log.debug("[发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:"+dest.toString()+"\t发送的报文内容:"+message);jmsTemplate.convertAndSend(dest, message);}/ <p>Title: sendMessage</p> <p>Description: 发送到默认的Queue中</p> @param message /public void sendMessage(final String message){log.debug("[发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:默认配置的Queue \t发送的报文内容:"+message);jmsTemplate.convertAndSend(message);}}消费者

package io.dt.xmq.consumer;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;/ <p>Title: AsynListenerConsumer</p> <p>Description: 点对点的消费者[异步监听模式]</p> @author dt.xiao @date 2023年9月30日 /@Componentpublic class AsynListenerConsumer {private static final Logger log = LoggerFactory.getLogger(AsynListenerConsumer.class);@JmsListener(destination="xiaojl.queue",containerFactory="jmsQueueListener")public void receiveQueue(final TextMessage text, Session session) throws JMSException{try {log.debug("[处理]-ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());boolean f = true;if(f){throw new JMSException("假设涌如今消费过程中涌现非常!
");}// 利用手动签收模式,须要手动的调用,如果不在catch中调用session.recover()只会在重启做事后重发text.acknowledge();} catch (JMSException e) {log.error("[处理]-涌现非常",e.getLocalizedMessage());session.recover();// 此不可省略 重发信息利用}}}
单元测试

package io.dt;import javax.jms.Destination;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import io.dt.xmq.ActivemqApp;import io.dt.xmq.producer.Producer;/ <p>Title: ApplicationTest</p> <p>Description: </p> @author dt.xiao @date 2023年9月30日 /@RunWith(SpringRunner.class)@SpringBootTest(classes=ActivemqApp.class)public class ApplicationTest {@Autowiredprivate Producer producer;@Testpublic void t_send(){String message = "x11111";Destination dest = new ActiveMQQueue("dt.queue");producer.sendMessage(dest, message);//仿照做事在线while(true){}}}单元测试运行日志

2023-09-30 16:21:41.968 INFO 2420 --- [ main] io.dt.ApplicationTest : Started ApplicationTest in 3.705 seconds (JVM running for 5.228)2023-09-30 16:21:42.245 DEBUG 2420 --- [ main] io.dt.xmq.producer.Producer : [发送]-配送模式:2目的地:queue://dt.queue发送的报文内容:x111112023-09-30 16:21:42.256 INFO 2420 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://127.0.0.1:616162023-09-30 16:21:42.512 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.516 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.532 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.532 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.536 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.540 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.581 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.582 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.872 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.872 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.912 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.912 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:42.946 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:42.946 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:43.014 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:43.014 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:43.143 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:43.143 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:43.400 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:43.400 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常2023-09-30 16:21:43.913 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1收到的报文内容:x111112023-09-30 16:21:43.914 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [处理]-涌现非常

从实行结果可以看到,消费者AsynListenerConsumer,在处理的过程中涌现了非常,从而ActiveMQ重发了10次给AsynListenerConsumer。

phpactivemq重连技巧_ActiveMQ实现消息失落败重发机制和两种模式PTP和PubSub
(图片来自网络侵删)

办法二:发布/订阅模式添加ActiveMQ的配置类

//定义一个监听器连接工厂,这里定义的是发布/订阅模式的监听器连接工厂@Bean(name = "jmsTopicListener")public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(activeMQConnectionFactory);//设置jms并发数factory.setConcurrency("1-10");//重连间隔韶光factory.setRecoveryInterval(1000L);//CLIENT_ACKNOWLEDGE = 2 客户端手动确认factory.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE); //默认情形下activemq供应的是queue模式,若要利用topic模式须要配置下面配置factory.setPubSubDomain(true); return factory;}消费者1

package io.dt.xmq.consumer;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;/ <p>Title: TopicConsumer</p> <p>Description: 发布/订阅模式的消费者</p> @author dt.xiao @date 2018年9月30日 /@Componentpublic class TopicConsumer {private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);@JmsListener(destination="dt.topic", containerFactory="jmsTopicListener")public void receiveTopic(final TextMessage text, Session session) throws JMSException{try {log.debug("[处理]-ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());boolean f = true;if(f){throw new JMSException("假设涌如今消费过程中涌现非常!
");}// 利用手动签收模式,须要手动的调用,如果不在catch中调用session.recover()只会在重启做事后重发text.acknowledge();} catch (JMSException e) {log.error("[处理]-涌现非常",e.getLocalizedMessage());session.recover();}}}
消费者2

package io.dt.xmq.consumer;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;/ <p>Title: TopicConsumer</p> <p>Description: 发布/订阅模式的消费者</p> @author dt.xiao @date 2018年9月30日 /@Componentpublic class TopicConsumer2 {private static final Logger log = LoggerFactory.getLogger(TopicConsumer2.class);@JmsListener(destination="dt.topic",containerFactory="jmsTopicListener")public void receiveTopic(final TextMessage text,Session session) throws JMSException{try {log.debug("[处理2]-ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());// 利用手动签收模式,须要手动的调用,如果不在catch中调用session.recover()只会在重启做事后重发text.acknowledge();} catch (JMSException e) {log.error("[处理2]-涌现非常",e.getLocalizedMessage());session.recover();}}}单元测试

@Testpublic void t_sendTopic(){String message = "dtTopic-111";Destination dest = new ActiveMQTopic("dt.topic");producer.sendMessage(dest, message);//仿照做事在线while(true){}}单元测试运行日志

2018-09-30 16:20:23.419 INFO 6732 --- [ main] io.dt.ApplicationTest : Started ApplicationTest in 3.893 seconds (JVM running for 5.388)2018-09-30 16:20:23.624 DEBUG 6732 --- [ main] io.dt.xmq.producer.Producer : [发送]-配送模式:2目的地:topic://dt.topic发送的报文内容:dtTopic-1112018-09-30 16:20:23.818 INFO 6732 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://127.0.0.1:616162018-09-30 16:20:23.892 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer2 : [处理2]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.923 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.924 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:23.962 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.964 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:23.971 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.971 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:23.980 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.982 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:23.990 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:23.991 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:24.011 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:24.011 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:24.050 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:24.051 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:24.134 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:24.135 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:24.266 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:24.266 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:24.523 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:24.523 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常2018-09-30 16:20:25.037 DEBUG 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1收到的报文内容:dtTopic-1112018-09-30 16:20:25.037 ERROR 6732 --- [enerContainer-1] io.dt.xmq.consumer.TopicConsumer : [处理]-涌现非常

从实行结果可以看到,消费者TopicConsumer,在处理的过程中涌现了非常,从而ActiveMQ重发了10次给TopicConsumer。

总结

本文中通过一个示例程序讲解ActiveMQ对点对点模式和发布/订阅模式的配置和利用。
在此同时,我们还配置了失落败重试机制。

相关文章

php统计语法技巧_PHP入门PHP基本语法一

认识PHP代码标识利用不同的四对标记指令分隔符程序注释变量什么是变量如何定义变量变量名的命名规则4.1 PHP标量类型—整型4.2...

网站推广 2024-12-09 阅读0 评论0

nginxphpfpm分别技巧_nginx动静分离实战

动静分离示意图准备我将用三个端口(80,8001,8002)代表负载均衡,静态资源,动态资源。当然你也可以利用其他做事器来供应静态...

网站推广 2024-12-09 阅读0 评论0