标签:channel config body 目的 引擎 文件路径 多个 port 持久
参考文档:
时间轴:
- 2019/10/31 安装centOS7虚拟机
- 2019/11/04 linux下安装rabbitMQ服务
- 2019/11/05 win下给安装php扩展amqp
- 2019/11/12 win下php连接linus下rabbitMQ及生产者产生消息
- 2019/11/13 解决页面访问消费者超时问题
- 后续觉得还是看不懂博客举的例子和文档,在B站看了下牧码人入门视频(基础概念,实例较水)
问题:
- 2019/11/05 php安装amqp扩展,Apache凉了, 扩展包下错了,PHP的architecture
- 2019/11/12 Windows 下 PHP AMQP 链接错误 报错信息:Fatal error: Uncaught AMQPConnectionException: Socket error: could not connect to host. in E:\PHPTutorial\WWW\index.php:11 Stack trace: #0 E:\PHPTutorial\WWW\index.php(11): AMQPConnection->connect() #1 {main} thrown in E:\PHPTutorial\WWW\index.php on line 11(解决:网页使用15672端口,客户端连接使用5672端口, 5672端口未开启)
- 2019/11/12 Deprecated: Function AMQPExchange::declare() is deprecated in E:\PHPTutorial\WWW\index.php on line 31 Exchange Status:1(解决:declare() 方法以及弃用,变为declareExchange())
- 2019/11/13 消费者取数据时,响应时间很长,响应出现500错误 , 但是消息被领走
- 错误信息: Internal Server Error
The server encountered an internal error or misconfiguration and was unable to complete your request.
Please contact the server administrator at admin@php.cn to inform them of the time this error occurred, and the actions you performed just before this error.
More information about this error may be available in the server error log.
- mq后台里队列有个消费者列表,在里面的话一般都能消费https://bbs.csdn.net/topics/392410948?list=9168275
- 原因:页面使用Apache访问,Apache有响应时间
- 解决:请求时通过cmd 进入php文件夹, php.exe 文件路径 ,来执行消费者文件
- 这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
涉及基础:
1.消息队列解决哪些问题
a.异步处理 : 提高系统执行速度
b.应用解耦 : 分布式(eg:超买超卖,解耦降低粘合度)
c.流量削锋 : 高并发(eg:抢红包,秒杀MQ进行过滤)
d.日志处理 : 大数据(eg:统计广告位数据,异步将数据日志存入文件->日志收集 flume单线程 td-agent单线程 logstash可开多进程->消息中间件->进行流计算 实时统计 || 归档后离线计算后数据清洗导入mysql后可出报表)
...
2.virtual hosts 管理
a.即vhosts,相当于mysql的db
b.一般以/开头
c.对用户进行授权
3.Overview->Ports and contents
协议
|
监听端口
|
备注
|
http
|
15672
|
自身服务器页面访问端口
浏览器与rabbitMQ
服务器通信使用http协议
|
amqp
|
5672
|
通信协议,与rabbitMQ服务器通信使用的协议
|
clustering
|
25672
|
支持集群
|
1.简单队列
P----Queue----C
a.三个对象: 生产者,队列,消费者 11对应
b.发布消息: 链接mq服务->创建通道->声明队列->通过channel发布消息到队列
c.消费消息: 链接mq服务->创建通道->定义队列的消费者channel->监听队列
d.不足: 耦合性高, 生产者一一对应消费者; 不能实现多个消费者消费队列中的消息; 队列名需要同时变更
2.工作队列
|----C1
P----Queue----|----C2
|----C3
a.模型: 一个生产者对应多个消费者
d.出现原因: 简单队列11对应; 实际开发中,生产者发送消息毫不费力, 而消费者一般是要跟业务相 结合的, 消费者接受到消息后需要处理,可能需要花费时间, 队列就可能挤压很多消息
c.生产者: 链接mq->创建通道->声明队列->发布消息->关闭通道
d.消费者: 链接mq->创建通道->通道声明队列->定义消费者接收消息
e.消费者1和2处理消息均分,1偶数2奇数数据,这种现象叫轮询分发,不管谁忙谁闲都不会多给消息,任务消息总是你一个我一个
3.公平分发
|----C1
P----Queue----|----C2
|----C3
a.使用公平分发必须关闭自动应答(ack)改成手动反馈
b.生产者: 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次处理一个消息; 限制发送给同一个消费者不得超过一条消息
c.消费者2处理比消费者1多(能者多劳)
4.消息应答(autoAck布尔值,Message Acknowledge)
a.消息应答默认是打开的,默认为false
b. 为true时,自动确认模式, MQ将消息分发给消费者,该条消息就会将被从内存中消除
这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息!
c. 为false时,手动模式,如果有一个消费者挂掉,就会交付给其他消费者.rabbitMQ支持消息应答,告诉rabbitMQ这个消息已经处理,可以删除了,rabbitMQ将删除内存中的消息,如果MQ服务器挂了,消息仍会丢失!
5.消息持久化(durable布尔型)
a.已定义过的队列修改durable属性,是不可以的;尽管代码正确,也不会运行成功
b.rabbitMQ不允许对已存在的队列进行重新定义不同的参数信息
6.订阅模式
|----Queue1----C1
P----x----|
|----Queue2----C2
a.模型解读:
一个生产者,多个消费者;
每一个消费者都有自己的队列;
生产者没直接把消息发送至队列,而是发送到了交换机(转发器exchange);
每个队列都要绑定带交换机上;
生产者发送的消息经过交换机到达队列,实现一个消息被多个消费者消费
b.应用场景:
注册后需要发邮件和短信等操作
商品修改后搜索引擎及前台都需要变更
c.生产者: 声明交换机->发布消息到交换机->关闭通道->关闭连接
交换机未绑定队列时,发送消息到队列会导致消息丢失,因为交换机没有存储的能力, 在rabbitMQ中只有队列有存储能力
d.消费者: 连接->创建channel->声明队列->绑定队列到交换机->定义消费者
7.Exchange(交换机 转发器)
a.匿名转发,exchange为空(""), 上述订阅模式
b.Fanout(不处理路由键): 消息经过交换机,与其交换机绑定的队列都能收到消息
c.Direct(处理路由键): 发送消息时带一个路由key,若绑定队列key与路由key相匹配,则将消息转指相应队列
8.路由模式
type=direct
| |---error---|---Queue1----C1
P----x--------|
|----info---|
|---error---|---Queue2----C2
|-warning-|
a.生产者:链接->创建通道->声明交换机->发布消息->关闭通道->关闭连接
b.消费者:链接->创建通道->声明队列->绑定交换机->定义消费者
9.话题模式
type=topic
| |---*.orange.*---|---Queue1----C1
P----x--------|
|----*.*.rabbit---|
|------lazy.#-----|---Queue2----C2
a.将路由和某个模式匹配,通配符
b.井号匹配一个或多个; 星号匹配一个
c.例如: Goods.# =(Goods.insert/delete)
d.场景: 生产者管理商品(增删改查)
10.消息确认机制(事务+confirm)
a.rabbitMQ中可以通过持久化数据解决rabbitMQ服务器异常的数据丢失问题
b.背景: 生产者将消息发送出去后,消息到底有没有到达rabbitMQ服务,默认情况下是不知道的
c.两种方式确定: AMQP实现了事务机制;Confirm模式
11.事务机制(AMQP本身协议自带):
a.txSelect(将当前channel设置成transaction模式);txCommit(用于提交事务);txRollback(回滚事务)
b.实现: 使用try catch抛出异常及事务机制实现确保消息发送至rabbitMQ服务器
c.不足: 协议通信次数太多,会降低消息的吞吐量且耗时,每个信息都要提交一次相当于一次请求
12.Confirm模式(事务模式队列不能重复设置)
a.生产者端confirm模式的实现原理: 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使生产者知道消息已经正确到达目的队列了,如果消息和队列时可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息已经得到了处理
b.confirm模式的最大好处在于其是异步的;
c.开启confirm模式: channel.confirmSelect();
d.编程模式:
普通 发一条 waitForConfirms(),等待服务端确认,串行方法;
批量 发一批 waitForConfirms();
异步 提供一个回调方法
Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每个Channel维护一个unconfirm的消息序号集合,每发布一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录.从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构
生产者:链接->创建通道->声明队列->(通道添加监听)声明未确认的消息标识集合->发布成功后回调handleAck()函数,回执有问题执行handleNack()函数->关闭通道->关闭连接
rabbitMQ安装及基础
标签:channel config body 目的 引擎 文件路径 多个 port 持久
原文地址:https://www.cnblogs.com/qiusanqi/p/11934674.html