码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ - Work queues

时间:2017-10-06 13:53:16      阅读:137      评论:0      收藏:0      [点我收藏+]

标签:icc   void   rar   basic   ++   tag   static   thread   def   

Producer:

    private static void newTask(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE, true, false, false, null);

            String message = getMessage(args);
            channel.basicPublish("", TASK_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println("[x] send ‘" + message + "‘");
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0)
            return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

build.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Sender.java -d .

run.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world...

  

Consumer:

private static void taskWorker() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        try {
            channel.queueDeclare(TASK_QUEUE, true, false, false, null);
            System.out.println("[*] Waiting for message, to exit press CTRL+C");
            channel.basicQos(1);

            final Consumer consumer = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[x] Receive message ‘" + message + "‘");
                    try {
                        doWork(message);
                    } finally {
                        System.out.print("[x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }

            };
            channel.basicConsume(TASK_QUEUE, false, consumer);
        } catch (Exception e) {
            // TODO: handle exception
        }

        // System.in.read();

        // channel.close();
        // connection.close();
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == ‘.‘) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

build.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Receiver.java -d .

run.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver > log.txt
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver

  

RabbitMQ - Work queues

标签:icc   void   rar   basic   ++   tag   static   thread   def   

原文地址:http://www.cnblogs.com/jmbkeyes/p/7631449.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!