码迷,mamicode.com
首页 > Web开发 > 详细

PHP使用RabbitMQ消息队列

时间:2019-10-29 09:58:16      阅读:290      评论:0      收藏:0      [点我收藏+]

标签:pass   ids   绑定   cin   php   开始   tde   cannot   nrv   

1、安装amqp拓展 安装流程

2、下载工具包 php-amqplib 

composer require php-amqplib/php-amqplib
 
3、代码操作如下
【消费消息】
 1 <?php 
 2 //配置信息 
 3 $conn_args = array( 
 4     ‘host‘ => ‘127.0.0.1‘,
 5     ‘port‘ => ‘5672‘,  
 6     ‘login‘ => ‘zcw‘,  
 7     ‘password‘ => ‘123456‘, 
 8     ‘vhost‘=>‘/‘ 
 9 );   
10 $e_name = ‘exchange1‘; //交换机名 
11 $q_name = ‘queue1‘; //队列名 
12 $k_route = ‘route1‘; //路由key 
13  
14 //创建连接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20  
21 //创建交换机    
22 $ex = new AMQPExchange($channel);   
23 $ex->setName($e_name); 
24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型(常用的有fanout、direct、topic、headers)  
25 $ex->setFlags(AMQP_DURABLE); //持久化 
26 
27    
28 //创建队列    
29 $q = new AMQPQueue($channel); 
30 $q->setName($q_name);   
31 $q->setFlags(AMQP_DURABLE); //持久化  
32 
33 $total = $q->declareQueue();//获取所有的消息数量
34  
35 //绑定交换机与队列,并指定路由键 
36 $q->bind($e_name, $k_route); 
37  
38 //1、阻塞模式接收消息 
39 while(True){ 
40     $q->consume(‘processMessage‘);   
41     //$q->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答  
42 } 
43 
44 //2 非阻塞模式接收消息 可定时调用
45 //if($total){
46 //    for($i=0;$i<$total;$i++){
47 //        $envelope = $q->get();
48 //        if($envelope){
49 //             $msg = $envelope->getBody(); 
50 //             echo $msg."\n"; //处理消息 
51 //             $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
52 //        }
53 //    }
54 //}
55 
56 $conn->disconnect();   
57  
58 /**
59  * 消费回调函数
60  * 处理消息
61  */ 
62 function processMessage($envelope, $queue) { 
63     $msg = $envelope->getBody(); 
64     echo $msg."\n"; //处理消息 
65     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
66 }
67 ?>

 

【生产消息】

 1 <?php
 2 //配置信息 
 3 $conn_args = array( 
 4     ‘host‘ => ‘127.0.0.1‘,  
 5     ‘port‘ => ‘5672‘,  
 6     ‘login‘ => ‘zcw‘,
 7     ‘password‘ => ‘123456‘,
 8     ‘vhost‘=>‘/‘ 
 9 );   
10 $e_name = ‘exchange1‘; //交换机名 
11 //$q_name = ‘queue1‘; //无需队列名 
12 $k_route = ‘route1‘; //路由key
13  
14 //创建连接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20 //创建交换机对象    
21 $ex = new AMQPExchange($channel);   
22 $ex->setName($e_name);   
23 //发送消息 
24 //$channel->startTransaction(); //开始事务  
25 for($i=0; $i<5; ++$i){
26    $ex->publish($message, $k_route)."\n";
27 }
28  
29 //$channel->commitTransaction(); //提交事务 
30  
31 $conn->disconnect();
32 
33 ?>

 

 

PHP使用RabbitMQ消息队列

标签:pass   ids   绑定   cin   php   开始   tde   cannot   nrv   

原文地址:https://www.cnblogs.com/guliang/p/11743229.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!