标签:exchange host this moc result 参数 使用 esc data
# producer
php bin/hyperf.php gen:amqp-producer DemoProducer
# consumer
php bin/hyperf.php gen:amqp-consumer DemoConsumer
# 使用 command 调用 DemoProducer 进行验证
php bin/hyperf.php gen:command TestCommand
producer 发个消息:
parent::__construct('t');
@Inject() 注解注入
$this->producer->produce(new DemoProducer('test' . date('Y-m-d H:i:s')));
<?php

declare(strict_types=1);

namespace App\Command;

use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;

/**
 * Console command registered under the name "t" that publishes a single
 * timestamped test message through DemoProducer.
 *
 * @Command
 */
class TestCommand extends HyperfCommand
{
    /**
     * Container handed in through the constructor.
     *
     * @var ContainerInterface
     */
    protected $container;

    /**
     * AMQP producer, populated by annotation-based property injection.
     *
     * @Inject()
     * @var Producer
     */
    protected $producer;

    /**
     * Stores the container and registers the command under the name "t".
     *
     * @param ContainerInterface $container container kept on the instance
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        // "t" is the name typed on the CLI: php bin/hyperf.php t
        parent::__construct('t');
    }

    /**
     * Sets the description of the command.
     */
    public function configure()
    {
        $this->setDescription('Hyperf Demo Command');
    }

    /**
     * Publishes one DemoProducer message whose payload is "test" followed by
     * the current date and time.
     */
    public function handle()
    {
        $this->producer->produce(new DemoProducer('test' . date('Y-m-d H:i:s')));
    }
}
愉快的玩耍起来:
# produce
php bin/hyperf.php t
# consume
php bin/hyperf.php start # 会使用 swoole process 启动 DemoConsumer
# 也可以访问 rabbitmq admin 控制台
http://localhost:15672
跟着 rabbitmq 官网 tutorial, 见识一下 hyperf 中的 amqp 有多简单
// consumer
/**
 * Consumer for the "hello" queue on the fanout exchange "hello".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'hello';

    protected $type = Type::FANOUT;

    protected $queue = 'hello';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);

        return Result::ACK;
    }
}

// producer
/**
 * Producer that publishes to the fanout exchange "hello" with routing key "hello".
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'hello';

    protected $type = Type::FANOUT;

    protected $routingKey = 'hello';

    /**
     * @param mixed $data value stored as the message payload
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }
}

work queue
设置一下 nums 参数, 就可以多进程.

// Consumer
/**
 * Consumer for the "task" queue; the annotation sets nums=2.
 *
 * @Consumer(nums=2)
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'task';

    protected $type = Type::FANOUT;

    protected $queue = 'task';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);

        return Result::ACK;
    }
}

// producer
/**
 * Producer that publishes to the fanout exchange "task" with routing key "task".
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'task';

    protected $type = Type::FANOUT;

    protected $routingKey = 'task';

    /**
     * @param mixed $data value stored as the message payload
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }
}

pub/sub
和上面的 hello world 一致

routing
终于看到 routing_key 的作用了

// consumer
/**
 * Consumer for the "routing.error" queue on the direct exchange "routing",
 * using the single routing key "error".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'routing';

    protected $type = Type::DIRECT;

    // This consumer only consumes error-level logs.
    protected $queue = 'routing.error';

    protected $routingKey = 'error';

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);

        return Result::ACK;
    }
}

/**
 * Consumer for the "routing.all" queue on the direct exchange "routing",
 * using the routing keys "info", "warning" and "error".
 *
 * @Consumer()
 */
class Demo2Consumer extends ConsumerMessage
{
    protected $exchange = 'routing';

    protected $type = Type::DIRECT;

    // This consumer consumes logs of every level.
    protected $queue = 'routing.all';

    protected $routingKey = [
        'info',
        'warning',
        'error',
    ];

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);

        return Result::ACK;
    }
}

// producer
/**
 * Producer for the direct exchange "routing"; the routing key is chosen per message.
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'routing';

    protected $type = Type::DIRECT;

    /**
     * @param mixed $data value stored as the message payload
     * @param mixed $routingKey routing key assigned to this message
     */
    public function __construct($data, $routingKey)
    {
        $this->routingKey = $routingKey;
        $this->payload = $data;
    }
}

// produce
// Publish one message per log level; the second argument is the routing key.
$this->producer->produce(new DemoProducer('info' . date('Y-m-d H:i:s'), 'info'));
$this->producer->produce(new DemoProducer('warning' . date('Y-m-d H:i:s'), 'warning'));
$this->producer->produce(new DemoProducer('error' . date('Y-m-d H:i:s'), 'error'));
var_dump('done');

topics
和上面的 routing 差不多

// consume
/**
 * Consumer for the "topics.t1" queue on the topic exchange "topics",
 * using the routing key patterns "kern.*" and "*.critical".
 *
 * @Consumer()
 */
class DemoConsumer extends ConsumerMessage
{
    protected $exchange = 'topics';

    protected $type = Type::TOPIC;

    protected $queue = 'topics.t1';

    // Alternative routing keys to experiment with:
    // protected $routingKey = '#'; // all
    // protected $routingKey = 'kern.*';
    // protected $routingKey = '*.critical';
    // protected $routingKey = 'kern.critical';
    protected $routingKey = [
        'kern.*',
        '*.critical',
    ];

    /**
     * Dumps the received payload and acknowledges the message.
     *
     * @param mixed $data payload of the consumed message
     * @return string always Result::ACK
     */
    public function consume($data): string
    {
        var_dump($data);

        return Result::ACK;
    }
}

// produce
/**
 * Producer for the topic exchange "topics"; the routing key is chosen per message.
 *
 * @Producer()
 */
class DemoProducer extends ProducerMessage
{
    protected $exchange = 'topics';

    protected $type = Type::TOPIC;

    /**
     * @param mixed $data value stored as the message payload
     * @param mixed $routingKey routing key assigned to this message
     */
    public function __construct($data, $routingKey)
    {
        $this->routingKey = $routingKey;
        $this->payload = $data;
    }
}
标签:exchange host this moc result 参数 使用 esc data
原文地址:https://www.cnblogs.com/kinwing/p/13605680.html