码迷,mamicode.com
首页 > 编程语言 > 详细

springbatch apache-activemq 整合(往mq中put数据,从mq中take数据)

时间:2017-08-28 23:52:05      阅读:1101      评论:0      收藏:0      [点我收藏+]

标签:springbatch apache-activemq


简单测试如下:

1:收下下载apache-activemq-5.14.4 解压apache-activemq-5.14.4\bin\win64,运行activemq.bat

启动本地MQ服务器。

通过浏览器可以查看本地MQ服务器的信息。

http://127.0.0.1:8161/admin/index.jsp

2: 先往mq中put数据

配置如下:

<job id="jmsReadJob">
        <step id="jmsReadStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="jmsItemReader" processor="creditBillProcessor" writer="creditItemWriter" commit-interval="2"></chunk>
            </tasklet>
        </step>
 </job>
      <!-- 从数据库读取 -->
     <bean:bean id="jmsItemReader" scope="step"
        class="org.springframework.batch.item.database.JdbcCursorItemReader" >
        <bean:property name="dataSource" ref="dataSource"/>
<bean:property name="sql" 
        value="
        select BIZ_UUID,BIZ_TYPE_CODE,BIZ_TYPE_PROP_CODE,BIZ_TYPE_PROP_NAME from IMS_BIZ_ARGS
        "/>
        <bean:property name="rowMapper">
            <bean:bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                <bean:property name="mappedClass" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
            </bean:bean>
        </bean:property>
    </bean:bean>

 <!-- 写到MQ中 -->
<bean:bean id="creditItemWriter" class="org.springframework.batch.item.jms.JmsItemWriter">
<bean:property name="jmsTemplate" ref="jmsTemplate" />
</bean:bean>

 <!-- MQ对象配置 -->
<bean:bean id="jmsTemplate"  class="org.springframework.jms.core.JmsTemplate">
<bean:property name="connectionFactory" ref="mqfactory" />
<bean:property name="messageConverter">
<bean:bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</bean:property> 
<bean:property name="defaultDestination" ref="destination" />
<bean:property name="receiveTimeout" value="500" />
</bean:bean>
<bean:bean id="mqfactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
<bean:property name="brokerURL" value="tcp://localhost:61616" />
<bean:property name="trustAllPackages" value="true" />
<bean:property name="useAsyncSend" value="true" />
<bean:property name="redeliveryPolicy">
<bean:bean class="org.apache.activemq.RedeliveryPolicy">
<bean:property name="maximumRedeliveries" value="0" />
</bean:bean>
</bean:property>
</bean:bean>
<bean:bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
 <!-- MQ队列名称(MQ服务器中可以有多个队列)-->
<bean:constructor-arg value="enrollMq444" />
</bean:bean>
 <!-- 处理器(意思就是你从源中取到数据后,这些数据是否要加工一下(比如我根据这个数据发一个socket请求或者我把某个字段的值乘以2等等))-->
<bean:bean id="creditBillProcessor" scope="step"
class="com.citiccard.channel.sts.mqbatch.CreditBillProcessor">
</bean:bean>
<!-- 我这个测试的例子的意思是:我从数据库中把对象取出来啥没干直接把对象发送给springbatch的creditItemWriter对象
也就是说MQ中存放的是BizArgs对象。这也意味着我从MQ中取数据时数据是个BizArgs对象而不是String
什么的其他的对象。
当然我也可以toString一下玩玩,测试么。
-->
public class CreditBillProcessor implements ItemProcessor<BizArgs, BizArgs> {
private JdbcTemplate jdbcTemplate = null;
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public BizArgs process(BizArgs bizArgs) throws Exception {
System.out.println(bizArgs);
return bizArgs;
}
}
<!--
跑一把
-->
public static void main(String[] args) {
//ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-putMq.xml");
ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-getMq.xml");
//JmsTemplate jmsTemplate = getJmsTemplate(context);
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",100.00,"2013-2-2 12:00:08","Lu Jia Zui road"));
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",320,"2013-2-3 10:35:21","Lu Jia Zui road"));
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",360.00,"2013-2-11 11:12:38","Longyang road"));
executeJob(context, "jmsReadJob",
new JobParametersBuilder().addDate("date", new Date()));
}

完事后,apache-activemq服务器中会多一个叫enrollMq444(配置文件中配的)的Queue,并且数据和数据库中的条数一致。


3: 从mq中take数据,不详细说了,直接贴配置

<bean:import resource="classpath:springJob/job-context.xml" />

<job id="jmsReadJob">
        <step id="jmsReadStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="jmsItemReader" processor="creditBillProcessor" writer="creditItemWriter" commit-interval="2"></chunk>
            </tasklet>
        </step>
    </job>
 <bean:bean id="jmsItemReader"
class="org.springframework.batch.item.jms.JmsItemReader">
<bean:property name="itemType" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
<bean:property name="jmsTemplate" ref="jmsTemplate"/>
</bean:bean>
    <bean:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
<bean:property name="connectionFactory" ref="mqfactory" />
<bean:property name="messageConverter">
<bean:bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</bean:property> 
<bean:property name="defaultDestination" ref="destination" />
<bean:property name="receiveTimeout" value="500" />
</bean:bean>
<bean:bean class="org.apache.activemq.ActiveMQConnectionFactory" id="mqfactory">
<bean:property name="brokerURL" value="tcp://localhost:61616" />
<bean:property name="trustAllPackages" value="true" />
</bean:bean>
<bean:bean id="destination2" class="org.apache.activemq.command.ActiveMQQueue">
<bean:constructor-arg value="enroll" />
</bean:bean>
<bean:bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<bean:constructor-arg value="enrollMq444" />
</bean:bean>
<bean:bean id="creditBillProcessor" scope="step"
class="com.citiccard.channel.sts.mqbatch.GetCreditBillProcessor">
</bean:bean>
<!-- 写数据库 -->
<bean:bean id="creditItemWriter"
class="org.springframework.batch.item.database.JdbcBatchItemWriter"
scope="step">
<bean:property name="dataSource" ref="w" />
<bean:property name="sql"
value="update  IMS_BIZ_ARGS t  set t.BIZ_TYPE_PROP_NAME = ? where   t.BIZ_UUID  = ? " />
<bean:property name="itemPreparedStatementSetter">
<bean:bean
class="com.citiccard.channel.sts.mqbatch.UpdateSqlSetter" />
</bean:property>
</bean:bean>
</bean:beans>
<!-- 我把name值加了一个haha,哈哈哈 -->
public class UpdateSqlSetter implements
ItemPreparedStatementSetter<BizArgs> {
@Override
public void setValues(BizArgs bigargs, PreparedStatement ps) throws SQLException {
ps.setString(1, bigargs.getBizTypePropName()+"haha");
ps.setString(2, bigargs.getBizUuid());
}
}


4:完了,小试一把。


springbatch apache-activemq 整合(往mq中put数据,从mq中take数据)

标签:springbatch apache-activemq

原文地址:http://11760512.blog.51cto.com/11750512/1959848

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!