首页 » 网站建设 » activemqphpjava技巧_速读消息中心件架构体系ActiveMQ入门事理分析优缺点

activemqphpjava技巧_速读消息中心件架构体系ActiveMQ入门事理分析优缺点

访客 2024-11-24 0

扫一扫用手机浏览

文章目录 [+]

2)JMS连接。
JMS连接(Connection)表示JMS客户端和做事器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

3)JMS会话。
JMS会话(Session)表示JMS客户与JMS做事器之间的会话状态。
JMS会话建立在JMS连接上,表示客户与做事器之间的一个会话线程。

activemqphpjava技巧_速读消息中心件架构体系ActiveMQ入门事理分析优缺点

4)JMS目的。
JMS目的(Destination),又称为行列步队,是实际的源。

activemqphpjava技巧_速读消息中心件架构体系ActiveMQ入门事理分析优缺点
(图片来自网络侵删)

5)JMS生产者和消费者。
生产者(Message Producer)和消费者(Message Consumer)工具由Session工具创建,用于发送和吸收。

6)JMS常日有两种类型:

① 点对点(Point-to-Point)。
在点对点的系统中,分发给一个单独的利用者。
点对点每每与行列步队(javax.jms.Queue)干系联。

② 发布/订阅(Publish/Subscribe)。
发布/订阅系统支持一个事宜驱动模型,生产者和消费者都参与的通报。
生产者发布事宜,而利用者订阅感兴趣的事宜,并利用事宜。
该类型一样平常与特定的主题(javax.jms.Topic)关联。

06 安装ActiveMQ

windows安装

下载地址:http://activemq.apache.org/activemq-5150-release.html

下载完成后解压进入bin目录 运行 activemq.bat。

如果你碰着如下问题,5672端口被占用

可以去修正activemq的conf目录下的activemq.xml,把amqp的端口改为其他的,这里改成了5673

再次启动:

访问地址:http://127.0.0.1:8161/admin/进入后台页面 初始账号密码 admin admin

Docker安装ActiveMQ

docker run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq

07 ActiveMQ快速入门

Springboot集成ActiveMQ

导入依赖

<dependencies> <!--Springboot--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.3.0.RELEASE</version> </dependency> <!--ActiveMq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>1.5.0.RELEASE</version> </dependency> <!--行列步队连接池--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency> </dependencies>

配置MQ

server: port: 8080spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin close-timeout: 15s # 在考虑结束之前等待的韶光 in-memory: true # 默认代理URL是否该当在内存中。
如果指定了显式代理,则忽略此值。
non-blocking-redelivery: false # 是否在回滚回滚之前停滞通报。
这意味着当启用此命令时,顺序不会被保留。
send-timeout: 0 # 等待发送相应的韶光。
设置为0等待永久。
queue-name: active.queue topic-name: active.topic.name.model # packages: # trust-all: true #不配置此项,会报错 pool: enabled: true max-connections: 10 #连接池最大连接数 idle-timeout: 30000 #空闲的连接过期韶光,默认为30秒 # jms: # pub-sub-domain: true #默认情形下activemq供应的是queue模式,若要利用topic模式须要配置下面配置# 是否信赖所有包#spring.activemq.packages.trust-all=# 要信赖的特定包的逗号分隔列表(当不信赖所有包时)#spring.activemq.packages.trusted=# 当连接要乞降池满时是否壅塞。
设置false会抛“JMSException非常”。
#spring.activemq.pool.block-if-full=true# 如果池仍旧满,则在抛出非常前壅塞韶光。
#spring.activemq.pool.block-if-full-timeout=-1ms# 是否在启动时创建连接。
可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms# 连接空闲超时#spring.activemq.pool.idle-timeout=30s# 连接池最大连接数#spring.activemq.pool.max-connections=1# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500# 当有"JMSException"时考试测验重新连接#spring.activemq.pool.reconnect-on-exception=true# 在空闲连接打消线程之间运行的韶光。
当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms# 是否只利用一个MessageProducer#spring.activemq.pool.use-anonymous-producers=true

编写配置类

/ @author 原 @date 2020/12/16 @since 1.0 /@Configurationpublic class BeanConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Value("${spring.activemq.user}") private String username; @Value("${spring.activemq.topic-name}") private String password; @Value("${spring.activemq.queue-name}") private String queueName; @Value("${spring.activemq.topic-name}") private String topicName; @Bean(name = "queue") public Queue queue() { return new ActiveMQQueue(queueName); } @Bean(name = "topic") public Topic topic() { return new ActiveMQTopic(topicName); } @Bean public ConnectionFactory connectionFactory(){ return new ActiveMQConnectionFactory(username, password, brokerUrl); } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } / 在Queue模式中,对的监听须要对containerFactory进行配置 @param connectionFactory @return / @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } / 在Topic模式中,对的监听须要对containerFactory进行配置 @param connectionFactory @return / @Bean("topicListener") public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; }}

编写启动类

/ @author 原 @date 2020/12/8 @since 1.0 /@SpringBootApplication@EnableJms //开启JMS支持public class DemoApplication { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } / 运用启动后,会实行该方法 会分别向queue和topic发送一条 / @PostConstruct public void sendMsg(){ jmsMessagingTemplate.convertAndSend(queue,"queue-test"); jmsMessagingTemplate.convertAndSend(topic,"topic-test"); }}

查看activemq后台

active.queue 为行列步队的名称

Number Of Pending Messages 等待消费的数量 3是由于我自己发了3次

Messages Enqueued 已经进入行列步队的数量

由于没有消费者,一贯没有被消费。
下面我们编写消费者代码。

/ @author 原 @date 2020/12/16 @since 1.0 /@Componentpublic class QueueConsumerListener { @JmsListener(destination = "${spring.activemq.queue-name}",containerFactory = "queueListener") public void getQueue(String message){ System.out.println("接管queue:"+message); } @JmsListener(destination = "${spring.activemq.topic-name}",containerFactory = "topicListener") public void getTopic(String message){ System.out.println("接管topic:"+message); }}

在后台发送一条

掌握台打印

发送topic

掌握台打印:

但是创造一个问题是,之前在没有消费的时候,有3条queue和一条topic,但是当我启动消费者时,queue的3条被消费了,topic确没有。
这是由于:

topic模式有普通订阅和持久化订阅

普通订阅:在消费者启动之前发送过来的,消费者启动之后不会去消费;

持久化订阅: 在消费者启动之前发送过来的,消费者启动之后会去消费;

08 ActiveMQ事理剖析

同步发送与异步发送

ActiveMQ支持同步、异步两种发送模式将发送到broker上。
同步发送过程中,发送者发送一条会壅塞直到broker反馈一个确认,表示已经被broker处理。
这个机制供应了的安全性保障,但是由于是壅塞的操作,会影响到客户端发送的性能异步发送的过程中,发送者不须要等待broker供应反馈,以是性能相对较高。
但是可能会涌现丢失的情形。
以是利用异步发送的条件是在某些情形下许可涌现数据丢失的情形。
默认情形下,非持久化是异步发送的,持久化并且是在非事务模式下是同步发送的。
但是在开缘由务的情形下,都是异步发送。
由于异步发送的效率会比同步发送性能更高。
以是在发送持久化的时候,只管即便去开缘由务会话。

发送事理

ProducerWindowSize的含义

producer每发送一个,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,须要等待broker的确认,才能连续发送。

代码在:ActiveMQSession的1957行紧张用来约束在异步发送时producer端许可积压的(尚未ACK)的的大小,且只对异步发送故意义。
每次发送之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺寸减少(producerAck.size,此size表示先前发送的大小)。

可以通过如下2种办法设置:Ø 在brokerUrl中设置: "tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的producer生效。
Ø 在destinationUri中设置: "test-queue?producer.windowSize=1048576",此参数只会对利用此Destination实例的producer失落效,将会覆盖brokerUrl中的producerWindowSize值。

把稳:此值越大,意味着花费Client真个内存就越大。

源码剖析

ActiveMQMessageProducer.send(...)方法

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { checkClosed();//检讨session连接,若已关闭直接抛出非常 if (destination == null) {//校验发送的目的地是否为空,也便是必须制订queue或者topic信息 if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } //这里做的是封装Destination ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); }//封装Message if (transformer != null) { Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } }//如果设置了producerWindow,则须要校验producerWindow大小 if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } }//发送 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);//做统计的 stats.onMessage(); }

ActiveMQSession的send方法

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {//校验连接 checkClosed(); //校验发送目标 if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //互斥锁,如果一个session的多个producer发送到这里,会担保发送的有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction doStartTransaction(); TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 message.setJMSDeliveryMode(deliveryMode);//设置是否持久化 long expiration = 0L; if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } message.setJMSExpiration(expiration);//过期韶光 message.setJMSPriority(priority);//优先级 message.setJMSRedelivered(false);//是否重复发送 // transform to our own message format here 统一封装 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); msg.setDestination(destination); //设置ID msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. if (msg != message) {//如果是经由转化的,则更新原来的id和目的地 message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message msg.setBrokerPath(null); msg.setTransactionId(txid); if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); msg.onSend();//把属性和体都设置为只读,防止被修正 msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //如果onComplete没有设置,且发送超时时间小于0,且不须要反馈,且连接器不是同步发送模式,且非持久化或者连接器是异步发送模式//或者存在事务id的情形下,走异步发送,否则走同步发送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize();//异步发送的情形下,须要设置producerWindow的大小 producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout);//带超时时间的同步发送//带回调的同步发送 }else { this.connection.syncSendPacket(msg, onComplete);//带回调的同步发送 } } } }

看下异步发送的代码ActiveMQConnection. asyncSendPacket()

/ send a Packet through the Connection - for internal use only @param command @throws JMSException / public void asyncSendPacket(Command command) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); } else { doAsyncSendPacket(command); } } private void doAsyncSendPacket(Command command) throws JMSException { try { this.transport.oneway(command); } catch (IOException e) { throw JMSExceptionSupport.create(e); } }

再看看transport是个什么东西?在哪里实例化的?按照以前看源码的老例来看,它肯定不是一个纯挚的工具。
按照以往我看源码的履历来看,一定是在创建连接的过程中初始化的。
以是我们定位到代码

//从connection=connectionFactory.createConnection();这行代码作为入口,一贯跟踪ActiveMQConnectionFactory. createActiveMQConnection这个方法中。
代码如下protected ActiveMQConnection createActiveMQConnection(String userName, String password) throwsJMSException {if (brokerURL == null) {throw new ConfigurationException("brokerURL not set.");}ActiveMQConnection connection = null;try {Transport transport = createTransport();//代码往下看connection = createActiveMQConnection(transport, factoryStats);connection.setUserName(userName);connection.setPassword(password);//省略后面的代码}

//这个方法便是实例化Transport的 1.构建Broker的URL 2.根据这个URL去创建一个链接TransportFactory.connect 默认利用的TCP连接 protected Transport createTransport() throws JMSException { try { URI connectBrokerUL = brokerURL; String scheme = brokerURL.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); } if (scheme.equals("auto")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); } else if (scheme.equals("auto+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); } else if (scheme.equals("auto+nio")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); } else if (scheme.equals("auto+nio+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); } return TransportFactory.connect(connectBrokerUL);//里面的代码连续往下看 } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } }

TransportFactory. findTransportFactory

从TRANSPORT_FACTORYS这个Map凑集中,根据scheme去得到一个TransportFactory指定的实例工具如果Map凑集中不存在,则通过TRANSPORT_FACTORY_FINDER去找一个并且构建实例Ø 这个地方又有点类似于我们之前所学过的SPI的思想吧?他会从METAINF/services/org/apache/activemq/transport/ 这个路径下,根据URI组装的scheme去找到匹配class工具并且实例化,以是根据tcp为key去对应的路径下可以找到T cpT ransportFactory

//TransportFactory.connect(connectBrokerUL) public static Transport connect(URI location) throws Exception { TransportFactory tf = findTransportFactory(location); return tf.doConnect(location); } //findTransportFactory(location) public static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }

调用TransportFactory.doConnect去构建一个连接

public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); if( !options.containsKey("wireFormat.host") ) { options.put("wireFormat.host", location.getHost()); } WireFormat wf = createWireFormat(options); Transport transport = createTransport(location, wf); Transport rc = configure(transport, wf, options); //remove auto IntrospectionSupport.extractProperties(options, "auto."); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }

configure

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { //组装一个复合的transport,这里会包装两层,一个是IactivityMonitor.另一个是WireFormatNegotiator transport = compositeConfigure(transport, wf, options); transport = new MutexTransport(transport);//再做一层包装,MutexTransport transport = new ResponseCorrelator(transport);//包装ResponseCorrelator return transport; }

到目前为止,这个transport实际上便是一个调用链了,他的链构造为ResponseCorrelator(MutexT ransport(WireFormatNegotiator(IactivityMonitor(T cpT ransport()))每一层包装表示什么意思呢?ResponseCorrelator 用于实现异步要求。
MutexT ransport 实现写锁,表示同一韶光只许可发送一个要求WireFormatNegotiator 实现了客户端连接broker的时候先发送数据解析干系的协议信息,比如解析版本号,是否利用缓存等InactivityMonitor 用于实现连接成功成功后的心跳检讨机制,客户端每10s发送一次心跳信息。
做事端每30s读取一次心跳信息。

同步发送和异步发送的差异

public Object request(Object command, int timeout) throws IOException {FutureResponse response = asyncRequest(command, null);return response.getResult(timeout); // 从future方法壅塞等待返回}

持久化和非持久化的存储事理

正常情形下,非持久化是存储在内存中的,持久化是存储在文件中的。
能够存储的最大数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点SystemUsage配置设置了一些系统内存和硬盘容量

<systemUsage><systemUsage><memoryUsage>//该子标记设置全体ActiveMQ节点的“可用内存限定”。
这个值不能超过ActiveMQ本身设置的最大内存大小。
个中的percentOfJvmHeap属性表示百分比。
占用70%的堆内存<memoryUsage percentOfJvmHeap="70" /></memoryUsage><storeUsage>//该标记设置全体ActiveMQ节点,用于存储“持久化”的“可用磁盘空间”。
该子标记的limit属性必须要进行设置<storeUsage limit="100 gb"/></storeUsage><tempUsage>//一旦ActiveMQ做事节点存储的达到了memoryUsage的限定,非持久化就会被转储到 temp store区域,虽然我们说过非持久化不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”涌现时非持久化大量堆积致使内存耗尽的情形涌现,还是会将非持久化写入到磁盘的临时区域——temp store。
这个子标记便是为了设置这个tempstore区域的“可用磁盘空间限定”<tempUsage limit="50 gb"/></tempUsage></systemUsage></systemUsage>

从上面的配置我们须要get到一个结论,当非持久化堆积到一定程度的时候,也便是内存超过指定的设置阀值时,ActiveMQ会将内存中的非持久化写入到临时文件,以便腾出内存。
但是它和持久化的差异是,重启之后,持久化会从文件中规复,非持久化的临时文件会直接删除

的持久化策略剖析

持久性对付可靠通报来说是一种比较好的方法,即时发送者和接管者不是同时在线或者中央在发送者发送后宕机了,在中央重启后仍旧可以将发送出去。
持久性的事理很大略,便是在发送出去后,中央首先将存储在本地文件、内存或者远程数据库,然后把发送给接管者,发送成功后再把从存储中删除,失落败则连续考试测验。
接下来我们来理解一下在broker上的持久化存储实现办法

持久化存储支持类型

ActiveMQ支持多种不同的持久化办法,紧张有以下几种,不过,无论利用哪种持久化办法,的存储逻辑都是同等的。
Ø KahaDB存储(默认存储办法)

Ø JDBC存储

Ø Memory存储

Ø LevelDB存储

Ø JDBC With ActiveMQ Journal

KahaDB存储KahaDB是目前默认的存储办法,可用于任何场景,提高了性能和规复能力。
存储利用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对持久化的办理方案,它对范例的利用模式进行了优化。
在Kaha中,数据被追加到data logs中。
当不再须要log文件中的数据的时候,log文件会被丢弃。

配置办法

<persistenceAdapter><kahaDB directory="${activemq.data}/kahadb"/></persistenceAdapter>

KahaDB的存储事理在data/kahadb这个目录下,会天生四个文件Ø db.data 它是的索引文件,实质上是B-Tree(B树),利用B-Tree作为索引指向db-.log里面存储的Ø db.redo 用来进行规复Ø db-.log 存储内容。
新的数据以APPEND的办法追加到日志文件末端。
属于顺序写入,因此存储是比较快的。
默认是32M,达到阀值会自动递增Ø lock文件 锁,表示当前得到kahadb读写权限的broker

JDBC存储利用JDBC持久化办法,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS 表,queue和topic都存在这个表中ACTIVEMQ_ACKS 存储持久订阅的信息和末了一个持久订阅吸收的IDACTIVEMQ_LOCKS 锁表,用来确保某一时候,只能有一个ActiveMQ broker实例来访问数据库JDBC存储配置

<persistenceAdapter><jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" /></persistenceAdapter>

dataSource指定持久化数据库的bean,createT ablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一样平常是第一次启动的时候设置为true,之后改成falseMysql持久化Bean配置

<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="root"/></bean>

LevelDB存储

LevelDB持久化性能高于KahaDB,虽然目前默认的持久化办法仍旧是KahaDB。
并且,在ActiveMQ 5.9版本供应了基于LevelDB和Zookeeper的数据复制办法,用于Master-slave办法的首选数据复制方案。
不过,据ActiveMQ官网对LevelDB的表述:LevelDB官方建议利用以及不再支持,推举利用的是KahaDB

<persistenceAdapter><levelDBdirectory="activemq-data"/></persistenceAdapter>

Memory 存储基于内存的存储,内存存储紧张是存储所有的持久化的在内存中。
persistent=”false”,表示不设置持久化存储,直接存储到内存中

<beans><broker brokerName="test-broker" persistent="false"xmlns="http://activemq.apache.org/schema/core"><transportConnectors><transportConnector uri="tcp://localhost:61635"/></transportConnectors> </broker></beans>

JDBC Message store with ActiveMQ Journal

这种办法战胜了JDBC Store的不敷,JDBC每次过来,都须要去写库和读库。
ActiveMQ Journal,利用高速缓存写入技能,大大提高了性能。
当消费者的消费速率能够及时跟上生产者的生产速率时,journal文件能够大大减少须要写入到DB中的。
举个例子,生产者生产了1000条,这1000条会保存到journal文件,如果消费者的消费速率很快的情形下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的,那么这个时候只须要同步剩余的10%的到DB。
如果消费者的消费速率很慢,这个时候journal文件可以使以批量办法写到DB。
Ø 将原来的标签注释掉Ø 添加如下标签

<persistenceFactory><journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemqdata"/></persistenceFactory>

Ø 在做事端循环发送。
可以看到数据是延迟同步到数据库的

消费端消费的事理

我们知道有两种方法可以吸收,一种是利用同步壅塞的MessageConsumer#receive方法。
另一种是利用监听器MessageListener。
这里须要把稳的是,在同一个session下,这两者不能同时事情,也便是说不能针对不同采取不同的吸收办法。
否则会抛出非常。
至于为什么这么做,最大的缘故原由还是在事务性会话中,两莳花费模式的事务不好管控

消费流程图

ActiveMQMessageConsumer.receive消费端同步吸收的源码入口

public Message receive() throws JMSException { checkClosed(); checkMessageListener(); //检讨receive和MessageListener是否同时配置在当前的会话中,同步消费不须要设置MessageListener 否则会报错 sendPullCommand(0); //如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令 MessageDispatch md = dequeue(-1); //从unconsumerMessage出行列步队获取消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); //发送ack给到broker return createActiveMQMessage(md);//获取消息并返回}

sendPullCommand发送pull命令从broker上获取消息,条件是prefetchSize=0并且unconsumedMessages为空。
unconsumedMessage表示未消费的,这里面预读取的大小为prefetchSize的值

protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); //向做事端异步发送messagePull指令 }}

clearDeliveredList

在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,紧张用来清理已经分发的链表deliveredMessagesdeliveredMessages,存储分发给消费者但还未应答的链表Ø 如果session是事务的,则会遍历deliveredMessage中的放入到previouslyDeliveredMessage中来做重发Ø 如果session是非事务的,根据ACK的模式来选择不同的应答操作

// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again private void clearDeliveredList() { if (clearDeliveredList) { synchronized (deliveredMessages) { if (clearDeliveredList) { if (!deliveredMessages.isEmpty()) { if (session.isTransacted()) { if (previouslyDeliveredMessages == null) { previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); } for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); } LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); } else { if (session.isClientAcknowledge()) { LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); // allow redelivery if (!this.info.isBrowser()) { for (MessageDispatch md: deliveredMessages) { this.session.connection.rollbackDuplicate(this, md.getMessage()); } } } LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); deliveredMessages.clear(); pendingAck = null; } } clearDeliveredList = false; } } } }

dequeue

从unconsumedMessage中取出一个,在创建一个消费者时,就会为这个消费者创建一个未消费的道,这个通道分为两种,一种是大略优先级行列步队分发通道SimplePriorityMessageDispatchChannel ;另一种是前辈先出的分发通道FifoMessageDispatchChannel.至于为什么要存在这样一个分发通道,大家可以想象一下,如果消费者每次去消费完一个往后再broker拿一个,效率是比较低的。
以是通过这样的设计可以许可session能够一次性将多条分发给一个消费者。
默认情形下对付queue来说,prefetchSize的值是1000

beforeMessageIsConsumed

这里面紧张是做消费之前的一些准备事情,如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者行列步队模式(大略来说便是除了T opic和DupAck这两种情形),所有的先放到deliveredMessages链表的开头。
并且如果当前是事务类型的会话,则判断transactedIndividualAck,如果为true,表示单条直接返回ack。
​ 否则,调用ackLater,批量应答, client端在消费后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些的条数达到一定阀值时,只须要通过一个ACK指令把它们全部确认;这比对每条都逐个确认,在性能上要提高很多

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); if (!isAutoAcknowledgeBatch()) { synchronized(deliveredMessages) { deliveredMessages.addFirst(md); } if (session.getTransacted()) { if (transactedIndividualAck) { immediateIndividualTransactedAck(md); } else { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } } }

afterMessageIsConsumed

这个方法的紧张浸染是实行应答操作,这里面做以下几个操作Ø 如果过期,则返回过期的ackØ 如果是事务类型的会话,则不做任何处理Ø 如果是AUTOACK或者(DUPS_OK_ACK且是行列步队),并且是优化ack操作,则走批量确认ackØ 如果是DUPS_OK_ACK,则走ackLater逻辑Ø 如果是CLIENT_ACK,则实行ackLater

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { // Do nothing. } else if (isAutoAcknowledgeEach()) { if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if (ackCounter + deliveredCounter >= (info.getPrefetchSize() .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. // This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 prefetch as used in ackLater() if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack!=null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }09 ActiveMQ的优缺陷

ActiveMQ 采取推送办法,以是最适宜的场景是默认都可在短韶光内被消费。
数据量越大,查找和消费就越慢,积压程度与速率成反比。

缺陷

1.吞吐量低。
由于 ActiveMQ 须要建立索引,导致吞吐量低落。
这是无法战胜的缺陷,只要利用完备符合 JMS 规范的中间件,就要接管这个级别的TPS。
2.无分片功能。
这是一个功能缺失落,JMS 并没有规定中间件的集群、分片机制。
而由于 ActiveMQ 是伟企业级开拓设计的中间件,初衷并不是为了处理海量和高并发要求。
如果一台做事器不能承受更多,则须要横向拆分。
ActiveMQ 官方不供应分片机制,须要自己实现。

适用场景

对 TPS 哀求比较低的系统,可以利用 ActiveMQ 来实现,一方面比较大略,能够快速上手开拓,另一方面可控性也比较好,还有比较好的监控机制和界面

不适用的场景

量巨大的场景。
ActiveMQ 不支持自动分片机制,如果量巨大,导致一台做事器不能处理全部,就须要自己开拓分片功能。

原文链接:https://www.cnblogs.com/whgk/p/14155010.html

相关文章

PHP实现文字转图片的代码与应用

图片处理技术在各个领域得到了广泛应用。在PHP编程中,文字转图片功能同样具有很高的实用价值。本文将针对PHP实现文字转图片的代码进...

网站建设 2025-03-02 阅读1 评论0

NAN0017探索新型纳米材料的奥秘与应用

纳米技术作为一门新兴的交叉学科,近年来在材料科学、生物医学、电子工程等领域取得了举世瞩目的成果。其中,NAN0017作为一种新型纳...

网站建设 2025-03-02 阅读5 评论0

L26368XO代码其背后的创新与突破

编程语言在各个领域发挥着越来越重要的作用。在众多编程语言中,L26368XO代码以其独特的优势,成为了业界关注的焦点。本文将深入剖...

网站建设 2025-03-02 阅读3 评论0

HTML字体背景打造个化网页设计的关键元素

网页设计已经成为现代网络传播的重要手段。在众多网页设计元素中,字体和背景的搭配尤为关键。本文将从HTML字体背景设置的角度,探讨其...

网站建设 2025-03-02 阅读1 评论0