本篇会和SpringBoot做整合,采取自动配置的办法进行开拓,我们只须要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~
交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效。
第一节我们先来搞一下环境的配置,上一篇中我们已经引入了自动配置的包,我们既然利用了自动配置的办法,那RabbitMQ的连接信息我们直接放在配置文件中就行了,就像我们须要用到JDBC连接的时候去配置一下DataSource一样。

如图所示,我们只须要指明一下连接的IP+端口号和用户名密码就行了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。
紧张我们须要看一下手动确认的配置,须要配置成manual才是手动确认,日后还会有其他的配置项,眼下我们配置这一个就可以了。
接下来我们要配置一个Queue,上一篇中我们往一个名叫erduo的行列步队中发送,当时是我们手动定义的此行列步队,这里我们也须要手动配置,声明一个Bean就可以了。
@Configurationpublic class RabbitmqConfig { @Bean public Queue erduo() { // 其三个参数:durable exclusive autoDelete // 一样平常只设置一下持久化即可 return new Queue("erduo",true); }}
就这么大略声明一下就可以了,当然了RabbitMQ毕竟是一个独立的组件,如果你在RabbitMQ中通过其他办法已经创建过一个名叫erduo的行列步队了,你这里也可以不声明,这里起到的一个效果便是如果你没有这个行列步队,会按照你声明的办法帮你创建这个行列步队。
配置完环境之后,我们就可以以SpringBoot的办法来编写生产者和消费者了。
2. 生产者与RabbitTemplate和上一篇的节奏一样,我们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate。
听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒畅,什么东西都给你封装一遍,让你用起来更方便更顺手。
RabbitTemplate实现了标准AmqpTemplate接口,功能大致可以分为发送和接管。
我们这里是在生产者中来用,紧张便是利用它的发送功能:send和convertAndSend方法。
// 发送到默认的Exchange,利用默认的routing keyvoid send(Message message) throws AmqpException;// 利用指定的routing key发送到默认的exchangevoid send(String routingKey, Message message) throws AmqpException;// 利用指定的routing key发送到指定的exchangevoid send(String exchange, String routingKey, Message message) throws AmqpException;
send方法是发送byte数组的数据的模式,这里代表内容的工具是Message工具,它的布局方法便是传入byte数组数据,以是我们须要把我们的数据转成byte数组然后布局成一个Message工具再进行发送。
// Object类型,可以传入POJOvoid convertAndSend(Object message) throws AmqpException;void convertAndSend(String routingKey, Object message) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend方法是可以传入POJO工具作为参数,底层是有一个MessageConverter帮我们自动将数据转换成byte类型或String或序列化类型。
以是这里支持的传入工具也只有三种:byte类型,String类型和实现了Serializable接口的POJO。
先容完了,我们可以看一下代码:
@Slf4j@Component("rabbitProduce")public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString(); System.out.println("Message content : " + message); // 指定类型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("发送完毕。"); } public void convertAndSend() { User user = new User(); System.out.println("Message content : " + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("发送完毕。"); }}
这里我特意写明了两个例子,一个用来测试send,另一个用来测试convertAndSend。
send方法里我们看下来和之前的代码是险些一样的,定义一个,然后直接send,但是这个布局的布局方法可能比我们想的要多一个参数,我们原来说的只要把数据转成二进制数组放进去即可,现在看来还要多放一个参数了。
MessageProperties,是的我们须要多放一个MessageProperties工具,从他的名字我们也可以看出它的功能便是附带一些参数,但是某些参数是少不了的,不带弗成。
比如我的代码这里便是设置了一下的类型,的类型有很多种可以是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的便是文本类型,指定类型是必须的,也可以为我们拿到之后要将转换成什么样的工具供应一个参考。
convertAndSend方法就要大略太多,这里我放了一个User工具拿来测试用,直接指定行列步队然后放入这个工具即可。
Tips:User必须实现Serializable接口,不然的话调用此方法的时候会抛出IllegalArgumentException非常。
代码完成之后我们就可以调用了,这里我写一个测试类进行调用:
@SpringBootTestpublic class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); }}
效果如下图~
同时在掌握台利用命令rabbitmqctl.bat list_queues查看行列步队-erduo现在的情形:
如此一来,我们的生产者测试就算完成了,现在行列步队里两条了,而且类型肯定不一样,一个是我们设置的文本类型,一个是自动设置的序列化类型。
3. 消费者与RabbitListener既然行列步队里面已经有了,接下来我们就要看我们该如何通过新的办法拿到并消费与确认了。
消费者这里我们要用到@RabbitListener来帮我们拿到指定行列步队,它的用法很大略也很繁芜,我们可以先来说大略的办法,直接放到方法上,指定监听的行列步队就行了。
@Slf4j@Component("rabbitConsumer")public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认"); }}
这段代码就代表onMessage方法会处理erduo(Producer.QUEUE_NAME是常量字符串"erduo")行列步队中的。
我们可以看到这个方法里面有两个参数,Message和Channel,如果用不到Channel可以不写此参数,但是Message一定是要的,它代表了本身。
我们可以想想,我们的程序从RabbitMQ之中拉回一条条之后,要以怎么样的办法展示给我们呢?
没错,便是封装为一个个Message工具,这里面放入了一条的所有信息,数据构造是什么样一会我一run你就能看到了。
同时这里我们利用Channel做一个确认的操作,这里的DeliveryTag代表的是这个在行列步队中的序号,这个信息存放在MessageProperties中。
4. SpringBoot 启动!编写完生产者和消费者,同时已经运行过生产者往行列步队里面放了两条信息,接下来我们可以直接启动,查看消费情形:
在我赤色框线标记的地方可以看到,由于我们有了消费者以是项目启动后先和RabbitMQ建立了一个连接进行监听行列步队。
随后就开始消费我们行列步队中的两条:
第一条信息是contentType=text/plain类型,以是直接就在掌握台上打印出了详细内容。
第二条信息是contentType=application/x-java-serialized-object,在打印的时候只打印了一个内存地址+字节大小。
不管怎么说,数据我们是拿到了,也便是代表我们的消费是没有问题的,同时也都进行了确认操作,从数据上看,全体可以分为两部分:body和MessageProperties。
我们可以单独利用一个表明拿到这个body的内容 - @Payload
@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body);}
也可以单独利用一个表明拿到MessageProperties的headers属性,headers属性在截图里也可以看到,只不过是个空的 - @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers);}
这两个表明都算是扩展知识,我还是更喜好直接拿到全部,全都要!
!
!
上面我们已经完成了的发送与消费,全体过程我们可以再次回忆一下,统统都和我画的这张图上一样的轨迹:
只不过我们一贯没有指定Exchage一贯利用的默认路由,希望大家好好记住这张图。
5. @RabbitListener与@RabbitHandler下面再来补一些知识点,有关@RabbitListener与@RabbitHandler。
@RabbitListener上面我们已经大略的进行了利用,轻微扩展一下它实在是可以监听多个行列步队的,就像这样:
@RabbitListener(queues = { "queue1", "queue2" })public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认");}
还有一些其他的特性如绑定之类的,这里不再赘述由于太硬编码了一样平常用不上。
下面来说说这节要紧张讲的一个特性:@RabbitListener和@RabbitHandler的搭配利用。
前面我们没有提到,@RabbitListener表明实在是可以表明在类上的,这个表明在类上标志着这个类监听某个行列步队或某些行列步队。
这两个表明的搭配利用就要让@RabbitListener表明在类上,然后用@RabbitHandler表明在方法上,根据方法参数的不同自动识别并去消费,写个例子给大家看一看更直不雅观一些。
@Slf4j@Component("rabbitConsumer")@RabbitListener(queues = Producer.QUEUE_NAME)public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); }}
大家可以看看这个例子,我们先用@RabbitListener监听erduo行列步队中的,然后利用@RabbitHandler表明了两个方法。
第一个方法的body类型是String类型,这就代表着这个方法只能处理文本类型的。第二个方法的body类型是User类型,这就代表着这个方法只能处理序列化类型且为User类型的。这两个方法恰好对应着我们第二节中测试类会发送的两种,以是我们往RabbitMQ中发送两条测试,用来测试这段代码,看看效果:
都在掌握台上如常打印了,如果@RabbitHandler表明的方法中没有一个的类型可以和你的类型对的上,比如都是byte数组类型,这里没有对应的方法去吸收,系统就会在掌握台不断的报错,如果你涌现这个情形就证明你类型写的禁绝确。
假设你的erduo行列步队中会涌现三种类型的:byte,文本和序列化,那你就必须要有对应的处理这三种的方法,不然发过来的时候就会由于无法精确转换而报错。
而且利用了@RabbitHandler表明之后就不能再和之前一样利用Message做吸收类型。
@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("已确认");}
这样写的话会报类型转换非常的,以是二者选其一。
同时上文我的@RabbitHandler没有进行确认,大家可以自己试一下进行确认。
6. 的序列化转换通过上文我们已经知道,能被自动转换的工具只有byte[]、String、java序列化工具(实现了Serializable接口的工具),但是并不是所有的Java工具都会去实现Serializable接口,而且序列化的过程中利用的是JDK自带的序列化方法,效率低下。
以是我们更普遍的做法是:利用Jackson先将数据转换成JSON格式发送给RabbitMQ,再吸收的时候再用Jackson将数据反序列化出来。
这样做可以完美办理上面的痛点:工具既不必再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界该当是最好的了)。
默认的转换方案是转换顶层接口-MessageConverter的一个子类:SimpleMessageConverter,我们如果要换到另一个转换器只须要更换掉这个转换器就行了。
上图是MessageConverter构造树的构造树,可以看到除了SimpleMessageConverter之外还有一个Jackson2JsonMessageConverter,我们只须要将它定义为Bean,就可以直策应用这个转换器了。
@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }
这样就可以了,这里的jacksonObjectMapper可以不传入,但是默认的ObjectMapper方案对JDK8的韶光日期序列化会不太友好,详细可以参考我的上一篇文章:从LocalDateTime序列化磋商全局同等性序列化,总的来说便是定义了自己的ObjectMapper。
同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的行列步队:
@Beanpublic Queue erduoJson() { // 其三个参数:durable exclusive autoDelete // 一样平常只设置一下持久化即可 return new Queue("erduo_json",true);}
如此之后就可以进行测试了,先是生产者代码:
public void sendObject() { Client client = new Client(); System.out.println("Message content : " + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("发送完毕。"); }
我又重新定义了一个Client工具,它和之前测试利用的User工具成员变量都是一样的,不一样的是它没有实现Serializable接口。
同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer,用于测试JSON序列化的干系消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json";
以是这段代码是将Client工具作为发送到"erduo_json"行列步队中去,随后我们在测试类中run一下进行一次发送。
紧着是消费者代码:
@Slf4j@Component("rabbitJsonConsumer")@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json"; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("已确认"); }}
有了上文的履历之后,这段代码理解起来也是很大略了吧,同时给出了上一节没写的如何在@RabbitHandler模式下进行签收。
我们直接来看看效果:
在打印的Headers里面,今后翻可以看到contentType=application/json,这个contentType是表明了的类型,这里正是解释我们新的转换器生效了,将所有都转换成了JSON类型。
后记这两篇讲完了RabbitMQ的基本收发,包括手动配置和自动配置的两种办法,这些大家仔细研读之后该当会对RabbitMQ收发没什么疑问了~
作者:和耳朵原文链接:https://juejin.im/post/6859152029823008781