RabbitMQ 即一个行列步队,紧张是用来实现运用程序的异步和解耦,同时也能起到缓冲,分发的浸染。
中间件在互联网公司的利用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了本日的主角还是讲 RabbitMQ。中间件最紧张的浸染是解耦,中间件最标准的用法是生产者生产传送到行列步队,消费者从行列步队中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产,从而达到解耦的目的。在分布式的系统中,行列步队也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。
以前一贯利用的是 ActiveMQ,在实际的生产利用中也涌现了一些小问题,在网络查阅了很多的资料后,决定考试测验利用 RabbitMQ 来更换 ActiveMQ,RabbitMQ 的高可用性、高性能、灵巧性等一些特点吸引了我们,查阅了一些资料整理出此文。

RabbitMQ 是实现AMQP(高等行列步队协议)的中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 紧张是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么须要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高等行列步队协议,是运用层协议的一个开放标准,为面向的中间件设计。中间件紧张用于组件之间的解耦,的发送者无需知道利用者的存在,反之亦然。AMQP 的紧张特色是面向、行列步队、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,做事器端用Erlang措辞编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发,在易用性、扩展性、高可用性等方面表现不俗。
干系观点常日我们谈到行列步队做事, 会有三个观点: 发者、行列步队、收者,RabbitMQ 在这个基本观点之上, 多做了一层抽象, 在发者和行列步队之间, 加入了交流器 (Exchange). 这样发者和行列步队就没有直接联系, 转而变成发者把给交流器, 交流器根据调度策略再把再给行列步队。
左侧 P 代表 生产者,也便是往 RabbitMQ 发的程序。中间即是 RabbitMQ,个中包括了交流机和行列步队。右侧 C 代表 消费者,也便是往 RabbitMQ 拿的程序。那么,个中比较主要的观点有 4 个,分别为:虚拟主机,交流机,行列步队,和绑定。
虚拟主机:一个虚拟主机持有一组交流机、行列步队和绑定。为什么须要多个虚拟主机呢?很大略, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限掌握。 因此,如果须要禁止A组访问B组的交流机/行列步队/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 做事器都有一个默认的虚拟主机“/”。交流机:Exchange 用于转发,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的。 这里有一个比较主要的观点:路由键 。到交流机的时候,交互机会转发到对应的行列步队中,那么究竟转发到哪个行列步队,就要根据该路由键。绑定:也便是交流机须要和行列步队相绑定,这个中如上图所示,是多对多的关系。交流机(Exchange)交流机的功能紧张是吸收并且转发到绑定的行列步队,交流机不存储,在启用ack模式后,交流机找不到行列步队会返回缺点。交流机有四种类型:Direct, topic, Headers and Fanout
Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 的routing_key 匹配时, 才会被交流器投送到绑定的行列步队中去.Topic:按规则转发(最灵巧)Headers:设置 header attribute 参数类型的交流机Fanout:转发到所有绑定行列步队Direct ExchangeDirect Exchange 是 RabbitMQ 默认的交流机模式,也是最大略的模式,根据key全文匹配去探求行列步队。
第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。傍边的 路由键 和 这个 binding key 对应上的时候,那么就知道了该去到哪一个行列步队中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个紧张是由于可能又有 Q3,而Q3只接管 black 的信息,而Q2不仅接管black 的信息,还接管 green 的信息。
Topic ExchangeTopic Exchange 转发紧张是根据通配符。 在这种交流机下,行列步队和交流机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交流性能力转发。
在这种交流机模式下:
路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。路由模式必须包含一个 星号(),紧张用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相称于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。详细代码发送的时候还是一样,第一个参数表示交流机,第二个参数表示 routing key,第三个参数即。如下:
rabbitTemplate.convertAndSend(\"大众testTopicExchange\"大众,\"大众key1.a.c.key2\"大众, \"大众 this is RabbitMQ!\"大众);
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以利用两个通配符:
表示一个词.#表示零个或多个词.Headers Exchangeheaders 也是根据规则匹配, 相较于 direct 和 topic 固定地利用 routing_key , headers 则是一个自定义匹配规则的类型. 在行列步队与交流器绑定时, 会设定一组键值对规则, 中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 被投送到对应行列步队.
Fanout ExchangeFanout Exchange 广播的模式,不管路由键或者是路由模式,会把发给绑定给它的全部行列步队,如果配置了 routing_key 会被忽略。
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常大略,如果只是大略的利用配置非常少,Spring Boot 供应了spring-boot-starter-amqp项目对各种支持。
大略利用
1、配置 Pom 包,紧张是添加 spring-boot-starter-amqp 的支持
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring.application.name=Spring-boot-rabbitmqspring.rabbitmq.host=192.168.0.86spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456
3、行列步队配置
@Configurationpublic class RabbitConfig { @Bean public Queue Queue() { return new Queue(\"大众hello\"大众); }}
4、发送者
rabbitTemplate 是 Spring Boot 供应的默认实现
@componentpublic class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = \"大众hello \"大众 + new Date(); System.out.println(\"大众Sender : \"大众 + context); this.rabbitTemplate.convertAndSend(\"大众hello\"大众, context); }}
5、吸收者
@Component@RabbitListener(queues = \"大众hello\公众)public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println(\"大众Receiver : \"大众 + hello); }}
6、测试
@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); }}
把稳,发送者和吸收者的 queue name 必须同等,不然不能吸收
多对多利用一个发送者,N 个吸收者或者 N 个发送者和 N 个吸收者会涌现什么情形呢?
一对多发送对上面的代码进行了小改造,吸收端注册了两个 Receiver,Receiver1 和 Receiver2,发送端加入参数计数,吸收端打印吸收到的参数,下面是测试代码,发送一百条,来不雅观察两个吸收真个实行效果
@Testpublic void oneToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); }}
结果如下:
Receiver 1: Spring boot neo queue 11Receiver 2: Spring boot neo queue 12Receiver 2: Spring boot neo queue 14Receiver 1: Spring boot neo queue 13Receiver 2: Spring boot neo queue 15Receiver 1: Spring boot neo queue 16Receiver 1: Spring boot neo queue 18Receiver 2: Spring boot neo queue 17Receiver 2: Spring boot neo queue 19Receiver 1: Spring boot neo queue 20
根据返回结果得到以下结论
一个发送者,N个接管者,经由测试会均匀的将发送到N个吸收者中
多对多发送复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test public void manyToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); neoSender2.send(i); }}
结果如下:
Receiver 1: Spring boot neo queue 20Receiver 2: Spring boot neo queue 20Receiver 1: Spring boot neo queue 21Receiver 2: Spring boot neo queue 21Receiver 1: Spring boot neo queue 22Receiver 2: Spring boot neo queue 22Receiver 1: Spring boot neo queue 23Receiver 2: Spring boot neo queue 23Receiver 1: Spring boot neo queue 24Receiver 2: Spring boot neo queue 24Receiver 1: Spring boot neo queue 25Receiver 2: Spring boot neo queue 25
结论:和一对多一样,吸收端仍旧会均匀吸收到
高等利用工具的支持
Spring Boot 以及完美的支持工具的发送和吸收,不须要格外的配置。
//发送者public void send(User user) { System.out.println(\"大众Sender object: \"大众 + user.toString()); this.rabbitTemplate.convertAndSend(\"大众object\公众, user);}...//吸收者@RabbitHandlerpublic void process(User user) { System.out.println(\"大众Receiver object : \"大众 + user);}
结果如下:
Sender object: User{name='neo', pass='123456'}Receiver object : User{name='neo', pass='123456'}Topic Exchange
topic 是 RabbitMQ 中最灵巧的一种办法,可以根据 routing_key 自由的绑定不同的行列步队
首先对 topic 规则配置,这里利用两个行列步队来测试
@Configurationpublic class TopicRabbitConfig { final static String message = \"大众topic.message\"大众; final static String messages = \公众topic.messages\公众; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange(\公众exchange\公众); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(\"大众topic.message\"大众); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with(\"大众topic.#\公众); }}
利用 queueMessages 同时匹配两个行列步队,queueMessage 只匹配 “topic.message” 行列步队
public void send1() { String context = \公众hi, i am message 1\"大众; System.out.println(\公众Sender : \"大众 + context); this.rabbitTemplate.convertAndSend(\"大众exchange\公众, \公众topic.message\"大众, context);}public void send2() { String context = \"大众hi, i am messages 2\公众; System.out.println(\"大众Sender : \公众 + context); this.rabbitTemplate.convertAndSend(\"大众exchange\公众, \"大众topic.messages\"大众, context);}
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到,发送send2只有topic.#可以匹配所有只有Receiver2监听到
Fanout ExchangeFanout 便是我们熟习的广播模式或者订阅模式,给 Fanout 交流机发送,绑定了这个交流机的所有行列步队都收到这个。
Fanout 干系配置
@Configurationpublic class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue(\"大众fanout.A\"大众); } @Bean public Queue BMessage() { return new Queue(\"大众fanout.B\公众); } @Bean public Queue CMessage() { return new Queue(\"大众fanout.C\"大众); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(\"大众fanoutExchange\"大众); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); }}
这里利用了 A、B、C 三个行列步队绑定到 Fanout 交流机上面,发送真个 routing_key 写任何字符都会被忽略:
public void send() { String context = \公众hi, fanout msg \公众; System.out.println(\公众Sender : \"大众 + context); this.rabbitTemplate.convertAndSend(\"大众fanoutExchange\公众,\"大众\"大众, context);}
结果如下:
Sender : hi, fanout msg ...fanout Receiver B: hi, fanout msg fanout Receiver A : hi, fanout msg fanout Receiver C: hi, fanout msg
结果解释,绑定到 fanout 交流机上面的行列步队都收到了