之前在 《springboot + rabbitmq 做智能家居》 中说过可以用 rabbitmq 的 MQTT 协议做智能家居的指令推送,里边还提到过能用 MQTT 协议做 web 的推送,而未读(小红点)功能刚好运用到实时推送了。
MQTT 协议就不再赘述了,没打仗过的同学翻翻前边的文章温习一下吧,本日还是紧张以实践为主!
web 端实时推送,常用的实现办法比较多,但万变不离其宗,底层基本上还是依赖于 websocket,MQTT 协议也不例外。

RabbitMQ的根本搭建就不详细说了,自行百度一步一步搞问题不大,这里紧张说一下两个比较主要的配置。
1、开启 mqtt 协议默认情形下RabbitMQ是不开启MQTT 协议的,以是须要我们手动的开启干系的插件,而RabbitMQ的MQTT 协议分为两种。
第一种 rabbitmq_mqtt 供应与后端做事交互利用,对应端口1883。
rabbitmq-pluginsenablerabbitmq_mqtt
第二种 rabbitmq_web_mqtt 供应与前端交互利用,对应端口15675。
rabbitmq-pluginsenablerabbitmq_web_mqtt
在 RabbitMQ 管理后台看到如下的显示,就表示MQTT 协议开启成功,到这中间件环境就搭建完毕了。
利用MQTT 协议默认的交流机 Exchange 为 amp.topic,而我们订阅的主题会在 Queues 注册一个客户端行列步队,路由 Routing key 便是我们设置的主题。
做事端发送
web 端实时推送一样平常都是单向的推送,前端吸收做事端推送的显示即可,以是就只实现发送即可。
1、mqtt 客户端依赖包引入 spring-integration-mqtt、org.eclipse.paho.client.mqttv3 两个工具包实现
<!--mqtt依赖包--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency>
2、发送者
的发送比较大略,紧张是运用到 @ServiceActivator 表明,须要把稳messageHandler.setAsync属性,如果设置成 false,关闭异步模式发送时可能会壅塞。
@ConfigurationpublicclassIotMqttProducerConfig{@AutowiredprivateMqttConfigmqttConfig;@BeanpublicMqttPahoClientFactorymqttClientFactory(){DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();factory.setServerURIs(mqttConfig.getServers());returnfactory;}@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}@Bean@ServiceActivator(inputChannel="iotMqttInputChannel")publicMessageHandlermqttOutbound(){MqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(mqttConfig.getServerClientId(),mqttClientFactory());messageHandler.setAsync(false);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());returnmessageHandler;}}
MQTT 对外供应发送的 API 时,须要利用 @MessagingGateway 表明,去供应一个网关代理,参数 defaultRequestChannel 指定发送绑定的channel。
可以实现三种API接口,payload 为发送的,topic 发送的主题,qos 质量。
@MessagingGateway(defaultRequestChannel="iotMqttInputChannel")publicinterfaceIotMqttGateway{//向默认的topic发送voidsendMessage2Mqtt(Stringpayload);//向指定的topic发送voidsendMessage2Mqtt(Stringpayload,@Header(MqttHeaders.TOPIC)Stringtopic);//向指定的topic发送,并指定做事质量参数voidsendMessage2Mqtt(@Header(MqttHeaders.TOPIC)Stringtopic,@Header(MqttHeaders.QOS)intqos,Stringpayload);}
前端订阅
前端利用与做事端对应的工具 paho-mqtt mqttws31.js实现,实现办法与传统的 websocket 办法差不多,核心方法 client = new Paho.MQTT.Client 和 各种监听事宜,代码比较简洁。
把稳:要担保前后端 clientId的全局唯一性,我这里就大略用随机数办理了
<scripttype="text/javascript">//mqtt协议rabbitmq做事varbrokerIp=location.hostname;//mqtt协议端口号varport=15675;//接管推送的主题vartopic="push_message_topic";//mqtt连接client=newPaho.MQTT.Client(brokerIp,port,"/ws","clientId_"+parseInt(Math.random()100,10));varoptions={timeout:3,//超时时间keepAliveInterval:30,//心跳韶光onSuccess:function(){console.log(("连接成功~"));client.subscribe(topic,{qos:1});},onFailure:function(message){console.log(("连接失落败~"+message.errorMessage));}};//考虑到https的情形if(location.protocol=="https:"){options.useSSL=true;}client.connect(options);console.log(("已经连接到"+brokerIp+":"+port));//连接断开事宜client.onConnectionLost=function(responseObject){console.log("失落去连接-"+responseObject.errorMessage);};//吸收事宜client.onMessageArrived=function(message){console.log("接管主题:"+message.destinationName+"的:"+message.payloadString);$("#arrivedDiv").append("<br/>"+message.payloadString);varcount=$("#count").text();count=Number(count)+1;$("#count").text(count);};//推送给指定主题functionsendMessage(){vara=$("#message").val();if(client.isConnected()){varmessage=newPaho.MQTT.Message(a);message.destinationName=topic;client.send(message);}}</script>
测试
前后真个代码并不多,接下来我们测试一下,弄了个页面看看效果。
首先用 postman 仿照后端发送
http://127.0.0.1:8080/fun/sendMessage?message=我是程序员内点事&topic=push_message_topic
再看一下前端订阅的效果,看到被实时推送到了前端,这里只做了未读数量统计,一样平常还会做未读详情列表。
总结
未读是一个十分常见的功能,不管是 web端还是移动端系统都是必备的模块,MQTT 协议只是个中的一种实现办法,还是有必要节制一种方法。详细用什么工具实现还是要看详细的业务场景和学习本钱,像我用RabbitMQ 做还考虑到一些运维本钱在里边。
本文完全代码地址:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-mqtt-messagepush
全网搜:程序员内点事。
整理了几百本各种技能电子书相送 ,嘘~,「免费」 送给小伙伴们,私信或者评论【666】自行领取。和一些小伙伴们建了一个技能互换群,一起磋商技能、分享技能资料,旨在共同学习进步。