标签:consumer 后台 The shared 延迟 消息发布 密钥 rpo 模式
Pulsar官方文档 概念和架构-Messaging Concepts中主要内容1 消息组成
组成 | 说明 |
---|---|
Value / data payload | 消息携带的数据,所有pulsar的消息携带原始bytes,但是消息数据也需要遵循数据shcema |
Key | 消息可以被Key打标签。这可以对topic压缩之类的事情起作用 |
Properties | 用户定义属性的可选键/值映射 |
Producer name | 生成消息的生产者的名称(生产者自动被赋予默认名称,但您也可以显式地应用自己的名称) |
Sequence ID | 每一个消息在其主题上都属于一个有序序列。消息的序列ID是它在该序列中的顺序 |
Publish time | 消息发布时的时间戳(由生产者自动应用) |
Event time | 一个可选的时间戳,应用程序可以附加到消息上,表示某个事件发生的时间,例如消息被处理的时间。如果未显式设置,则消息的事件时间为0。 |
2 生产者 发送模式
模式 | 说明 |
---|---|
同步发送 | 生产者将在发送每个消息后等待代理的确认。如果未收到确认,则生产者将认为发送操作失败 |
异步发送 | 生产者将把消息放入阻塞队列并立即返回。客户端库随后将消息发送到后台的代理。如果队列已满(最大大小可配置,则在调用API时,生产者可能会被阻止或立即失败,具体取决于传递给生产者的参数 |
2.1 消息压缩支持
LZ4
ZLIB
ZSTD
SNAPPY
2.2 支持批处理
如果启用批处理,则生产者将在单个请求中累积并发送一批消息。批处理大小是由消息的最大数量和最大发布延迟定义的。
3 消费者 接收模式
模式 | 说明 |
---|---|
同步接收 | 同步接收将被阻止,直到有消息可用 |
异步接收 | 异步接收将立即返回一个future值,例如Java中的CompletableFuture,该值在新消息可用时完成 |
3.1 监听
客户端库为用户提供侦听器实现。例如,Java客户机提供了一个MessageListener接口。在这个接口中,只要接收到新消息,就会调用received方法。
3.2 确认
当使用者成功使用消息时,使用者会向代理发送确认请求,以便代理丢弃该消息。否则,它将存储消息。
消息可以逐个确认,也可以累积确认。对于累积确认,消费者只需要确认它收到的最后一条消息。流中直至(包括)所提供消息的所有消息将不会重新传递给该使用者
( 累积确认不能与共享订阅模式一起使用,因为共享模式涉及多个对同一订阅具有访问权限的使用者)
在共享订阅模式下,可以单独确认消息。
3.3 否定确认
当使用者一次未成功使用消息,并且希望再次使用该消息时,使用者可以向代理发送否定的确认,然后代理将重新传递该消息。
消息可以被一个接一个地否定或累积地承认,这取决于消费订阅模式。
在独占订阅模式和故障转移订阅模式中,消费者只会消极地确认他们收到的最后一条消息。
在“共享”和“密钥共享”订阅模式中,您可以分别对消息进行否定性确认
3.4 确认超时
当消息未成功使用,并且您希望触发代理自动重新传递消息时,可以采用未确认消息自动重新传递机制。客户端将在整个确认超时时间范围内跟踪未确认消息,并在指定确认超时时自动向代理发送重新传递未确认消息请求
注意
在确认超时之前使用否定确认。否定确认以更精确的方式控制单个消息的重新传递,并在消息处理时间超过确认超时时避免无效的重新传递。
4 死信主题
在某些消息无法由消费者使用时,会成生新的消息。在这种机制中,无法使用的消息存储在一个单独的主题中,称为死信主题。您可以决定如何处理死信主题中的消息
下面的示例演示如何使用默认的死信主题在Java客户机中启用死信主题
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
默认的死信主题使用以下格式:
<topicname>-<subscriptionname>-DLQ
如果要指定死信主题的名称,请使用以下Java客户端示例:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("your-topic-name")
.build())
.subscribe();
死信主题取决于邮件的重新传递。由于确认超时或否定确认,消息将重新传递。如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认。
注意
目前,死信主题仅在共享订阅模式下启用
标签:consumer 后台 The shared 延迟 消息发布 密钥 rpo 模式
原文地址:https://blog.51cto.com/14602923/2454647