码迷,mamicode.com
首页 > 编程语言 > 详细

【RabbitMQ系列】 Spring mvc整合RabbitMQ

时间:2018-02-06 20:28:45      阅读:221      评论:0      收藏:0      [点我收藏+]

标签:bytes   xmla   generic   image   1.2   生产者   download   ota   core   

一、linux下安装rabbitmq

1、安装erlang环境

wget http://erlang.org/download/otp_src_18.2.1.tar.gz  
tar xvfz otp_src_18.2.1.tar.gz   
cd otp_src_18.2.1
./configure   
make install 

2、安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz  
//xy文件压缩工具
yum install xz  
//解压
xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz  
tar -xvf rabbitmq-server-generic-unix-x.x.x.tar
//将其移动至/usr/local/下 按自己习惯
cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq  
//改变环境变量 
vi /etc/profile
export PATH=/usr/local/rabbitmq/sbin:$PATH  
source /etc/profile
//启用MQ管理方式
rabbitmq-plugins enable rabbitmq_management   #启动后台管理  
rabbitmq-server -detached   #后台运行rabbitmq  
//设置端口号 可供外部使用
iptables -I INPUT -p tcp --dport 15672 -j ACCEPT  

3、添加用户和权限

//添加用户
rabbitmqctl add_user admin admin
//添加权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
//添加用户角色
rabbitmqctl set_user_tags admin administrator  

 

二、Spring mvc整合RabbitMQ

1、添加pom.xml依赖jar包

<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.5.RELEASE</version>
        </dependency>

2、添加配置applicationContext.xml

<!--配置rabbitmq开始-->
    <bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.181.201"/>
        <property name="username" value="admin"/>
        <property name="password" value="admin"/>
        <property name="host" value="192.168.181.201"/>
        <property name="port" value="5672"/>
    </bean>
    <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="connectionFactoryMq"/>
    </bean>
    <!--创建rabbitTemplate消息模板类-->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactoryMq"/>
    </bean>
   <!--创建消息转换器为SimpleMessageConverter-->
    <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>
    <!--创建持久化的队列-->
    <bean  id="queue" class="org.springframework.amqp.core.Queue">
        <constructor-arg index="0" value="testQueue"></constructor-arg>
        <constructor-arg index="1" value="true"></constructor-arg>
        <constructor-arg index="2" value="false"></constructor-arg>
        <constructor-arg index="3" value="true"></constructor-arg>
    </bean>
    <!--创建交换器的类型 并持久化-->
    <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
        <constructor-arg index="0" value="testExchange"></constructor-arg>
        <constructor-arg index="1" value="true"></constructor-arg>
        <constructor-arg index="2" value="false"></constructor-arg>
    </bean>
    <util:map id="arguments">

    </util:map>
    <!--绑定交换器 队列-->
    <bean id="binding" class="org.springframework.amqp.core.Binding">
        <constructor-arg index="0" value="testQueue"></constructor-arg>
        <constructor-arg index="1" value="QUEUE"></constructor-arg>
        <constructor-arg index="2" value="testExchange"></constructor-arg>
        <constructor-arg index="3" value="testQueue"></constructor-arg>
        <constructor-arg index="4" value="#{arguments}"></constructor-arg>
    </bean>
    <!--用于接收消息的处理类-->
    <bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"></bean>

    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="rqmConsumer" />
        <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
        <property name="messageConverter" ref="serializerMessageConverter"></property>
    </bean>
    <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列  queues可以传多个-->
    <bean id="listenerContainer"  class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queues" ref="queue"></property>
        <property name="connectionFactory" ref="connectionFactoryMq"></property>
        <property name="messageListener" ref="messageListenerAdapter"></property>
    </bean>
    <bean id="rmqProducer" class="com.slp.mq.RmqProducer"></bean>
    <!--配置rabbitmq结束-->

3、消息实体类

package com.slp.mq;

import java.io.*;

/**
 * @author sanglp
 * @create 2018-02-06 14:00
 * @desc rabbit消息类
 **/
public class RabbitMessage implements Serializable {
    /**
     * 参数类型
     */
    private Class<?>[] paramTypes ;
    /**
     *  交换器
     */
    private String exchange;

    private Object[] params;
    /**
     * 路由key
     */
    private String routekey;

    public RabbitMessage() {
    }

    public RabbitMessage(String exchange,  String routekey,Object...params) {
        this.exchange = exchange;
        this.params = params;
        this.routekey = routekey;
    }

    @SuppressWarnings("rawtypes")
    public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
    {
        this.params=params;
        this.exchange=exchange;
        this.routekey=routeKey;
        int len=params.length;
        Class[] clazzArray=new Class[len];
        for(int i=0;i<len;i++) {
            clazzArray[i] = params[i].getClass();
        }
        this.paramTypes=clazzArray;
    }

    public byte[] getSerialBytes(){
        byte[] res = new byte[0];
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       ObjectOutput oos ;
       try {
           oos = new ObjectOutputStream(baos);
           oos.writeObject(this);
           oos.close();
           res = baos.toByteArray();
       } catch (IOException e) {
           e.printStackTrace();
       }

       return res;
   }

    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    public String getExchange() {
        return exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    public String getRoutekey() {
        return routekey;
    }

    public void setRoutekey(String routekey) {
        this.routekey = routekey;
    }
}

 

4、生产者

package com.slp.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import javax.annotation.Resource;

/**
 * @author sanglp
 * @create 2018-02-06 14:19
 * @desc 生产者
 **/
public class RmqProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送信息
     * @param msg
     */
    public void sendMessage(RabbitMessage msg){
        System.out.println(rabbitTemplate.getConnectionFactory().getHost());
        System.out.println(rabbitTemplate.getConnectionFactory().getPort());
        System.out.println("msg"+msg);
        rabbitTemplate.convertAndSend(msg.getExchange(),msg.getRoutekey(),msg);
        System.out.println("发送完成");

    }
}

  

5、消费者

package com.slp.mq;

/**
 * @author sanglp
 * @create 2018-02-06 14:23
 * @desc 消费者
 **/
public class RmqConsumer {

    public void rmqProducerMessage(Object object){
        System.out.println("消费前");
        RabbitMessage rabbitMessage = (RabbitMessage) object;
        System.out.println(rabbitMessage.getExchange());
        System.out.println(rabbitMessage.getRoutekey());
        System.out.println(rabbitMessage.getParams().toString());
    }
}

  

6、测试类

package com.slp;

import com.slp.mq.RabbitMessage;
import com.slp.mq.RmqConsumer;
import com.slp.mq.RmqProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import java.util.HashMap;
import java.util.Map;

/**
 * @author sanglp
 * @create 2018-02-06 14:36
 * @desc mq测试类
 **/
public class MqTest {


    private RmqProducer rmqProducer ;
    private RmqConsumer rqmConsumer ;
    @Before
    public void setUp() throws Exception {
        //ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("D:/web-back/web-back/myweb/web/WEB-INF/applicationContext.xml");
        //context.start();

        String path="web/WEB-INF/applicationContext.xml";
        ApplicationContext context = new FileSystemXmlApplicationContext(path);
        rmqProducer = (RmqProducer) context.getBean("rmqProducer");
        rqmConsumer = (RmqConsumer)context.getBean("rqmConsumer");
    }
    @Test
    public void test(){
        String exchange = "testExchange";
        String routeKey ="testQueue";
        String methodName = "test";
       //参数
        for (int i=0;i<10;i++){
            Map<String,Object> param=new HashMap<String, Object>();
            param.put("data","hello");

            RabbitMessage  msg=new RabbitMessage(exchange,routeKey, methodName, param);
            //发送消息
            rmqProducer.sendMessage(msg);
        }

       // rqmConsumer.rmqProducerMessage(msg);

    }
}

 

运行结果:

没有开启消费者之前:

 技术分享图片

  

  

  

【RabbitMQ系列】 Spring mvc整合RabbitMQ

标签:bytes   xmla   generic   image   1.2   生产者   download   ota   core   

原文地址:https://www.cnblogs.com/dream-to-pku/p/8423350.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!