(1)RabbitMQ 是开源的代理和行列步队做事器,用来通过普通协议在完备不同的运用之间共享数据,RabbitMQ 底层是用了 Erlang 措辞来编写的,并且 RabbitMQ 是基于 AMQP 协议的.(2)RabbitMQ 不仅仅可以利用 java 客户端进行编写,且可以利用其他的措辞(python,php等…),它供应了丰富的API
RabbitMQ 的优点:
(1)开源,性能精良,稳定性保障(2)与 SpringAMQP 完美的整合,API 丰富 (Spring基于 RabbitMQ 供应了一套框架,叫做AMQP 框架)这套框架不仅供应了原生的 RabbitMQ,而且还供应了丰富可扩展的API帮助开拓职员更好的去运用(3)集群模式丰富,表达式配置,HA模式,镜像行列步队模型解释:(担保数据不丢失的提前做到高可靠性,可用性)普遍利用的镜像行列步队模式(4)AMQP 全称:Advanced Message Queuing Protocl AMQP 翻译过来:高等行列步队协议

这里为了方便在 docker 上安装 RabbitMQ(1)首先搜索 RabbitMQ 的安装命令:https://hub.docker.com/_/rabbitmq
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management1
安装成功:
对付映射的端口须要把稳两个:15672 和 5672,个中 15672 是管理的端口,5672 是通讯的端口
可以看到docker容器 15672 对应的主机映射端口是 32771,
默认的用户名和密码都是 guest
三、SpringBoot 整合 RabbitMQ1. 加入依赖2. 配置 application.properties把稳这里的端口选择 32781,对应rabbitmq 容器的端口是 5672,选择通讯端口,切忌不用选择管理端口
spring.rabbitmq.host=192.168.245.133spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=327811234
3. 直接交流模式
直接交流模式可以参考:https://blog.csdn.net/fakerswe/article/details/81508963
所谓“直接连接交流机”便是:Producer(生产者)投递的被DirectExchange (交流机)转发到通过routingkey绑定到详细的某个Queue(行列步队),把放入行列步队,然后Consumer从Queue中订阅
这里的详细某个行列步队是由参数 routingKey 掌握的,是通过这个参数往行列步队上发的
RabbitMQ模型的核心思想(core idea): 生产者会把发送给RabbitMQ的交流中央(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个行列步队,由Exchange决定一条的生命周期–发送给某些行列步队,或者直接丢弃掉。
(1)配置 RabbitDirectConfig
@Configurationpublic class RabbitDirectConfig { public final static String DIRECTNAME = "yolo-direct"; // 行列步队 @Bean Queue queue() { return new Queue("hello.yolo"); } @Bean DirectExchange directExchange() { return new DirectExchange(DIRECTNAME, true, false); } // 将 queue 和 directExchange 绑定到一起 @Bean Binding binding() { return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); }}123456789101112131415161718192021
(2)消费者
@Componentpublic class DirectReceiver { // 监听行列步队 @RabbitListener(queues = "hello.yolo") public void handler1(String msg) { System.out.println("handler1>>>" + msg); }}12345678
(3)测试:生产者
@Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //将转发到 routingKey 为 hello.yolo 的行列步队,对应 DirectReceiver 的监听行列步队名 rabbitTemplate.convertAndSend("hello.yolo", "hello yolo! ni hao!"); }1234567
4. 广播模式
可以参考:https://blog.csdn.net/fakerswe/article/details/81455340
大略的讲,便是把交流机(Exchange)里的发送给所有绑定该交流机的行列步队,忽略routingKey。
由图可知,生产者把发送到交流机后,由交流机发送给消费者行列步队。消费者行列步队如果想要吸收到交流机里的,那么须要担保:行列步队绑定的交流机名称要和交流机同等,这个是广播模式的关键,也是MQ后续所有模式最粗略的条件。
这里是通过生产者发往交流机的,然后交流机再发送给绑定的行列步队(1)配置广播模式
@Configurationpublic class RabbitFanoutConfig { public static final String FANOUTNAME = "yolo-fanout"; / 行列步队1 @return / @Bean Queue queueOne() { return new Queue("queue-one"); } / 行列步队2 @return / @Bean Queue queueTwo() { return new Queue("queue-two"); } / 交流机 @return / @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUTNAME, true, false); } / 绑定行列步队1 @return / @Bean org.springframework.amqp.core.Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } / 绑定行列步队2 @return / @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); }}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
(2)消费者
/ 定义吸收器:消费者 /@Componentpublic class FanoutReceiver { / 吸收行列步队1 @param msg / @RabbitListener(queues = "queue-one") public void handler1(String msg) { System.out.println("FanoutReceiver:handler1:" + msg); } / 吸收行列步队2 @param msg / @RabbitListener(queues = "queue-two") public void handler2(String msg) { System.out.println("FanoutReceiver:handler2:" + msg); }}12345678910111213141516171819202122
(3)测试:生产者
/ 往交流机上发送信息 / @Test public void test1() { rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout!"); }1234567
这里须要把稳,须要先启动消费者,再启动生产者,否则先启动生产者,exchange吸收到后创造没有行列步队对其感兴趣,就会将丧失落,此时跟 routingKey 无关
行列步队1和行列步队2 均收到了
5. 主题路由匹配模式
可参考:https://blog.csdn.net/weixin_43770545/article/details/90902788
如果你想在淘宝上买一双运动鞋,那么你是不是会在搜索框中搜“XXX运动鞋”,这个时候系统将会模糊匹配的所有符合哀求的运动鞋,然后展示给你。所谓“主题路由匹配交流机”也是这样一个道理,但是利用时也有一定的规则。
String routingkey = “testTopic.#”;String routingkey = “testTopic.”;12
表示只匹配一个词#表示匹配多个词
(1)配置 topic 模式
@Configurationpublic class RabbitTopicConfig { public static final String TOPICNAME = "yolo-topic"; @Bean TopicExchange topicExchange() { return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } @Bean Binding xiaomiBinding() { //xiaomi.# 表示如果路由的 routingKey 因此xiaomi 开头就会发送到 xiaomi 这个行列步队上 return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#"); } @Bean Binding huaweiBinding() { //huawei.# return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#"); } @Bean Binding phoneBinding() { // #.phone.# 表示routingKey 中包含 phone 就会被发送到 phone 这个行列步队上 return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#"); }}123456789101112131415161718192021222324252627282930313233343536373839404142
(2) 消费者
@Componentpublic class TopicReceiver { @RabbitListener(queues = "xiaomi") public void handler1(String msg) { System.out.println("TopicReceiver:handler1:" + msg); } @RabbitListener(queues = "huawei") public void handler2(String msg) { System.out.println("TopicReceiver:handler2:" + msg); } @RabbitListener(queues = "phone") public void handler3(String msg) { System.out.println("TopicReceiver:handler3:" + msg); }}1234567891011121314151617
(3)测试:生产者
@Test public void test2() { //可以被小米的行列步队收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news", "小米新闻"); //可以被手机的行列步队收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "vivo.phone", "vivo 手机"); //可以被华为和手机的行列步队收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.phone", "华为手机"); }123456789
6. header 模式
这种模式利用的是 header 中的 key/value (键值对) 匹配行列步队,也和 routingKey 无关
(1)配置 config
@Configurationpublic class RabbitHeaderConfig { public static final String HEADERNAME = "yolo-header"; @Bean HeadersExchange headersExchange() { return new HeadersExchange(HEADERNAME, true, false); } @Bean Queue queueName() { return new Queue("name-queue"); } @Bean Queue queueAge() { return new Queue("age-queue"); } @Bean Binding bindingName() { Map<String, Object> map = new HashMap<>(); // map.put("name", "yolo"); //whereAny 表示的header中只要有一个header匹配上map中的key,value,就把发送到对应的行列步队上 return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() { //只要有,age 这个字段,就发送到相应的行列步队上去 return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists(); }}1234567891011121314151617181920212223242526272829303132333435
(2)消费者
@Componentpublic class HeaderReceiver { @RabbitListener(queues = "name-queue") public void handler1(byte[] msg) { System.out.println("HeaderReceiver:handler1:" + new String(msg, 0, msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg) { System.out.println("HeaderReceiver:handler2:" + new String(msg, 0, msg.length)); }}123456789101112
(3)测试
@Test public void test3() { //对应 RabbitHeaderConfig 中的map 的 key / value Message nameMsg = MessageBuilder.withBody("hello yolo !".getBytes()).setHeader("name","yolo").build(); Message ageMsg = MessageBuilder.withBody("hello 99 !".getBytes()).setHeader("age","99").build(); //此时发送的吸收,跟 routingKey无关,跟的 header 内容有关 rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); }123456789
如果变动,header 里的键值对:则无法匹配成功,行列步队收不到该信息