`
zhangziyangup
  • 浏览: 1078680 次
文章分类
社区版块
存档分类
最新评论

ActiveMQ4.1 +Spring2.0的POJO JMS方案

 
阅读更多

1.概述

1.1 JMS与ActiveMQ特性

JMS始终在JavaEE五花八门的协议里,WebService满天飞的时候占一位置,是因为:

  • 它可以把不影响用户执行结果又比较耗时的任务(比如发邮件通知管理员)异步的扔给JMS 服务端去做,而尽快的把屏幕返还给用户。
  • 服务端能够多线程排队响应高并发的请求,并保证请求不丢失。
  • 可以在Java世界里达到最高的解耦。客户端与服务端无需直连,甚至无需知晓对方是谁、在哪里、有多少人,只要对流过的信息作响应就行了,在企业应用环境复杂时作用明显。

ActiveMQ的特性:

  • 完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,也是Apache Geronimo默认的JMS provider。
  • POJO withdout EJB Container,不需要实现EJB繁琐复杂的Message Bean接口和配置。
  • Spring Base,可以使用Spring的各种特性如IOC、AOP。
  • Effective,基于Jencks的JCA Container实现pool connection,control transactions and manage security。

1.2 SpringSide 的完全POJO的JMS方案

SpringSide 2.0在BookStore示例中,演示了用户下订单时,将发通知信到用户邮箱的动作,通过JMS交给JMS服务端异步完成,避免了邮件服务器的堵塞而影响用户的下订。

全部代码于examples/bookstore/src/java/org/springside/bookstore/components/activemq 目录中。

一个JMS场景通常需要三者参与:

  • 一个POJO的的Message Producer,负责使用Spring的JMS Template发送消息。
  • 一个Message Converter,负责把Java对象如订单(Order)转化为消息,使得Producer能够直接发送POJO。
  • 一个MDP Message Consumer,负责接收并处理消息。

SpringSide 2.0采用了ActiveMQ 4.1-incubator 与Spring 2.0 集成,对比SS1.0M3,有三个值得留意的地方,使得代码中几乎不见一丝JMS的侵入代码:

  1. 采用Spring2.0的Schema式简化配置。
  2. 实现Message Converter转化消息与对象,使得Producer能够直接发送POJO而不是JMS Message。
  3. 使用了Spring2.0的DefaultMessageListenerContainer与MessageListenerAdapter,消息接收者不用实现MessageListener 接口。
  4. 同时,Spring 2.0 的DefaultMessageListenerContainer 代替了SS1.0M3中的Jenck(JCA Container),充当MDP Container的角色。

2.引入ActiveMQ的XSD

ActiveMQ4.1 响应Spring 2.0号召,支持了引入XML Schema namespace的简单配置语法,简化了配置的语句。

在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:

<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq
="http://activemq.org/config/1.0"
xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation
="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd"
>

由于ActiveMQ4.1 SnapShot的那个XSD有部分错误,因此使用的是自行修改过的XSD。

先在ClassPath根目录放一个修改过的activemq-core-4.1-incubator-SNAPSHOT.xsd。

在ClassPath 下面建立META-INF/spring.schemas 内容如下。这个spring.schemas是spring自定义scheam的配置文件,请注意"http:/://"部分写法

http/://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd=/activemq-core-4.1-incubator-SNAPSHOT.xsd

3. 配置方案

3.1基础零件

1.配置ActiveMQ Broker

暂时采用在JVM中嵌入这种最简单的模式, 当spring初始化时候,ActiveMQ embedded Broker 就会启动了。

<!--letscreateanembeddedActiveMQBroker-->
<amq:brokeruseJmx="false"persistent="false">
<amq:transportConnectors>
    
<amq:transportConnectoruri="tcp://localhost:0"/>
 
</amq:transportConnectors>
 
</amq:broker>

2. 配置(A)ConnectionFactory

由于前面配置的Broker是JVM embedded 所以URL为:vm://localhost

<!--ActiveMQconnectionFactorytouse-->
<amq:connectionFactoryid="jmsConnectionFactory"brokerURL="vm://localhost"/>

3 配置(B)Queue

<!--ActiveMQdestinationstouse-->
<amq:queuename="destination"physicalName="org.apache.activemq.spring.Test.spring.embedded"/>

4.配置(C)Converter

配置Conveter,使得Producer能够直接发送Order对象,而不是JMS的Message对象。

<!--OrderMessageconverter-->
<beanid="orderMessageConverter"class="org.springside.bookstore.components.activemq.OrderMessageConverter"/>

3.2发送端

1 配置JmsTemplate

Spring提供的Template,绑定了(A)ConnectionFactory与(C)Converter。

<!--SpringJmsTemplateconfig-->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory">
<!--letswrapinapooltoavoidcreatingaconnectionpersend-->
<beanclass="org.springframework.jms.connection.SingleConnectionFactory">
<propertyname="targetConnectionFactory"ref="jmsConnectionFactory"/>
</bean>
</property>
<!--customMessageConverter-->
<propertyname="messageConverter"ref="orderMessageConverter"/>
</bean>

2.Producer

消息发送者,使用JmsTemplate发送消息,绑定了JmsTemplate (含A、C)与(B)Queue。

<!--POJOwhichsendMessageusesSpringJmsTemplate,绑定JMSTemplate与Queue-->
<beanid="orderMessageProducer"class="org.springside.bookstore.components.activemq.OrderMessageProducer">
<propertyname="template"ref="jmsTemplate"/>
<propertyname="destination"ref="destination"/>
</bean>

3.3 接收端

1.接收处理者(MDP)

使用Spring的MessageListenerAdapter,指定负责处理消息的POJO及其方法名,绑定(C)Converter。

<!--MessageDrivenPOJO(MDP),绑定Converter-->
<beanid="messageListener"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<beanclass="org.springside.bookstore.components.activemq.OrderMessageConsumer">
<propertyname="mailService"ref="mailService"/>
</bean>
</constructor-arg>
<!--maybeothermethod-->
<propertyname="defaultListenerMethod"value="sendEmail"/>
<!--customMessageConverterdefine-->
<propertyname="messageConverter"ref="orderMessageConverter"/>
</bean>

2. listenerContainer

负责调度MDP,绑定(A) connectionFactory, (B)Queue和MDP。

<!--thisistheattendantmessagelistenercontainer-->
<beanid="listenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="jmsConnectionFactory"/>
<propertyname="destination"ref="destination"/>
<propertyname="messageListener"ref="messageListener"/>

</bean>

互相绑定的关系有点晕,发送端和接收端都以不同形式绑定了(A) connectionFactory, (B)Queue和 (C)Converter。

2.持久化消息

2.1 给Broker加入Persistence 配置

在配置文件applicationContext-activemq-embedded-persitence.xml中的<amq:broker>节点加入

<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapterid="jdbcAdapter"dataSource="#hsql-ds"createTablesOnStartup="true"useDatabaseLock="false"/>
</amq:persistenceAdapter>

请注意MSSQL(2000/2005)和HSQL由于不支持[SELECT * ACTIVEMQ_LOCK FOR UPDATE ]语法,因此不能使用默认的userDatabaseLock="true",只能设置成useDatabaseLock="false"

2.2 配置多种数据源

配置多种数据源,给jdbcPersistenceAdapter使用,SpringSide 中使用的内嵌HSQL

<!--TheHSQLDatasourcethatwillbeusedbytheBroker-->
<beanid="hsql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
<propertyname="driverClassName"value="org.hsqldb.jdbcDriver"/>
<propertyname="url"value="jdbc:hsqldb:res:hsql/activemq"/>
<propertyname="username"value="sa"/>
<propertyname="password"value=""/>
<propertyname="poolPreparedStatements"value="true"/>
</bean>

2. 3 说明

笔者仅仅使用了jdbcPersistenceAdapter,其实在ActiveMQ的XSD已经描述了多种PersistenceAdapter,可以参考对应的XSD文件.

另外对于数据库的差异主要表现在设置了userDatabaseLock="true"之后,ActiveMQ使用的[SELECT * ACTIVEMQ_LOCKFOR UPDATE] 上面,会导致一些数据库出错(测试中MSSQL2000/2005,HSQL都会导致出错)。另外HSQL的脚本请参见activemq.script。

3. Jenck(JCA Container)

Spring 2.0本身使用DefaultMessageListenerContainer 可以充当MDP中的Container角色,但是鉴于Jencks是JCA标准的,它不仅仅能够提供jms的jca整合,包括其他资源比如jdbc都可以做到jca管理

所以,同时完成了这个ActiveMQ+Spring+Jencks 配置演示,更多的针对生产系统的JCA特性展示,会在稍后的开发计划讨论中确定。

此文档适用于说明使用 Jecncks 和 使用Spring 2.0(DefaultMessageListenerContainer) 充当MDP Container时的区别,同时演示Jecnks 的Spring 2.0 新配置实例。

3.1 引入ActiveMQ ResourceAdapter 和Jencks 的XSD

在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ ResourceAdapter 和Jencks 的XML Scheam 配置文件),如下:

ActiveMQ4.1 响应Spring 2.0号召,支持了引入XML Schema namespace的简单配置语法,简化了配置的语句。

在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:

<beans
xmlns="http://www.springframework.org/schema/beans"xmlns:amq="http://activemq.org/config/1.0"xmlns:ampra="http://activemq.org/ra/1.0"xmlns:jencks="http://jencks.org/1.3"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation
="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd
http://activemq.org/ra/1.0http://people.apache.org/repository/org.apache.activemq/xsds/activemq-ra-4.1-incubator-SNAPSHOT.xsd
http://jencks.org/1.3http://repository.codehaus.org/org/jencks/jencks/1.3/jencks-1.3.xsd"
>

由于ActiveMQ RA和Jencks那个XSD 仍然有部分错误,因此使用的是自行修改过的XSD。(是xs:any元素引起的错误)

先在ClassPath根目录放一个修改过的activemq-ra-4.1-incubator-SNAPSHOT.xsd和jencks-1.3.xsd。

同样修改 ClassPath 下面META-INF/spring.schemas 增加内容如下。这个spring.schemas是spring自定义scheam的配置文件,请注意"http:/://"部分写法

http/://people.apache.org/repository/org.apache.activemq/xsds/activemq-ra-4.1-incubator-SNAPSHOT.xsd=/activemq-ra-4.1-incubator-SNAPSHOT.xsd
http/://repository.codehaus.org/org/jencks/jencks/1.3/jencks-1.3.xsd=/jencks-1.3.xsd

3.2 配置方案

3.2.1基础零件

1.配置ActiveMQ Broker 参见 ActiveMQ+Spring

2.配置ActiveMQ Resource Adapter

<amqra:managedConnectionFactoryid="jmsManagedConnectionFactory"resourceAdapter="#resourceAdapter"/><amqra:resourceAdapterid="resourceAdapter"serverUrl="vm://localhost"/>

3.配置Jencks 基础配置

具体的配置可以参见Jencks的XSD

<!--jencksPoolFactoryconfig-->
<jencks:singlePoolFactoryid="poolingSupport"maxSize="16"minSize="5"blockingTimeoutMilliseconds="60"idleTimeoutMinutes="60"matchOne="true"matchAll="true"selectOneAssumeMatch="true"/><!--jencksXATransactionFactory-->
<jencks:xATransactionFactoryid="transactionSupport"useTransactionCaching="true"useThreadCaching="true"/>
<!--jencksConnectionManagerFactory-->
<jencks:connectionManagerFactoryid="connectionManager"containerManagedSecurity="false"poolingSupport="#poolingSupport"transactionSupport="#transactionSupport"/><!--jencksTransactionContextManagerFactory-->
<jencks:transactionContextManagerFactoryid="transactionContextManagerFactory"/>

4.配置给JmsTemplate使用的connectionFactory (主要是生成者/发送者 使用)

这里注意下,在配置jmsTemplate的使用的targetConnectionFactory就是使用jencks配置的connectionManager

<!--springconfigjmswithjca-->
<beanid="jmsManagerConnectionFactory"class="org.springframework.jca.support.LocalConnectionFactoryBean">
<propertyname="managedConnectionFactory">
<reflocal="jmsManagedConnectionFactory"/>
</property>
<propertyname="connectionManager">
<reflocal="connectionManager"/>
</property>
</bean>

<!--SpringJmsTemplateconfig-->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory">
<!--letswrapinapooltoavoidcreatingaconnectionpersend-->
<beanclass="org.springframework.jms.connection.SingleConnectionFactory">
<propertyname="targetConnectionFactory"ref="jmsManagerConnectionFactory"/>
</bean>
</property>
<!--customMessageConverter-->
<propertyname="messageConverter"ref="orderMessageConverter"/>
</bean>

5.配置Spring 2.0的MessageListenerAdapter,保证不需要用户实现MessageListener

ActiveMQ+Spring

6.配置Jecnks 充当MDP的Container

就是把上面的MessageListenerAdapter配置到Jencks里面,完成整个MDP的配置

<!--JencksContainer-->
<jencks:jcaContainer><jencks:bootstrapContext>
<jencks:bootstrapContextFactorythreadPoolSize="25"/>
</jencks:bootstrapContext>
<jencks:connectors>
<!--usejenckscontainer(usespringMessageListenerAdapter)-->
<jencks:connectorref="messageListener">
<jencks:activationSpec>
<amqra:activationSpecdestination="org.apache.activemq.spring.Test.spring.embedded"destinationType="javax.jms.Queue"/>
</jencks:activationSpec>
</jencks:connector></jencks:connectors><jencks:resourceAdapter>
<amqra:resourceAdapterserverUrl="vm://localhost"/>
</jencks:resourceAdapter>

</jencks:jcaContainer>
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics