首页 » Web前端 » php收发rabbitmq技巧_SpringBootRabbitMQ办法收发消息一文带你体验

php收发rabbitmq技巧_SpringBootRabbitMQ办法收发消息一文带你体验

访客 2024-12-01 0

扫一扫用手机浏览

文章目录 [+]

本篇会和SpringBoot做整合,采取自动配置的办法进行开拓,我们只须要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~

交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效。

php收发rabbitmq技巧_SpringBootRabbitMQ办法收发消息一文带你体验

1. 环境配置

第一节我们先来搞一下环境的配置,上一篇中我们已经引入了自动配置的包,我们既然利用了自动配置的办法,那RabbitMQ的连接信息我们直接放在配置文件中就行了,就像我们须要用到JDBC连接的时候去配置一下DataSource一样。

php收发rabbitmq技巧_SpringBootRabbitMQ办法收发消息一文带你体验
(图片来自网络侵删)

如图所示,我们只须要指明一下连接的IP+端口号和用户名密码就行了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。

紧张我们须要看一下手动确认的配置,须要配置成manual才是手动确认,日后还会有其他的配置项,眼下我们配置这一个就可以了。

接下来我们要配置一个Queue,上一篇中我们往一个名叫erduo的行列步队中发送,当时是我们手动定义的此行列步队,这里我们也须要手动配置,声明一个Bean就可以了。

@Configurationpublic class RabbitmqConfig { @Bean public Queue erduo() { // 其三个参数:durable exclusive autoDelete // 一样平常只设置一下持久化即可 return new Queue("erduo",true); }}

就这么大略声明一下就可以了,当然了RabbitMQ毕竟是一个独立的组件,如果你在RabbitMQ中通过其他办法已经创建过一个名叫erduo的行列步队了,你这里也可以不声明,这里起到的一个效果便是如果你没有这个行列步队,会按照你声明的办法帮你创建这个行列步队。

配置完环境之后,我们就可以以SpringBoot的办法来编写生产者和消费者了。

2. 生产者与RabbitTemplate

和上一篇的节奏一样,我们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate。

听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒畅,什么东西都给你封装一遍,让你用起来更方便更顺手。

RabbitTemplate实现了标准AmqpTemplate接口,功能大致可以分为发送和接管。

我们这里是在生产者中来用,紧张便是利用它的发送功能:send和convertAndSend方法。

// 发送到默认的Exchange,利用默认的routing keyvoid send(Message message) throws AmqpException;// 利用指定的routing key发送到默认的exchangevoid send(String routingKey, Message message) throws AmqpException;// 利用指定的routing key发送到指定的exchangevoid send(String exchange, String routingKey, Message message) throws AmqpException;

send方法是发送byte数组的数据的模式,这里代表内容的工具是Message工具,它的布局方法便是传入byte数组数据,以是我们须要把我们的数据转成byte数组然后布局成一个Message工具再进行发送。

// Object类型,可以传入POJOvoid convertAndSend(Object message) throws AmqpException;void convertAndSend(String routingKey, Object message) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

convertAndSend方法是可以传入POJO工具作为参数,底层是有一个MessageConverter帮我们自动将数据转换成byte类型或String或序列化类型。

以是这里支持的传入工具也只有三种:byte类型,String类型和实现了Serializable接口的POJO。

先容完了,我们可以看一下代码:

@Slf4j@Component("rabbitProduce")public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = "Hello 我是作者和耳朵,欢迎关注我。
" + LocalDateTime.now().toString(); System.out.println("Message content : " + message); // 指定类型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("发送完毕。
"); } public void convertAndSend() { User user = new User(); System.out.println("Message content : " + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("发送完毕。
"); }}

这里我特意写明了两个例子,一个用来测试send,另一个用来测试convertAndSend。

send方法里我们看下来和之前的代码是险些一样的,定义一个,然后直接send,但是这个布局的布局方法可能比我们想的要多一个参数,我们原来说的只要把数据转成二进制数组放进去即可,现在看来还要多放一个参数了。

MessageProperties,是的我们须要多放一个MessageProperties工具,从他的名字我们也可以看出它的功能便是附带一些参数,但是某些参数是少不了的,不带弗成。

比如我的代码这里便是设置了一下的类型,的类型有很多种可以是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的便是文本类型,指定类型是必须的,也可以为我们拿到之后要将转换成什么样的工具供应一个参考。

convertAndSend方法就要大略太多,这里我放了一个User工具拿来测试用,直接指定行列步队然后放入这个工具即可。

Tips:User必须实现Serializable接口,不然的话调用此方法的时候会抛出IllegalArgumentException非常。

代码完成之后我们就可以调用了,这里我写一个测试类进行调用:

@SpringBootTestpublic class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); }}

效果如下图~

同时在掌握台利用命令rabbitmqctl.bat list_queues查看行列步队-erduo现在的情形:

如此一来,我们的生产者测试就算完成了,现在行列步队里两条了,而且类型肯定不一样,一个是我们设置的文本类型,一个是自动设置的序列化类型。

3. 消费者与RabbitListener

既然行列步队里面已经有了,接下来我们就要看我们该如何通过新的办法拿到并消费与确认了。

消费者这里我们要用到@RabbitListener来帮我们拿到指定行列步队,它的用法很大略也很繁芜,我们可以先来说大略的办法,直接放到方法上,指定监听的行列步队就行了。

@Slf4j@Component("rabbitConsumer")public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认"); }}

这段代码就代表onMessage方法会处理erduo(Producer.QUEUE_NAME是常量字符串"erduo")行列步队中的。

我们可以看到这个方法里面有两个参数,Message和Channel,如果用不到Channel可以不写此参数,但是Message一定是要的,它代表了本身。

我们可以想想,我们的程序从RabbitMQ之中拉回一条条之后,要以怎么样的办法展示给我们呢?

没错,便是封装为一个个Message工具,这里面放入了一条的所有信息,数据构造是什么样一会我一run你就能看到了。

同时这里我们利用Channel做一个确认的操作,这里的DeliveryTag代表的是这个在行列步队中的序号,这个信息存放在MessageProperties中。

4. SpringBoot 启动!

编写完生产者和消费者,同时已经运行过生产者往行列步队里面放了两条信息,接下来我们可以直接启动,查看消费情形:

在我赤色框线标记的地方可以看到,由于我们有了消费者以是项目启动后先和RabbitMQ建立了一个连接进行监听行列步队。

随后就开始消费我们行列步队中的两条:

第一条信息是contentType=text/plain类型,以是直接就在掌握台上打印出了详细内容。

第二条信息是contentType=application/x-java-serialized-object,在打印的时候只打印了一个内存地址+字节大小。

不管怎么说,数据我们是拿到了,也便是代表我们的消费是没有问题的,同时也都进行了确认操作,从数据上看,全体可以分为两部分:body和MessageProperties。

我们可以单独利用一个表明拿到这个body的内容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body);}

也可以单独利用一个表明拿到MessageProperties的headers属性,headers属性在截图里也可以看到,只不过是个空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers);}

这两个表明都算是扩展知识,我还是更喜好直接拿到全部,全都要!


上面我们已经完成了的发送与消费,全体过程我们可以再次回忆一下,统统都和我画的这张图上一样的轨迹:

只不过我们一贯没有指定Exchage一贯利用的默认路由,希望大家好好记住这张图。

5. @RabbitListener与@RabbitHandler

下面再来补一些知识点,有关@RabbitListener与@RabbitHandler。

@RabbitListener上面我们已经大略的进行了利用,轻微扩展一下它实在是可以监听多个行列步队的,就像这样:

@RabbitListener(queues = { "queue1", "queue2" })public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认");}

还有一些其他的特性如绑定之类的,这里不再赘述由于太硬编码了一样平常用不上。

下面来说说这节要紧张讲的一个特性:@RabbitListener和@RabbitHandler的搭配利用。

前面我们没有提到,@RabbitListener表明实在是可以表明在类上的,这个表明在类上标志着这个类监听某个行列步队或某些行列步队。

这两个表明的搭配利用就要让@RabbitListener表明在类上,然后用@RabbitHandler表明在方法上,根据方法参数的不同自动识别并去消费,写个例子给大家看一看更直不雅观一些。

@Slf4j@Component("rabbitConsumer")@RabbitListener(queues = Producer.QUEUE_NAME)public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); }}

大家可以看看这个例子,我们先用@RabbitListener监听erduo行列步队中的,然后利用@RabbitHandler表明了两个方法。

第一个方法的body类型是String类型,这就代表着这个方法只能处理文本类型的。
第二个方法的body类型是User类型,这就代表着这个方法只能处理序列化类型且为User类型的。

这两个方法恰好对应着我们第二节中测试类会发送的两种,以是我们往RabbitMQ中发送两条测试,用来测试这段代码,看看效果:

都在掌握台上如常打印了,如果@RabbitHandler表明的方法中没有一个的类型可以和你的类型对的上,比如都是byte数组类型,这里没有对应的方法去吸收,系统就会在掌握台不断的报错,如果你涌现这个情形就证明你类型写的禁绝确。

假设你的erduo行列步队中会涌现三种类型的:byte,文本和序列化,那你就必须要有对应的处理这三种的方法,不然发过来的时候就会由于无法精确转换而报错。

而且利用了@RabbitHandler表明之后就不能再和之前一样利用Message做吸收类型。

@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认");}

这样写的话会报类型转换非常的,以是二者选其一。

同时上文我的@RabbitHandler没有进行确认,大家可以自己试一下进行确认。

6. 的序列化转换

通过上文我们已经知道,能被自动转换的工具只有byte[]、String、java序列化工具(实现了Serializable接口的工具),但是并不是所有的Java工具都会去实现Serializable接口,而且序列化的过程中利用的是JDK自带的序列化方法,效率低下。

以是我们更普遍的做法是:利用Jackson先将数据转换成JSON格式发送给RabbitMQ,再吸收的时候再用Jackson将数据反序列化出来。

这样做可以完美办理上面的痛点:工具既不必再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界该当是最好的了)。

默认的转换方案是转换顶层接口-MessageConverter的一个子类:SimpleMessageConverter,我们如果要换到另一个转换器只须要更换掉这个转换器就行了。

上图是MessageConverter构造树的构造树,可以看到除了SimpleMessageConverter之外还有一个Jackson2JsonMessageConverter,我们只须要将它定义为Bean,就可以直策应用这个转换器了。

@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }

这样就可以了,这里的jacksonObjectMapper可以不传入,但是默认的ObjectMapper方案对JDK8的韶光日期序列化会不太友好,详细可以参考我的上一篇文章:从LocalDateTime序列化磋商全局同等性序列化,总的来说便是定义了自己的ObjectMapper。

同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的行列步队:

@Beanpublic Queue erduoJson() { // 其三个参数:durable exclusive autoDelete // 一样平常只设置一下持久化即可 return new Queue("erduo_json",true);}

如此之后就可以进行测试了,先是生产者代码:

public void sendObject() { Client client = new Client(); System.out.println("Message content : " + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("发送完毕。
"); }

我又重新定义了一个Client工具,它和之前测试利用的User工具成员变量都是一样的,不一样的是它没有实现Serializable接口。

同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer,用于测试JSON序列化的干系消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json";

以是这段代码是将Client工具作为发送到"erduo_json"行列步队中去,随后我们在测试类中run一下进行一次发送。

紧着是消费者代码:

@Slf4j@Component("rabbitJsonConsumer")@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json"; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("已确认"); }}

有了上文的履历之后,这段代码理解起来也是很大略了吧,同时给出了上一节没写的如何在@RabbitHandler模式下进行签收。

我们直接来看看效果:

在打印的Headers里面,今后翻可以看到contentType=application/json,这个contentType是表明了的类型,这里正是解释我们新的转换器生效了,将所有都转换成了JSON类型。

后记

这两篇讲完了RabbitMQ的基本收发,包括手动配置和自动配置的两种办法,这些大家仔细研读之后该当会对RabbitMQ收发没什么疑问了~

作者:和耳朵原文链接:https://juejin.im/post/6859152029823008781

标签:

相关文章

今日头条算法如何实现个化推荐与精准传播

信息传播方式发生了翻天覆地的变化。今日头条作为国内领先的信息分发平台,凭借其强大的算法推荐系统,吸引了海量用户。今日头条的算法究竟...

Web前端 2025-01-31 阅读1 评论0

今日头条算法关闭之谜内容分发新格局

今日头条作为一款备受瞩目的新闻资讯平台,凭借其独特的算法推荐机制,吸引了大量用户。近期有关今日头条算法关闭的消息引发了广泛关注。本...

Web前端 2025-01-31 阅读1 评论0

今日头条算法智能推荐背后的科技魅力

信息爆炸的时代已经到来。人们每天在互联网上接触到海量的信息,如何从中筛选出有价值的内容,成为了人们关注的焦点。今日头条作为一款智能...

Web前端 2025-01-31 阅读1 评论0

今日头条算法专利申请个化推荐的秘密武器

信息爆炸的时代已经来临。在众多信息中,如何快速找到自己感兴趣的内容成为了一个难题。今日头条作为中国领先的资讯平台,凭借其独特的算法...

Web前端 2025-01-31 阅读1 评论0

今日头条算法机器推荐模式的秘密与挑战

大数据、人工智能等新兴技术的应用已经渗透到我们生活的方方面面。在信息爆炸的时代,人们获取信息的渠道越来越丰富,如何在海量信息中找到...

Web前端 2025-01-31 阅读1 评论0