码迷,mamicode.com
首页 > 其他好文 > 详细

Rocket MQ 1 - 用

时间:2018-12-05 20:58:30      阅读:188      评论:0      收藏:0      [点我收藏+]

标签:end   unique   tps   send   class   client   one   proc   hat   

参考 http://www.iocoder.cn/categories/RocketMQ/ ; https://www.jianshu.com/nb/16219849

首先上启动方法,分别启动namesrv/broker/producer/consumer

    /**
     * Starts a name server controller listening on port 9876 and then parks the
     * main thread.
     *
     * <p>The result of {@code initialize()} is checked so that a controller that
     * failed to initialize is shut down and reported instead of being started.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the controller reports that initialization failed
     * @throws Exception if initialization or startup throws, or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        NamesrvConfig namesrvConfig = new NamesrvConfig();

        NamesrvController nameSrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
        // Do not start a controller whose initialization reported failure;
        // release whatever it allocated and surface the problem to the caller.
        if (!nameSrvController.initialize()) {
            nameSrvController.shutdown();
            throw new IllegalStateException("NamesrvController failed to initialize");
        }
        nameSrvController.start();

        // Keep the main thread parked for a while after the controller has started.
        Thread.sleep(1000000);
    }
    /**
     * Starts a broker named {@code broker-a} on port 10911 that registers with the
     * name server at {@code 127.0.0.1:9876}, keeps it running while the main thread
     * sleeps, and then shuts it down.
     *
     * <p>Initialization is verified with an explicit check: a bare
     * {@code assertThat(...)} call with no chained assertion verifies nothing, so a
     * failed initialization previously went unnoticed. The controller is shut down
     * in a {@code finally} block so it is also released when startup throws or the
     * sleep is interrupted.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the controller reports that initialization failed
     * @throws Exception if initialization or startup throws, or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(10911);

        final BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setBrokerName("broker-a");
        brokerConfig.setAutoCreateTopicEnable(true);
        brokerConfig.setNamesrvAddr("127.0.0.1:9876");

        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        messageStoreConfig.setDeleteWhen("04");
        messageStoreConfig.setFileReservedTime(48);
        messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
        messageStoreConfig.setDuplicationEnable(false);

        BrokerController brokerController = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            new NettyClientConfig(),
            messageStoreConfig);

        // Explicitly test the initialization result instead of wrapping it in an
        // assertion object that is never evaluated.
        if (!brokerController.initialize()) {
            brokerController.shutdown();
            throw new IllegalStateException("BrokerController failed to initialize");
        }

        try {
            brokerController.start();
            // Keep the broker available while the main thread sleeps.
            Thread.sleep(1000000);
        } finally {
            // Always release the broker, even on a startup failure or interrupt.
            brokerController.shutdown();
        }
    }
    /**
     * Sends 1000 messages with tag {@code TagB} to topic {@code TopicTest} through
     * the name server at {@code 127.0.0.1:9876}, printing each send result.
     *
     * <p>A failed send is printed and followed by a one-second pause before the
     * next attempt. The producer is shut down in a {@code finally} block so it is
     * released even when that pause is interrupted.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if the producer cannot be started
     * @throws InterruptedException if the pause after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            for (int i = 0; i < 1000; i++) {
                try {
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagB" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // Best effort: report the failure, back off briefly, then try the next message.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Runs on normal completion and when the back-off sleep is interrupted.
            producer.shutdown();
        }
    }
    /**
     * Starts a push consumer subscribed to every tag of topic {@code TopicTest},
     * using the name server at {@code 127.0.0.1:9876}, and prints each batch of
     * received messages together with the name of the consuming thread.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared by the signature; not thrown by any call visible here
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Listener that logs every delivered batch and always acknowledges it as consumed.
        MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                ConsumeConcurrentlyContext ctx) {
                String threadName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", threadName, messages);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        pushConsumer.subscribe("TopicTest", "*");
        pushConsumer.registerMessageListener(printingListener);

        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }

 

Rocket MQ 1 - 用

标签:end   unique   tps   send   class   client   one   proc   hat   

原文地址:https://www.cnblogs.com/it-worker365/p/10039419.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!