流量洪峰
举个栗子:
双11、618购物节,会涌现特定时间点抢购某种商品的情形,瞬时流量巨大。

用户端发起下单操作,做事端包括库存检讨,库存冻结,余额检讨,余额冻结,订单天生,余额扣减,库存扣减,天生流水,余额解冻,库存解冻等业务逻辑。
假设20000用户端抢购1每秒内抢购100件商品,做事端吸收到20000个要求,但每秒只能处理2000个要求,很有可能导致做事端系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案为削峰限流,平滑流量要求,保护做事器资源,一样平常利用行列步队缓冲,限速实行。
排队处理业务
2.RabbitMQ 削峰加入MQ行列步队
RabbitMQ供应了一种做事质量保障功能,将设置为手动确认,如果一定数目的未被确认,不进行消费新的。
意思便是说,业务员为第一个人(0001号)办理业务,确认办完后,再请0002号接着办理。没有办理完上个业务,不呼叫下一个客户。
// 做事质量担保:// prefetchSize=0:表示内容大小没有限定,// prefetchCount=3:表示预读取消息数量为3,如果这三条均没有确认,则消费者不再读取新// global=false:表示prefetchCount单独运用于信道上的每个新消费者channel.basicQos(0,3,false);
削峰设置
// 做事质量担保:// prefetchSize=0:表示内容大小没有限定,// prefetchCount=3:表示预读取消息数量为3,如果这三条均没有确认,则消费者不再读取新// global=false:表示prefetchCount单独运用于信道上的每个新消费者channel.basicQos(0,3,false);
//不自动回答行列步队应答 -- RabbitMQ 中切实其实认机制,// 限流办法// queue:行列步队名称// autoAck=false:不自动确认,消费者吸收到并处理完逻辑后须要手动确认,// 如果没有进行手动确认,通道内的到达basicQos设置的prefetchCount数量后便不再消费// Consumer回调:MyConsumerchannel.basicConsume(queueName,false,new MyConsumer(channel));
消费者代码
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer { public static void main(String[] args) throws Exception { // 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置 RabbitMQ 地址 connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); // 创建一个新的连接 Connection connection = connectionFactory.newConnection(); // 创建一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 声明要关注的行列步队 channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 做事质量担保: // prefetchSize=0:表示内容大小没有限定, // prefetchCount=3:表示预读取消息数量为3, // 如果这三条均没有确认,则消费者不再读取新 // global=false:表示prefetchCount单独运用于信道上的每个新消费者 channel.basicQos(0,3,false); // 不自动回答行列步队应答 -- RabbitMQ 中切实其实认机制, // 限流办法 // queue:行列步队名称 // autoAck=false:不自动确认,消费者吸收到后,处理完业务逻辑后须要手动确认, // 如果没有进行手动确认,通道内的到达basicQos设置的prefetchCount数量后便不再消费 // Consumer回调:MyConsumer channel.basicConsume(queueName,false,new MyConsumer(channel)); }}
自定义消费者代码
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;/ DefaultConsumer类 实现了 Consumer 接口, 通过传入一个通道, 见告做事器我们须要哪个通道的 如果通道中有, 就会实行回调函数 handleDelivery/public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //当我们须要确认一条已经被消费时,我们调用的 basicAck 方法 //第一个参数:deliveryTag(唯一标识 ID),当一个消费者向 RabbitMQ 注册后, // 会建立起一个 Channel , // RabbitMQ 会用 basic.deliver 方法向消费者推送,这个方法携带了一个 delivery tag, // 以便 Consumer 可以在确认时见告 RabbitMQ 到底是哪条被确认了。 // delivery tag 是一个单调递增的正整数,其范围仅限于当前 Channel, // RabbitMQ 担保在每个信道中每条的 Delivery Tag 从 1 开始递增。 //第二个参数:multiple:为了减少网络流量,手动确认可以被批处理, // 当该参数为 true 时,则可以一次性确认 delivery_tag 小于即是传入值的所有 channel.basicAck(envelope.getDeliveryTag(), false); }}
生产者代码
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); // 创建一个新的连接 Connection connection = connectionFactory.newConnection(); // 创建一个新的通道Channel Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } }}