xreadgroup 命令

XREADGROUP 命令是 XREAD 命令的特殊版本,支持消费者组。在阅读本页之前,你可能必须先理解 XREAD 命令才有意义。


语法

XREADGROUP GROUP group consumer[COUNT count][BLOCK milliseconds][NOACK] STREAMS key[key ...] ID[ID ...]

消费组消息(消费的消息会记录在pending列表里,等待xack确认,需要先创建消费组);

  • [COUNT count]:设置获取消费消息数量。
  • [BLOCK milliseconds]:设置阻塞等待时间(ms)(通常跟>一起使用获取新消息)。
  • [ID ...]:读取id之后的消息,id为0可获取已读但未确认的消息(可多次消费,不会获得未读取的消息),id为>表示读取组内未读取的消息(同组内成员会消费不同消息)。


  • group:消费组名。
  • consumer:消费者名。
  • count:读取数量。
  • milliseconds:阻塞毫秒数。
  • key:队列名。
  • ID:消息 ID。


快速了解消费者组

此命令与 XREAD 的区别是它支持 消费者组

如果没有消费者组,仅使用 XREAD ,所有客户端都将获得所有到达流的条目。相反,如果使用带有 XREADGROUP 的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。

在消费者组中,给定的消费者(即从流中消费消息的客户端)必须使用唯一的 消费者名称 进行标识。名称只是一个字符串。

消费者组的保证之一是,给定的消费者只能看到发送给它的历史消息,因此每条消息只有一个所有者。然而,还有一个特殊的特性叫做 消息认领 ,其允许其他消费者在某些消费者无法恢复时认领消息。为了实现这样的语义,消费者组要求消费者使用 XACK 命令显式确认已成功处理的消息。这是必要的,因为流将为每个消费者组跟踪哪个消费者正在处理什么消息。

这是如何理解您是否要使用消费者组:

  1. 如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。
  2. 如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行 分区 共享 ,以便每个客户端都能获得一个到达流的消息的子集,那么你需要使用消费者组。

XREAD和XREADGROUP之间的差异

从语法的角度来看,这两个命令几乎是相同的,但是 XREADGROUP 需要 一个特殊和强制的选项:

GROUP <group-name> <consumer-name>

组名只是关联到流的消费者组的名称。该组是使用 XGROUP 命令创建的。消费者名称是客户端用于在消费者组内标识自己的字符串。消费者会在第一次出现在消费者组内时被自动创建。不同的消费者应该选择不同的消费者名称。

当你使用 XREADGROUP 读取时,服务器将会 记住 某个给定的消息已经传递给你:消息会被存储在消费者组内的待处理条目列表(PEL)中,即已送达但尚未确认的消息ID列表。

客户端必须使用 XACK 确认消息处理,以便从待处理条目列表中删除待处理条目。可以使用 XPENDING 命令检查待处理条目列表。

使用 XREADGROUP 时在 STREAMS 选项中指定的ID可以是以下两种之一:

  • 特殊ID > ,意味着消费者希望只接收 从未发送给任何其他消费者 的消息。这意思是说,请给我新的消息。
  • 任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将具有返回发送命令的消费者的待处理条目的效果。所以,基本上如果ID不是 > ,命令将让客户端访问它的待处理条目(已发送给它,但尚未确认的条目)。

就像 XREAD XREADGROUP 命令也可以以阻塞的方式使用。在这方面没有区别。

当消息被传递给消费者时,会发生什么?

两件事:

  1. 如果消息从未被发送给其他消费者,也即,如果我们正在谈论新消息,则创建待处理条目列表(PEL)。
  2. 相反,如果该消息已经发送给该消费者,并且它只是再次重新获取相同的消息,那么 最后送达时间 会被更新为当前时间,并且 送达次数 会加1。你可以使用 XPENDING 命令访问这些消息属性。

用法示例

通常,你使用这样的命令来获取新消息并处理它们。在伪代码中:

WHILE true
    entries = XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

通过这种方式,例子中的消费者代码将会只获取新消息,处理它们,以及通过 XACK 确认它们。但是以上案例的代码是不完整的,因为它没有处理崩溃后的恢复事宜。如果我们在处理消息的过程中崩溃了,则我们的消息将继续保留在待处理条目列表中,因此我们可以通过给 XREADGROUP 初始ID为0并执行相同的循环来访问我们的消息历史。一旦提供的ID为0并且回复是一组空的消息,我们就知道我们已经处理并确认完了所有的待处理消息:我们可以开始使用 > 作为ID,以便获取新消息并重新加入正在处理新消息的消费者。


消费者组模式

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:



消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作。
  • XREADGROUP,分组消费消息操作。

进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

# 生产者生成10条消息
redis> MULTI
redis> XADD mq * msg 1 # 生成一个消息:msg 1
redis> XADD mq * msg 2
redis> XADD mq * msg 3
redis> XADD mq * msg 4
redis> XADD mq * msg 5
redis> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# 创建消费组 mqGroup
redis> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK

# 消费者A,消费第1条
redis> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# 消费者A,消费第2条
redis> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# 消费者B,消费第3条
redis> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# 消费者A,消费第4条
redis> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-3"
         2) 1) "msg"
            2) "4"
# 消费者C,消费第5条
redis> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-4"
         2) 1) "msg"
            2) "5"


上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

XGROUP CREATE mq mqGroup 0,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

以上就是消费组的基础操作。

上篇: xgroup 命令

下篇: xack 命令