码迷,mamicode.com
首页 > Web开发 > 详细

metaq实例

时间:2016-05-07 07:44:02      阅读:455      评论:0      收藏:0      [点我收藏+]

标签:

1 Java客户端的Maven依赖
<dependency>
    <groupId>com.taobao.metamorphosis</groupId>
    <artifactId>metamorphosis-client</artifactId>
    <version>1.4.6.2</version>
</dependency>
2 消息会话工厂类和生产者、消费者
<bean id="mqContext" class="com.liukunzhou.dpp.commons.mqclient.MQContextFactory" init-method="start" factory-method="getMQContext" >
</bean>
以下是实现代码:
/**
 * Factory and cache for message-queue contexts.
 *
 * <p>Parsed configuration files are cached in {@link #configMap} under the key
 * {@code <type>/<configName>}; created contexts are cached in {@link #contextMap}
 * under the key {@code <type>/<configName>/<name>}. A context is created at most
 * once per key and the same instance is returned on later calls.
 *
 * <p>The factory methods serialize access to both caches. The maps themselves stay
 * public for backward compatibility; code that mutates them directly bypasses that
 * locking.
 */
public class MQContextFactory {

    /** Classpath resource used when the caller does not name a configuration file. */
    private static final String DEFAULT_CONFIG_NAME = "/mqclient_config.xml";

    /** Name of the {@code <config>} entry used when the caller does not name one. */
    private static final String DEFAULT_CONTEXT_NAME = "default";

    /** Parsed configuration files, keyed by {@code <type>/<configName>}. */
    public static Map<String,MQConfigs> configMap = new HashMap<String, MQConfigs>();

    /** Created contexts, keyed by {@code <type>/<configName>/<name>}. */
    public static Map<String,IMQContext> contextMap = new HashMap<String, IMQContext>();

    /**
     * Returns the cached context for the given type, configuration file and entry
     * name, creating and caching it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot corrupt
     * the backing {@code HashMap}s or create two contexts for the same key.
     *
     * @param type       the message-queue implementation to use
     * @param configName name of the configuration resource handed to {@code XmlParser.parse}
     * @param name       name of the configuration entry inside that file
     * @return the context, or {@code null} when the file has no entry called
     *         {@code name} or when {@code type} is not a supported implementation
     */
    public static synchronized IMQContext getMQContext(MQType type, String configName, String name) {
        String configKey = type.name() + "/" + configName;
        MQConfigs configs = configMap.get(configKey);
        if (configs == null) {
            configs = XmlParser.parse(configName);
            configMap.put(configKey, configs);
        }

        String contextKey = configKey + "/" + name;
        IMQContext cached = contextMap.get(contextKey);
        if (cached != null) {
            return cached;
        }

        MQConfig config = configs.getConfig(name);
        if (config == null) {
            return null;
        }
        if (type != MQType.METAQ) {
            // METAQ is the only implementation this factory knows how to build.
            return null;
        }

        IMQContext created = new MetaQContext(config);
        contextMap.put(contextKey, created);
        return created;
    }

    /**
     * Returns the MetaQ context for the given configuration file and entry name.
     *
     * @param configName name of the configuration resource
     * @param name       name of the configuration entry inside that file
     * @return the context, or {@code null} when no such entry exists
     */
    public static IMQContext getMQContext(String configName, String name) {
        return getMQContext(MQType.METAQ, configName, name);
    }

    /**
     * Returns the MetaQ context for the named entry of the default configuration
     * file ({@code /mqclient_config.xml}).
     *
     * @param name name of the configuration entry
     * @return the context, or {@code null} when no such entry exists
     */
    public static IMQContext getMQContext(String name) {
        return getMQContext(DEFAULT_CONFIG_NAME, name);
    }

    /**
     * Returns the MetaQ context for the entry named {@code default} of the default
     * configuration file ({@code /mqclient_config.xml}).
     *
     * @return the context, or {@code null} when no such entry exists
     */
    public static IMQContext getMQContext() {
        return getMQContext(DEFAULT_CONFIG_NAME, DEFAULT_CONTEXT_NAME);
    }

    /** Supported message-queue implementations. */
    public static enum MQType {
        METAQ
    }
}


import java.util.ArrayList;
import java.util.List;


import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.liukunzhou.dpp.commons.mqclient.IMQContext;
import com.liukunzhou.dpp.commons.mqclient.IMsgListener;
import com.liukunzhou.dpp.commons.mqclient.IProducerHelper;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConsumerConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQProducerConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQTopicConfig;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;


/**
 * MetaQ (Metamorphosis) implementation of {@link IMQContext}.
 *
 * <p>The constructor creates one session factory and one shared producer from the
 * given configuration. {@link #start()} publishes the configured producer topics
 * and creates and subscribes the configured consumers; {@link #stop()} shuts
 * everything down again.
 */
public class MetaQContext implements IMQContext {

    Logger logger=LoggerFactory.getLogger(getClass());

    /** Session factory from which the producer and all consumers are created. */
    protected final MessageSessionFactory sessionFactory ;

    /** Helper wrapping the single producer created in the constructor. */
    protected final MetaQProducerHelper producerHelper;

    /** Configuration entry this context was built from. */
    protected final MQConfig config;

    /** Consumers created by {@link #start()}; shut down by {@link #stop()}. */
    protected List<MessageConsumer> consumers=new ArrayList<MessageConsumer>();

    /** Producers owned by this context; shut down by {@link #stop()}. */
    protected List<MessageProducer> producers=new ArrayList<MessageProducer>();

    /**
     * Creates the session factory and the shared producer.
     *
     * <p>NOTE(review): this calls the overridable methods
     * {@link #createSessionFactory(String, String)} and {@link #createProducer()};
     * subclass overrides run before the subclass's own fields are initialized.
     *
     * @param config configuration entry supplying the connection URL and ZooKeeper root
     */
    public MetaQContext(MQConfig config) {
        this.config = config;
        sessionFactory = createSessionFactory(config.getUrl(), config.getZkRoot());
        producerHelper = new MetaQProducerHelper(createProducer());
        producers.add(producerHelper.getProducer());
    }


    @Override
    public IProducerHelper getProducerHelper() {
        return producerHelper;
    }

    /**
     * Creates the session factory.
     *
     * @param url    value assigned to the ZooKeeper connect string
     * @param zkRoot ZooKeeper root path; when null or empty the {@code ZKConfig}
     *               default is left untouched
     * @return the new session factory
     * @throws RuntimeException wrapping any exception raised while creating the factory
     */
    protected MessageSessionFactory createSessionFactory(String url,String zkRoot) {
        try {
            final MetaClientConfig metaClientConfig = new MetaClientConfig();
            final ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = url;
            if (StringUtils.isNotEmpty(zkRoot)) {
                zkConfig.zkRoot = zkRoot;
            }
            metaClientConfig.setZkConfig(zkConfig);
            return new MetaMessageSessionFactory(metaClientConfig);
        } catch (Exception e) {
            logger.error("create SessionFactory error",e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a producer from the session factory.
     *
     * @return the new producer
     */
    protected MessageProducer createProducer() {
        return sessionFactory.createProducer();
    }

    /**
     * Creates a consumer from the session factory.
     *
     * @param group     consumer group name
     * @param runnerNum number of fetch threads
     * @param delay     maximum delay between fetches, in milliseconds
     * @return the new consumer
     * @throws RuntimeException wrapping any exception raised while creating the consumer
     */
    protected MessageConsumer createConsumer(String group,int runnerNum,long delay) {
        try {
            ConsumerConfig consumerConfig = new ConsumerConfig(group);
            // Number of fetch threads.
            consumerConfig.setFetchRunnerCount(runnerNum);
            // NOTE(review): presumably starts a new group at the newest offset — confirm
            // against the Metamorphosis ConsumerConfig documentation.
            consumerConfig.setConsumeFromMaxOffset();
            // Maximum delay between fetches (the delay grows by 10% each time).
            consumerConfig.setMaxDelayFetchTimeInMills(delay);
            return sessionFactory.createConsumer(consumerConfig);
        } catch (Exception e) {
            logger.error("create Consumer exception", e);
            // Wrap the exception itself: e.getCause() may be null and would drop
            // the real failure from the stack trace.
            throw new RuntimeException(e);
        }
    }


    /**
     * Publishes every configured producer topic, then creates one consumer per
     * configured consumer entry and subscribes its topics.
     *
     * <p>A topic whose listener class cannot be instantiated is logged and skipped;
     * the remaining topics of that consumer are still subscribed.
     */
    @Override
    public void start() {
        for (MQProducerConfig producerConfig : config.getProducers()) {
            producerHelper.publish(producerConfig.getTopic());
        }
        for (MQConsumerConfig consumerConfig : config.getConsumers()) {
            MetaQConsumerHelper consumer = new MetaQConsumerHelper(
                    this.createConsumer(consumerConfig.getGroup(), consumerConfig.getRunnerNum(), consumerConfig.getDelay()));
            for (MQTopicConfig tc : consumerConfig.getTopics()) {
                IMsgListener listener;
                try {
                    // Listeners are configured by class name and need a no-arg constructor.
                    listener = (IMsgListener)Class.forName(tc.getListener()).newInstance();
                } catch (Exception e) {
                    logger.error("init listener ("+tc+") error",e);
                    continue;
                }
                consumer.subscribe(tc.getName(), tc.getMaxSize(), listener );
            }
            consumer.completeSubscribe();
            consumers.add(consumer.getConsumer());
        }
    }


    /**
     * Shuts down all consumers, all producers and finally the session factory.
     *
     * <p>Each shutdown is attempted independently: a failure is logged and the
     * remaining resources are still shut down, so one failing consumer cannot
     * leave the others (or the session factory) open.
     */
    @Override
    public void stop() {
        for (MessageConsumer consumer : consumers) {
            try {
                consumer.shutdown();
            } catch (Exception e) {
                logger.error("stop exception", e);
            }
        }
        for (MessageProducer producer : producers) {
            try {
                producer.shutdown();
            } catch (Exception e) {
                logger.error("stop exception", e);
            }
        }
        try {
            // The factory was created by this context and is not shared, so it is
            // closed here as well; otherwise its own resources are never released.
            sessionFactory.shutdown();
        } catch (Exception e) {
            logger.error("stop exception", e);
        }
    }
}
3 配置文件
/**
 * Parser for the MQ client configuration file.
 *
 * <p>Maps the {@code mqconfigs/config} XML structure (with nested {@code producer},
 * {@code consumer} and {@code consumer/topic} elements) onto {@code MQConfigs},
 * {@code MQConfig}, {@code MQProducerConfig}, {@code MQConsumerConfig} and
 * {@code MQTopicConfig} objects using Commons Digester rules.
 */
public class XmlParser {


    /**
     * Parses the named classpath resource into an {@code MQConfigs} object.
     *
     * @param configName resource name, resolved with
     *                   {@code XmlParser.class.getResourceAsStream(configName)}
     * @return the populated configuration root
     * @throws RuntimeException when the resource cannot be found, or wrapping any
     *                          exception raised while reading or parsing it
     */
    public static MQConfigs parse(String configName){
        Digester dig = new Digester();

        // Push the root object; the addConfig set-next rule below is invoked on it.
        dig.push(new MQConfigs());

        // Rule matcher that supports the "?" (direct child) wildcard used below.
        dig.setRules(new ExtendedBaseRules());

        // Create an MQConfig for each config element. The original author notes that
        // the order of the following rule registrations matters.
        dig.addObjectCreate("mqconfigs/config", MQConfig.class);
        // Map XML attributes onto bean properties.
        dig.addSetProperties("mqconfigs/config");
        // Map direct child elements onto bean properties.
        dig.addBeanPropertySetter("mqconfigs/config/?");

        // Producer topics.
        dig.addObjectCreate("mqconfigs/config/producer", MQProducerConfig.class);
        dig.addSetProperties("mqconfigs/config/producer");
        dig.addBeanPropertySetter("mqconfigs/config/producer/?");

        // Consumers.
        dig.addObjectCreate("mqconfigs/config/consumer", MQConsumerConfig.class);
        dig.addSetProperties("mqconfigs/config/consumer");
        dig.addBeanPropertySetter("mqconfigs/config/consumer/?");

        // Topics subscribed by a consumer.
        dig.addObjectCreate("mqconfigs/config/consumer/topic", MQTopicConfig.class);
        dig.addSetProperties("mqconfigs/config/consumer/topic");
        dig.addBeanPropertySetter("mqconfigs/config/consumer/topic/?");

        // Attach each finished object to its parent.
        dig.addSetNext("mqconfigs/config/consumer/topic", "addTopic");

        dig.addSetNext("mqconfigs/config/consumer", "addConsumer");

        dig.addSetNext("mqconfigs/config/producer", "addProducer");

        // When a config element ends, hand the MQConfig to addConfig on the root.
        dig.addSetNext("mqconfigs/config", "addConfig");

        java.io.InputStream in = XmlParser.class.getResourceAsStream(configName);
        if (in == null) {
            // Fail with the resource name instead of an obscure parser error.
            throw new RuntimeException("MQ config resource not found on classpath: " + configName);
        }
        try {
            return (MQConfigs) dig.parse(in);
        } catch (Exception e) {
            throw new RuntimeException("failed to parse MQ config resource: " + configName, e);
        } finally {
            try {
                in.close();
            } catch (java.io.IOException ignored) {
                // Nothing useful can be done if closing the resource stream fails.
            }
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<mqconfigs>
<config name="158" url="192.168.1.158:2181" >
<consumer group="meta-commonBiz" runnerNum="6" delay="200">
<topic name="ORDERBIZ" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />
</consumer>
<producer topic="ORDERBIZ"/>
</config>

<config name="133" url="192.168.1.133:2181" zkRoot="/meta-order">
<consumer group="meta-commonBiz" runnerNum="6" delay="200">
<topic name="TEST" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />
</consumer>
<producer topic="TEST"/>
</config>
</mqconfigs>

metaq实例

标签:

原文地址:http://blog.csdn.net/lius007/article/details/51332043

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!