标签:end unique tps send class client one proc hat
参考 http://www.iocoder.cn/categories/RocketMQ/ ; https://www.jianshu.com/nb/16219849
首先上启动方法,分别启动namesrv/broker/producer/consumer
/**
 * Starts an embedded RocketMQ name server listening on port 9876.
 *
 * <p>The controller is kept running for 1,000,000 ms and is then shut down.
 * It is also shut down if startup fails or the wait is interrupted.
 *
 * @param args unused
 * @throws Exception if the controller fails to initialize or start, or if the
 *     waiting thread is interrupted
 */
public static void main(String[] args) throws Exception {
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    NamesrvConfig namesrvConfig = new NamesrvConfig();

    NamesrvController nameSrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    // initialize() signals failure through its return value, so it has to be
    // checked explicitly instead of being discarded.
    if (!nameSrvController.initialize()) {
        nameSrvController.shutdown();
        throw new IllegalStateException("NamesrvController failed to initialize");
    }
    try {
        nameSrvController.start();
        Thread.sleep(1000000);
    } finally {
        // Release the controller on every exit path, matching the broker launcher.
        nameSrvController.shutdown();
    }
}
/**
 * Starts an embedded RocketMQ broker named {@code broker-a} on port 10911 that
 * registers with the name server at {@code 127.0.0.1:9876}.
 *
 * <p>The broker is kept running for 1,000,000 ms and is then shut down. It is
 * also shut down if startup fails or the wait is interrupted.
 *
 * @param args unused
 * @throws Exception if the broker fails to initialize or start, or if the
 *     waiting thread is interrupted
 */
public static void main(String[] args) throws Exception {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);

    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setAutoCreateTopicEnable(true);
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");

    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);

    BrokerController brokerController = new BrokerController(
        brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig);

    // The previous assertThat(...) call had no chained assertion and therefore
    // verified nothing; check the initialization result explicitly.
    if (!brokerController.initialize()) {
        brokerController.shutdown();
        throw new IllegalStateException("BrokerController failed to initialize");
    }
    try {
        brokerController.start();
        Thread.sleep(1000000);
    } finally {
        // Shut down even when start() throws or the sleep is interrupted.
        brokerController.shutdown();
    }
}
/**
 * Sends 1000 messages with tag {@code TagB} to topic {@code TopicTest} through
 * the name server at {@code 127.0.0.1:9876}, printing each send result.
 *
 * <p>A failed send is reported and followed by a one-second pause before the
 * next message is attempted. The producer is shut down on every exit path.
 *
 * @param args unused
 * @throws MQClientException if the producer cannot be started
 * @throws InterruptedException if the thread is interrupted while sending or
 *     while pausing after a failed send
 */
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    try {
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message(
                    "TopicTest" /* Topic */,
                    "TagB" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (InterruptedException e) {
                // Interruption is a request to stop, not a send failure: propagate it
                // instead of swallowing it and continuing the loop.
                throw e;
            } catch (Exception e) {
                e.printStackTrace();
                // Back off briefly before trying the next message.
                Thread.sleep(1000);
            }
        }
    } finally {
        // Previously skipped when the back-off sleep was interrupted.
        producer.shutdown();
    }
}
/**
 * Starts a push consumer in group {@code please_rename_unique_group_name_4}
 * that subscribes to every tag of topic {@code TopicTest} and prints each
 * received batch together with the name of the consuming thread.
 *
 * @param args unused
 * @throws InterruptedException declared by the signature; not thrown directly here
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    // Listener that logs every batch and always acknowledges it as consumed.
    final MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch,
                                                        ConsumeConcurrentlyContext ctx) {
            final String threadName = Thread.currentThread().getName();
            System.out.printf("%s Receive New Messages: %s %n", threadName, batch);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };

    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    pushConsumer.setNamesrvAddr("127.0.0.1:9876");
    pushConsumer.subscribe("TopicTest", "*");
    pushConsumer.registerMessageListener(printingListener);

    pushConsumer.start();
    System.out.printf("Consumer Started.%n");
}
标签:end unique tps send class client one proc hat
原文地址:https://www.cnblogs.com/it-worker365/p/10039419.html