1、依赖包引入
在SpringBoot项目中集成MQTT做事,这里选择的是基于ActiveMQ供应的MQTT做事,ActiveMQ可以独立支配也可以嵌入到自有运用中,在SpringBoot中集成ActiveMQ比较随意马虎,首先须要引入要用到的jar依赖包:
<!-- ActiveMQ 不推举直接引入ActiveMQ all,会引入不必要的依赖 --><dependency> <!-- spring boot连接ActiveMQ的集成依赖 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency> <!-- spring boot 集成ActiveMQ 核心依赖 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId></dependency><dependency> <!-- activemq stomp协议依赖 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-stomp</artifactId></dependency><dependency> <!-- activemq amqp协议依赖 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-amqp</artifactId></dependency><dependency> <!-- activemq mqtt协议依赖 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-mqtt</artifactId></dependency><dependency> <!-- activemq kahadb持久化依赖 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-kahadb-store</artifactId></dependency><!-- end ActiveMQ -->
2、ActiveMQ做事配置

依赖引入完成往后,须要建立ActiveMQ的配置文件,这里可以直接在原ActiveMQ配置文件根本上进行修正,由于是嵌入式的,以是就不须要原独立支配的web管理界面了,配置文件放在SpringBoot项目中的resources目录下,名为activemq.xml,配置内容如下:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="activemq-data"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true"> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <persistenceAdapter> <kahaDB directory="activemq-data/kahadb"/><!-- kahabd持久化目录 --> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="64 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <!-- 如果须要实现其它协议,须要引入对应的协议依赖jar包 --> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors> </broker></beans>
3、在SpringBoot运用加入ActiveMQ做事
在SpringBoot加入ActiveMQ工厂做事实例,以下代码加在SpringBoot启动类中。
/ ActiveMQ做事初始化 @return ActiveMQ Broker工厂实例 @throws Exception / @Bean public BrokerFactoryBean brokerService() throws Exception { BrokerFactoryBean brokerFactoryBean = new BrokerFactoryBean(); ClassPathResource res = new ClassPathResource("activemq.xml");//加载配置信息 brokerFactoryBean.setConfig(res);//加载配置信息 brokerFactoryBean.setStart(true);//启动做事 return brokerFactoryBean; }
4、连接本地ActiveMQ做事
经由以上配置往后,就可以随应一起启动ActiveMQ做事了,这里实现了AMQP、STOMP、MQTT三种协议,由于是在本地连接同一JVM虚拟机内的做事,就不须要像连接外部ActiveMQ做事那样指定IP和端口,这里只须要指定其做事名就可以了,做事名是在ActiveMQ中的brokerName指定的,在application.properties的文加入以下配置:
#这里指向的连接是本运用内的localhost做事,这里采取vm连接spring.activemq.broker-url=vm://localhost#开启订阅模式,MQTT协议因此订阅模式存在的spring.jms.pub-sub-domain=true
5、在运用中利用MQTT做事,进行业务研发
经由以上配置往后,就可以在运用中像利用普通JMS一样来与MQTT设备进行数据交互,MQTT订阅是可以利用通配符进行主题订阅的,常日来讲某一个设备上传数据会以编号+主题的办法实现,如topic:/run/0001/data,个中topic为固定协议头,run为数据分类,0001为设备的编号,data为详细的功能块数据,如果要订阅这一数据,显然不可能分每一个设备单独订阅一个主题,这个时候就可以订阅topic:/run//data这个主题,这样所有设备的这个主题都能订阅到。在java等分隔符"/"要用"."代替。
//-----------订阅run下的data主题--------------------/ 订阅.ru..data主题 @param message 内容 / @JmsListener(destination = ".ru..data") public void onRun(ActiveMQMessage message) throws Exception{ String body = null;//内容 String hid = null;//设备编号 String type = message.getJMSDestination().toString();//主题类型,形如:topic:/run/0001/data //--------解析内容-------------------------- if(null!=message.getContent()){ body = new String(message.getContent().getData(),"UTF-8");//byte[]类型的内容 }else if(message instanceof ActiveMQTextMessage) { ActiveMQTextMessage sg = (ActiveMQTextMessage) message;//String类型的内容 body=sg.getText(); } //-------从主题类型中解析出设备编号--------------- hid = StringUtils.splitByWholeSeparator(type,".")[2];//设备编号 //------业务处理代码---------------------------- }
6、向设备端发布主题
这里采取JSM的办法向设备端发布对应的主题及内容,详细代码如下:
private final JmsTemplate jmsTemplate;//JMS发送实例/ 发布主题 @param topic 主题,形如 .run.0001.data,这里的.相称于/ @param body 内容 / public void sendTopic(String topic,String body){ jmsTemplate.convertAndSend(topic,body.getBytes());//这里以byte[]的内容发送 }