Pulsar消息处理模型

Apache Pulsar是一款支持多租户,高性能的端到端的消息中间件。最早由Yahoo开发,目前该项目已经是Apache基金会的项目之一。当前世面上已经有一些优秀的消息中间件,如Kafka、RabbitMQ,RocketMQ等,为什么Pulsar还会横空出世?很多人对Pulsar有一些误解,认为这只是另一种消息传输方式而已,但每个新鲜事物的诞生,必定有其要解决的场景,了解其背后设计的动机是非常值得的。本文侧重点主要给大家介绍Pulsar的消息处理模型,其更多特性后续另外介绍。

架构简述

Puslar其中一个核心特性便是跨集群同步,所以在了解部署架构时需要考虑多集群的场景。以下简单介绍其中的关键组件:

  • Pulsar Producer:消息生产者。一般为生产消息的业务应用
  • Pulsar Consumer:消息消费者。一般为消费消息的业务应用
  • Pulsar Instance:Pulsar实例。一个实例可以包含一个或多个Pulsar集群。
  • Pulsar Cluster:Pulsar集群。包含了Broker计算节点,Bookie存储节点,以及集群内的Zookeeper协调器
  • Pulsar Broker:Broker计算节点。负责接收消息及路由消息,以及一些核心管理功能,无状态设计方便动态扩缩容。
  • Bookkeeper Bookie:消息存储节点。采用Apache Bookkeeper项目负责数据的高可用存储,并且很方便容量扩容。
  • Zookeeper:负责状态协调及元数据管理。集群内的zk负责集群内的节点数据及协调,全局的zk主要负责多集群相关的功能协调管理。

image-20211117234156852

消息模型

批量消息

在某些场景下,每次单独发送一条消息效率太低了。为了解决这个问题,可以在发送端累积消息后合并这个一个请求,进行批量发送。在Pulsar里,当开启了批量消息的功能后,发送端累积的消息量超过一定的阈值后,会组合成一个批发送出去。在批量模式下,消息处理的过程中批是最小单位,到达消费端后消费者会把批解成多个消息,在单独消费。但是在延时消息的场景下,消息仍然会被单独处理。

通常来说批量处理有个问题,在消费端处理一批消息时有部分消息没有处理成功(处理失败或超过ACK时间),此时会将这批消息NACK,等待下一次重新投递时再次消费,再次处理这个批时会重新消费已经处理过的消息。Pulsar为了避免重复消费同一批消息中已经处理成功的部分消息,维护了一个batch index,记录了这一批消息已经消费成功的消息,并返回给broker端。broker端维护了这个batch index,已经确认过的消息不会再次投递,当这个batch index的所有消息都确认成功后才会移除这个一批消息。

image-20211117221342504

分片消息

在某些场景下,单条消息过大服务端会拒绝接收导致客户端发送失败。Pulsar默认消息体最大限制为5MB,当消息体过大时可以采用分块处理(chucking)。在发送端,当消息体大小超过最大限制时会进行分块切片,并分别发送每一块,及分片元数据信息到broker端。在消费端,需要暂时缓存分块数据在内存里,等待所有分块到齐后再进行组装成一个完整的消息体,然后进入消费端的接收队列中。需要注意的是,如果发送端没有成功发送所有分块,消费端在规定时间后会将这些未完整的分片标记为过期,默认过期时间为1小时。

当消费端成功消费完一个由分片组装当消息后并发送ACK,在消费端内部跟这个消息相关的分片也会标记为已ACK状态。为了避免分片暂用过多内存,Pulsar消费端还可以设置参数maxPendingChunkedMessage来控制消费端能缓存的分片数,当超过这个阈值后,消费端可以发送ACK并自动删除未组装完整的消息,或着发送NACK告诉broker端稍后再消费此消息。

一个生产者与一个顺序消费的消费者的分块消息处理

image-20211117221727373

多个生产者与一个顺序消费的消费者的分块消息处理

image-20211117221736860

Topic的写入模式

生产者在往Topic发送消息时,有3种访问模式(Access Mode)可以设置:

1)Shared

多个生产者都可以往同一个topic发送。这是默认设置。

2)Exclusive

在一个topic上,只有一个生产者可以发送消息。如果已经有一个生产者成功连上这个topic,其他生产者再访问这个topic会快速失败

只有该生产者与broker端形成网络分区时,才会会驱逐,这时候才有可能让其他生产者连接这个topic。

这种方式类似下文的 Pulsar的Topic订阅模式中的exclusive模式。

3)WaitForExclusive

同Exclusive一样,一个topic只能有一个生产者访问。如果该topic已经被一个生产者连接暂用,则其他都生产者的创建会挂起等待,直到其中一个有访问这个topic的权限。

Topic的订阅模式

在Topic与Consumer之前还有一个Subscription的概念。一个Topic可以与多个订阅绑定,满足的不同的消费者的订阅需求。多个订阅一般是在有不同的消费者需要消费同一分消息的情况下。

image-20211117233852488

为了灵活的控制消息投递,满足各类场景。Pulsar目前支持4种订阅模式(Subscription):

1)Exclusive

在这种模式下,只有一个消费者实例能关联这个订阅,其他消费者再关联会立即失败。这点与上文的Pulsar的Topic访问模式的Exclusive模式类似。

image-20211117233926645

2)Failover

在这种模式下,可以有多个消费者实例能关联这个订阅,其中只有有1个消费者会成为master消费消息,当master实例了,才会切换到其他消费者。这种方式类似Pulsar的Topic访问模式 的WaitForExclusive,这里有自动选主的机制。

image-20211118000747152

3)Shared

在这种模式下,绑定到同一个订阅的消费者会消费到同一个topic的消息,并且1条消息只会路由给其中一个消费者实例。这里默认的路由方式是轮询,这是一种负载均衡。注意在这种模式下不支持顺序消息,也不能累计确认

image-20211117234013565

4)Key_Shared

在这种模式下,绑定到统一订阅的消费者可以消费同一个topic的消息,与Shared模式不同的是,这里并不是轮询投递消息,格式根据消息体设置的Key来进行路由,同一个Key的会路由到同一个消费者实例。如果这个消费者实例失联了,这次会触发其他消费者承继消费。可以理解为这是基于Hash的一种负载均衡

image-20211117234036187

分区Topic

常规的Topic只会在1个broker节点上,这会限制这个topic的吞吐。分区Topic将常规的Topic拆分成多个Partition,并存在多个broker上,这能够提升这个Topic的吞吐量。当消息在发送的时候,会路由到某个broker上,在消费端会自动分配分区主题给消费者。

image-20211117234048787

路由方式

  1. RoundRobinPartition: 轮询方式。如果消息没有指定key,会轮询发送到各分区,注意不是单个消息轮询,而是跟批的数量保持一致,已保证批处理的高效性;如果指定了key,会hash到对应的分区。
  2. SinglePartition:单个分区。如果没有指定key,会随机挑选一个分区发送;如果指定key,会hash到对应的分区。
  3. CustomPartition:自定义策略,支持扩展。

顺序保证

消息的顺序依赖路由方式消息key的设置。顺序有两种方式:

  1. Per-key-partition:同一个key的消息会路由到同一个分区,保证顺序。支持RoundRobinPartition模式与SinglePartition模式
  2. Per-producer:同一个生产者的消息会路由到同一个分区,保证顺序。支持SinglePartition模式

总结

本文主要简单的介绍了Pulsar的消息处理模型,不同于Kafka或RocketMQ的Broker,Pulsar Broker没有存储消息数据,使得其功能特性变得更加的强大且清晰灵活。类似分片处理这种特性在很多消息中间件都没有支持到,在Pulsar却可以灵活支持。文章的消息处理模型有助于在开发过程中对一些概念的理解,在实现原理有总体的认识,再深入部分功能细节就方便得多。本文主要参考官方英文文档,一些使用细节可以自行了解。

References