activemqplugin开发指南及⽰例
activemq提供了⼀种插件(plugin)开发的机制(), 可以⽅便的添加各种⾃定义功能。其效果类似于直接去修改activemq的源码,⽆论从⽅便性还是风险程度上,使⽤plugin都要⽐去修改源码要好很多。通过这种⽅式,理论上我们可以实现⼏乎所有能想到的功能。
activemq重启命令
开发指南
⾸先需要在⼯程中添加activemq的依赖,为了⽅便,可以直接⽤activemq-all包。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.2</version>
</dependency>
Actviemq plugin的开发⾮常⽅便,只需要实现两个类。
⼀个是plugin类,需要implements BrokerPlugin这个接⼝,然后将接⼝的installPlugin⽅法实现了即可。
这个⽅法的唯⼀作⽤就是指定⼀个broker类,代码如下:
public Broker installPlugin(Broker broker) throws Exception {
return new LimitQueueSIzeBroker(broker);
}
接下来就是在实现⼀个broker类,⼀般情况下可以extends BrokerFilter 这个类,然后选择需要的⽅法重写,构造⽅法和重写的每个⽅法执⾏完毕后,执⾏以下super类的对应⽅法,这样就不会对activemq的实际运⾏造成影响。
例如:
public FoxBroker(Broker next) {
super(next);
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
try {
String ip = ConnectionContext().getConnection().getRemoteAddress();
String destinationName = Destination().getPhysicalName();
logger.info("send_" + destinationName + " "  + ip);
} catch (Exception e) {
<("activemq send log error: " + e, e);
}
super.send(producerExchange, messageSend);
}
这⾥我们重写了activemq的send⽅法, 并且增加了⼀些⽇志。
然后打包,将jar包放到activemq的lib⽬录下。再在l下增加plugins这个模块,并把需要的插件依次列在⾥边。
<plugins>
<bean xmlns="/schema/beans" id="testPlugin" class="com.mallow.activemq.FoxBrokerPlugin"/>
<bean xmlns="/schema/beans" id="purgePlugin" class="com.mallow.activemq.LimitQueueSizePlugin"/>
</plugins>
重启activemq,再收发消息的时候就能看到效果了。
例⼦
可以参考我在github上放的代码:
⾥边提供了两个⾮常简单的插件⽰例。FoxBrokerPlugin会在收发消息时记录⽣产者、消费者的ip; LimitQueueSizePlugin则可以控制队列⼤⼩,队列中积压1000条消息以后新的消息就会被丢弃,在测试环境下会⽐较有⽤。
此外,activemq⾃⼰也提供了⼏个常⽤的插件,包括LoggingBrokerPlugin, StatisticsBrokerPlugin, 等,也可以参考他们的实现。