xreadgroup 命令
XREADGROUP
命令是
XREAD
命令的特殊版本,支持消费者组。在阅读本页之前,你可能必须先理解
XREAD
命令才有意义。
语法
XREADGROUP GROUP group consumer[COUNT count][BLOCK milliseconds][NOACK] STREAMS key[key ...] ID[ID ...]
消费组消息(消费的消息会记录在pending列表里,等待xack确认,需要先创建消费组);
- [COUNT count]:设置获取消费消息数量。
- [BLOCK milliseconds]:设置阻塞等待时间(ms)(通常跟>一起使用获取新消息)。
- [ID ...]:读取id之后的消息,id为0可获取已读但未确认的消息(可多次消费,不会获得未读取的消息),id为>表示读取组内未读取的消息(同组内成员会消费不同消息)。
- group:消费组名。
- consumer:消费者名。
- count:读取数量。
- milliseconds:阻塞毫秒数。
- key:队列名。
- ID:消息 ID。
快速了解消费者组
此命令与
XREAD
的区别是它支持
消费者组
。
如果没有消费者组,仅使用
XREAD
,所有客户端都将获得所有到达流的条目。相反,如果使用带有
XREADGROUP
的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。
在消费者组中,给定的消费者(即从流中消费消息的客户端)必须使用唯一的 消费者名称 进行标识。名称只是一个字符串。
消费者组的保证之一是,给定的消费者只能看到发送给它的历史消息,因此每条消息只有一个所有者。然而,还有一个特殊的特性叫做
消息认领
,其允许其他消费者在某些消费者无法恢复时认领消息。为了实现这样的语义,消费者组要求消费者使用
XACK
命令显式确认已成功处理的消息。这是必要的,因为流将为每个消费者组跟踪哪个消费者正在处理什么消息。
这是如何理解您是否要使用消费者组:
- 如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。
- 如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行 分区 或 共享 ,以便每个客户端都能获得一个到达流的消息的子集,那么你需要使用消费者组。
XREAD和XREADGROUP之间的差异
从语法的角度来看,这两个命令几乎是相同的,但是
XREADGROUP
需要
一个特殊和强制的选项:
GROUP <group-name> <consumer-name>
组名只是关联到流的消费者组的名称。该组是使用
XGROUP
命令创建的。消费者名称是客户端用于在消费者组内标识自己的字符串。消费者会在第一次出现在消费者组内时被自动创建。不同的消费者应该选择不同的消费者名称。
当你使用
XREADGROUP
读取时,服务器将会
记住
某个给定的消息已经传递给你:消息会被存储在消费者组内的待处理条目列表(PEL)中,即已送达但尚未确认的消息ID列表。
客户端必须使用
XACK
确认消息处理,以便从待处理条目列表中删除待处理条目。可以使用
XPENDING
命令检查待处理条目列表。
使用
XREADGROUP
时在
STREAMS
选项中指定的ID可以是以下两种之一:
-
特殊ID
>
,意味着消费者希望只接收 从未发送给任何其他消费者 的消息。这意思是说,请给我新的消息。 -
任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将具有返回发送命令的消费者的待处理条目的效果。所以,基本上如果ID不是
>
,命令将让客户端访问它的待处理条目(已发送给它,但尚未确认的条目)。
就像
XREAD
,
XREADGROUP
命令也可以以阻塞的方式使用。在这方面没有区别。
当消息被传递给消费者时,会发生什么?
两件事:
- 如果消息从未被发送给其他消费者,也即,如果我们正在谈论新消息,则创建待处理条目列表(PEL)。
-
相反,如果该消息已经发送给该消费者,并且它只是再次重新获取相同的消息,那么
最后送达时间
会被更新为当前时间,并且
送达次数
会加1。你可以使用
XPENDING
命令访问这些消息属性。
用法示例
通常,你使用这样的命令来获取新消息并处理它们。在伪代码中:
WHILE true entries = XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream > if entries == nil puts "Timeout... try again" CONTINUE end FOREACH entries AS stream_entries FOREACH stream_entries as message process_message(message.id,message.fields) # ACK the message as processed XACK mystream $GroupName message.id END END END
通过这种方式,例子中的消费者代码将会只获取新消息,处理它们,以及通过
XACK
确认它们。但是以上案例的代码是不完整的,因为它没有处理崩溃后的恢复事宜。如果我们在处理消息的过程中崩溃了,则我们的消息将继续保留在待处理条目列表中,因此我们可以通过给
XREADGROUP
初始ID为0并执行相同的循环来访问我们的消息历史。一旦提供的ID为0并且回复是一组空的消息,我们就知道我们已经处理并确认完了所有的待处理消息:我们可以开始使用
>
作为ID,以便获取新消息并重新加入正在处理新消息的消费者。
消费者组模式
当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:
消费者组模式的支持主要由两个命令实现:
- XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作。
- XREADGROUP,分组消费消息操作。
进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:
# 生产者生成10条消息 redis> MULTI redis> XADD mq * msg 1 # 生成一个消息:msg 1 redis> XADD mq * msg 2 redis> XADD mq * msg 3 redis> XADD mq * msg 4 redis> XADD mq * msg 5 redis> EXEC 1) "1553585533795-0" 2) "1553585533795-1" 3) "1553585533795-2" 4) "1553585533795-3" 5) "1553585533795-4" # 创建消费组 mqGroup redis> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup OK # 消费者A,消费第1条 redis> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息 1) 1) "mq" 2) 1) 1) "1553585533795-0" 2) 1) "msg" 2) "1" # 消费者A,消费第2条 redis> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-1" 2) 1) "msg" 2) "2" # 消费者B,消费第3条 redis> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-2" 2) 1) "msg" 2) "3" # 消费者A,消费第4条 redis> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-3" 2) 1) "msg" 2) "4" # 消费者C,消费第5条 redis> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-4" 2) 1) "msg" 2) "5"
上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:
XGROUP CREATE mq mqGroup 0,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
以上就是消费组的基础操作。