ActiveMQ:设置多个并⾏的消费者
消息队列本来就是⼀种经典的⽣产者与消费者模式。⽣产者向消息队列中发送消息,消费者从消息队列中获取消息来消费。
消息的传送⼀般由⼀个代理来实现的,那就是Message broker(即消息代理)。Message broker有两⼤职责,⼀是消息路由,⼆是数据转换。这就好⽐A给B寄信,如果不使⽤邮局的话,就要⾃⼰想办法送达,费时费⼒,⽽通过邮局的话,只要B的地址在邮局中注册过,那么天涯海⾓也能送达。这⾥的邮局扮演的⾓⾊就像消息系统中的Message broker。
众所周知,消息队列是典型的’send and forget’原则的体现,⽣产者只管发送,不管消息的后续处理。为了最⼤效率的完成对消息队列中的消息的消费,⼀般可以同时起多个⼀模⼀样的消费者,以并⾏的⽅式来拉取消息队列中的消息。这样的好处有多个:
1.加快处理消息队列中的消息。
2.增强稳定性,如果⼀个消费者出现问题,不会影响对消息队列中消息的处理。
使⽤spring JMS来配置多个Listener实例,只需配置MessageListenerContainer就⾏。
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="messageReceiver"/>
<property name="concurrentConsumers" value="4"/>
</bean>
多配置⼀个属性concurrentConsumers,设置值为4,就是同时启动4个Listener实例来消费消息。
使⽤MessageSender来发送100条消息,可以检查消息处理的顺序会发⽣变化。
for (int i = 0; i < 100; i++) {
messageSender.send(String.format("message %d",i));
}
运⾏结果如下:
...
Received: message 4
Received: message 7
Received: message 6
Received: message 5
Received: message 8
activemq启动报错
Received: message 10
Received: message 9
除了设置⼀个固定的Listener数量,也可以设置⼀个Listener区间,这样MessageListenerContainer可以根据消息队列中的消息规模⾃动调整并⾏数量。
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="messageReceiver"/>
<property name="concurrency" value="4-8"/>
</bean>
这次使⽤的是concurrency属性,4-8表⽰最⼩并发数是4,最⼤并发数为8,当然也可以给⼀个固定值,⽐如5,这样就相当于concurrentConsumers属性了。