xread 命令
从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于
BRPOP
或者
BZPOPMIN
等等。
语法
XREAD [COUNT count][BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count:数量。
- milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
- key :队列名。
- id:消息 ID。
返回值
该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用
XADD
添加它们的顺序完全一致。
当使用
BLOCK
时,超时时将返回一个空回复(
nil
)。
非阻塞使用
如果未提供
BLOCK
选项,此命令是同步的,并可以认为与
XRANGE
有些相关:它将会返回流中的一系列项目,但与
XRANGE
相比它有两个基本差异(如果我们只考虑同步使用):
-
如果我们想要从多个键同时读取,则可以使用多个流调用此命令。这是
XREAD
的一个关键特性,因为特别是在使用 BLOCK 进行阻塞时,能够通过单个连接监听多个键是一个至关重要的特性。 -
XRANGE
返回一组ID中的项目,XREAD
更适合用于从第一个条目(比我们到目前为止看到的任何其他条目都要大)开始使用流。因此,我们传递给XREAD
的是,对于每个流,我们从该流接收的最后一个条目的ID。
例如,如果我有两个流
mystream
和
writers
,并且我希望同时从这两个流中读取数据(从它们的第一个元素开始),我可以像下面这样调用
XREAD
:
请注意:我们在例子中使用了 COUNT 选项,因此对于每一个流,调用将返回每个流最多两个元素。
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
STREAMS 选项是强制的,并且必须是最后一个选项,因为此选项以下列格式获取可变长度的参数:
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
所以我们以一组流的key开始,并在后面跟着所有关联的ID,表示 我们从该流中获取的最后ID ,以便调用仅为我们提供同一流中具有更大ID的条目。
例如,在上面的例子中,我们从流
mystream
中接收的最后项目的ID是
1526999352406-0
,而对于流
writers
,我们接收的最后项目的ID是
1526985685298-0
。
要继续迭代这两个流,我将调用:
redis> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0 1) 1) "mystream" 2) 1) 1) 1526999626221-0 2) 1) "duration" 2) "911" 3) "event-id" 4) "7" 5) "user-id" 6) "9488232" 2) 1) "writers" 2) 1) 1) 1526985691746-0 2) 1) "name" 2) "Toni" 3) "surname" 4) "Morris" 2) 1) 1526985712947-0 2) 1) "name" 2) "Agatha" 3) "surname" 4) "Christie"
以此类推,最终,调用不再返回任何项目,只返回一个空数组,然后我们就知道我们的流中没有更多数据可以获取了(我们必须重试该操作,因此该命令也支持阻塞模式)。
不完全ID
使用不完整的ID是有效的,就像它对
XRANGE
一样有效。但是这里ID的序列号部分,如果缺少,将总是被解释为0,所以命令:
redis> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻塞数据
在同步形式中,只要有更多可用项,该命令就可以获取新数据。但是,有些时候,我们不得不等待数据生产者使用
XADD
向我们消费的流中推送新条目。为了避免使用固定或自适应间隔获取数据,如果命令根据指定的流和ID不能返回任何数据,则该命令能够阻塞,并且一旦请求的key之一接收了数据,就会自动解除阻塞。
重要的是需要理解这个命令是 扇形分发 到所有正在等待相同ID范围的客户端,因此每个消费者都将得到一份数据副本,这与使用阻塞列表pop操作时发生的情况不同。
为了阻塞,使用 BLOCK 选项,以及我们希望在超时前阻塞的毫秒数。通常Redis阻塞命令的超时时间单位是秒,但此命令拥有一个毫秒超时时间,虽然通常服务器的超时时间精度大概在0.1秒左右。这可以在某些用例中阻塞更短的时间,并且如果服务器内部结构随着时间的推移而改善,它的超时精度可能也会有提升。
当传递了 BLOCK 选项,但是在传递的流中没有任何流有数据返回,那么该命令是同步执行的, 就像没有指定BLOCK选项一样 。
这是阻塞调用的例子,其中命令稍后将返回空回复(
nil
),因为超时时间已到但没有新数据到达:
redis> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0 (nil)
特殊的ID
$
有时阻塞我们只希望接收从我们阻塞的那一刻开始通过
XADD
添加到流的条目。在这种情况下,我们对已经添加条目的历史不感兴趣。对于这个用例,我们不得不检查流的最大元素ID,并使在
XREAD
命令行中使用这个ID。这不纯粹并且需要调用其他命令,因此可以使用特殊的ID
$
来表明我们只想要流中的新条目。
你应该仅在第一次调用
XREAD
时使用
$
,理解这一点
非常重要
。后面的ID你应该使用前一次报告的项目中最后一项的ID,否则你将会丢失所有添加到这中间的条目。
这是典型的
XREAD
调用(在想要仅消费新条目的消费者的第一次迭代)看起来的样子:
redis> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们得到了一些回复,下一次调用会是像这样的:
redis> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
依此类推。
如何在单个流上阻止多个客户端
列表或有序集合上的阻塞列表操作有 pop 行为。基本上,元素将从列表或有序集合中删除,以便返回给客户端。在这种场景下,你希望项目以公平的方式被消费,具体取决于客户端在给定key上阻止的时刻到达。通常在这种用例中,Redis使用FIFO语义。
但请注意,对于流这不是问题:当服务客户端时,不会从流中删除条目,因此只要
XADD
命令向流提供数据,就会提供给每个等待的客户端。