activemq消息队列的⼤⼩配置的配置
在⽹上有⼈问,如何在l⾥⾯配置消息队列的⼤⼩,这样才保证队列不会溢出!如果采⽤⾮持久化消息,那么当⼤量发
送失败时候,⾸先⼤量占⽤内存。造成消息堆积,容易造成内存溢出,所以个⼈⽐较倾向于持久化消息的同时配合其他⽅式的
master/slave或者failover机制,尽量保持消息的畅通。当我们开发的Java的使⽤应⽤程序的时候,有的时候需要为java应⽤指定对应的内
存⼤⼩。当在测试的时候,有的同学也许会遇到ActiveMQ报内存不⾜的问题,我们可以修改ActiveMQ使⽤的内存的多少类似Java 的内存
的设置,在ActiveMQ.bat或者activemq(linux)⽂件中
如下:
if "%ACTIVEMQ_BASE%" == "" set ACTIVEMQ_BASE=%ACTIVEMQ_HOME%
if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.fig.file=logging.p if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremote
其中ACTIVEMQ_OPTS中指定ActiveMQ的使⽤的内存的⼤⼩。
下⾯为ActiveMQ的内存分配信息。l中的配置情况:
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
/producer-flow-control.html
<systemUsage  sendFailIfNoSpaceAfterTimeout="3000"  sendFailIfNoSpace="true">
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
memoryUsage:表⽰所有队列对象占⽤的内存⼤⼩为20mb;
AbstractPendingMessageCursor计算空间的⽅法源代码如下:
public boolean hasSpace() {
return systemUsage != null ? (MemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
}
public boolean isFull() {
return systemUsage != null ? MemoryUsage().isFull() : false;
}
ActiveMQ消息持久化
在broker中设置属性persistent=”true”(默认是true),同时发送的消息也应该是persitent类型的。ActiveMQ消息持久化有三种⽅式:AMQ、KahaDB、JDBC。
1、AMQ
AMQ是⼀种⽂件存储形式,它具有写⼊速度快和容易恢复的特点。消息存储在⼀个个⽂件中,⽂件的默认⼤⼩为32兆,如果⼀条消息的⼤⼩超过了32兆,那么这个值必须设置⼤点。当⼀个存储⽂件中的消息已经全部被消费,那么这个⽂件将被标识为可删除,在下⼀个清除阶段,这个⽂件被删除。默认配置如下:
Java
1 2 3<persistenceAdapter>
<amqPersistenceAdapter directory="activemq-data"maxFileLength="32mb"/> </ persistenceAdapter >
AMQ的属性:
属性名称默认值描述
directory activemq-
data
消息⽂件和⽇志的存储⽬录
useNIO true使⽤NIO协议存储消息
syncOnWrite false同步写到磁盘,这个选项对性能影响⾮常⼤
maxFileLength32mb⼀个消息⽂件的⼤⼩
persistentIndex true消息索引的持久化,如果为false,那么索引保存在内存中
maxCheckpointMessageAddSize4kb⼀个事务允许的最⼤消息量
activemq启动报错cleanupInterval30000清除操作周期,单位ms
indexBinSize1024索引⽂件缓存页⾯数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,导致⼀定时间的阻塞,所
以这个值应该调整到⽐较⼤,但是代码中实现会动态伸缩,调整效果并不理想。
indexKeySize96索引key的⼤⼩,key是消息ID
indexPageSize16kb索引的页⼤⼩
directoryArchive archive存储被归档的消息⽂件⽬录
archiveDataLogs false当为true时,归档的消息⽂件被移到directoryArchive,⽽不是直接删除
2、KahaDB
KahaDB是基于⽂件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间⽐AMQ短,从5.4版本之后KahaDB做为默认的持久化⽅式。默认配置如下:
Java
1 2 3<persistenceAdapter>
<kahaDB directory="activemq-data"journalMaxFileLength="32mb"/> </ persistenceAdapter >
KahaDB的属性:
property name default value Comments
directory activemq-data消息⽂件和⽇志的存储⽬录
indexWriteBatchSize1000⼀批索引的⼤⼩,当要更新的索引量到达这个值时,更新到消息⽂件中
indexCacheSize10000内存中,索引的页⼤⼩
enableIndexWriteAsync false索引是否异步写到消息⽂件中
journalMaxFileLength32mb⼀个消息⽂件的⼤⼩
enableJournalDiskSyncs true是否讲⾮事务的消息同步写⼊到磁盘
cleanupInterval30000清除操作周期,单位ms
checkpointInterval5000索引写⼊到消息⽂件的周期,单位ms
ignoreMissingJournalfiles false忽略丢失的消息⽂件,false,当丢失了消息⽂件,启动异常
checkForCorruptJournalFiles false检查消息⽂件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFiles false产⽣⼀个checksum,以便能够检测journal⽂件是否损坏。
5.4版本之后有效的属性:
archiveDataLogs false当为true时,归档的消息⽂件被移到directoryArchive,⽽不是直接删除
directoryArchive null存储被归档的消息⽂件⽬录
databaseLockedWaitDelay10000在使⽤负载时,等待获得⽂件锁的延迟时间,单位ms
maxAsyncJobs10000同个⽣产者产⽣等待写⼊的异步消息最⼤量
concurrentStoreAndDispatchTopics false当写⼊消息的时候,是否转发主题消息
concurrentStoreAndDispatchQueues true当写⼊消息的时候,是否转发队列消息
5.6版本之后有效的属性:
archiveCorruptedIndex false是否归档错误的索引
从5.6版本之后,有可能发布通过多个kahadb持久适配器来实现分布式⽬标队列存储。什么时候⽤呢?如果有⼀个快速的⽣产者和消费者,当某⼀个时刻⽣产者发⽣了不规范的消费,那么有可能产⽣⼀条消息被存储在两个消息⽂件中,同时,有些⽬标队列是危险的并且要求访问磁盘。在这种情况下,你应该⽤通配符来使⽤mKahaDB。如果⽬标队列是分布的,事务是可以跨越多个消息⽂件的。
每个KahaDB的实例都可以配置单独的适配器,如果没有⽬标队列提交给filteredKahaDB,那么意味着对所有的队列有效。如果⼀个队列没有对应的适配器,那么将会抛出⼀个异常。配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19<persistenceAdapter>
<mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters>
<!--match all queues -->
<filteredKahaDB queue=">">
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</ persistenceAdapter >
</ filteredKahaDB >
<!--match all destinations -->
<filteredKahaDB>
<persistenceAdapter>
<kahaDB enableJournalDiskSyncs="false"/>
</ persistenceAdapter >
</ filteredKahaDB >
</ filteredPersistenceAdapters >
</ mKahaDB >
</ persistenceAdapter >
如果filteredKahaDB的perDestination属性设置为true,那么匹配的⽬标队列将会得到⾃⼰对应的KahaDB实例。配置如下:Java
1 2 3 4 5 6 7 8 9 10 11 12<persistenceAdapter>
<mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters>
<!--kahaDB per destinations -->
<filteredKahaDB perDestination="true">
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</ persistenceAdapter >
</ filteredKahaDB >
</ filteredPersistenceAdapters >
</ mKahaDB >
</ persistenceAdapter >
3、JDBC
配置JDBC适配器:Java
1 2 3<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"createTablesOnStartup="false"/> </ persistenceAdapter >
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,⼀般是第⼀次启动的时候设置为true,之后改成false。
MYSQL持久化bean
Java
1 2 3 4 5 6 7<bean id="mysql-ds"class="org.apachemons.dbcp.BasicDataSource" destroy -method="close"> <property name="driverClassName"value="sql.jdbc.Driver"/>
<property name="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username"value="activemq"/>
<property name="password"value="activemq"/>
<property name="poolPreparedStatements"value="true"/>
</ bean >
SQL Server持久化bean Java
1 2 3 4 5 6 7<bean id="mssql-ds"class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy -method="close">  <property name="serverName"value="SERVERNAME"/>
<property name="portNumber"value="PORTNUMBER"/>
<property name="databaseName"value="DATABASENAME"/>
<property name="user"value="USER"/>
<property name="password"value="PASSWORD"/>
</ bean >
Oracle持久化bean Java
1 2 3 4 5 6 7 8<bean id="oracle-ds"class="org.apachemons.dbcp.BasicDataSource" destroy -method="close"> <property name="driverClassName"value="oracle.jdbc.driver.OracleDriver"/>
<property name="url"value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
<property name="username"value="activemq"/>
<property name="password"value="activemq"/>
<property name="maxActive"value="200"/>
<property name="poolPreparedStatements"value="true"/>
</ bean >
DB2持久化bean