xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。


语法

XADD key ID field value [field value ...]

参数 ID

ID 标识流内的给定条目。如果指定的 ID 参数是字符 * (星号 ASCII 字符),XADD 命令会自动为您生成一个唯一的 ID。但是,也可以指定一个良好格式的 ID,以便新的条目以指定的 ID 准确存储,虽然仅在极少数情况下有用。

ID 是由-隔开的两个数字组成的: 1526919030474-55 。两个部分数字都是 64 位的,当自动生成 ID 时,第一部分是生成 ID 的 Redis 实例的毫秒格式的 Unix 时间。第二部分只是一个序列号,以及是用来区分同一毫秒内生成的 ID 的。

ID 保证始终是递增的:如果比较刚插入的条目的 ID,它将大于其他任何过去的 ID,因此条目在流中是完全排序的。为了保证这个特性,如果流中当前最大的 ID 的时间大于实例的当前本地时间,将会使用前者,并将 ID 的序列部分递增。例如,本地始终回调了,或者在故障转移之后新主机具有不同的绝对时间,则可能发生这种情况。

当用户为 XADD 命令指定显式 ID 时,最小有效的 ID 是 0-1 ,并且用户必须指定一个比当前流中的任何 ID 都要大的 ID,否则命令将失败。通常使用特定 ID 仅在您有另一个系统生成唯一 ID(例如 SQL 表),并且您确实希望 Redis 流 ID 与该另一个系统的 ID 匹配时才有用。

上限流

可以使用 MAXLEN 选项来限制流中的最大元素数量。与使用 XADD 添加条目相比较,使用 MAXLEN 修整会很昂贵:流由宏节点表示为基数树,以便非常节省内存。改变由几十个元素组成的单个宏节点不是最佳的。因此可以使用以下特殊形式提供命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

在选项 MAXLEN 和实际计数中间的参数 ~ 的意思是,用户不是真的需要精确的 1000 个项目。它可以多几十个条目,但决不能少于 1000 个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。

返回值

该命令返回添加的条目的 ID。如果 ID 参数传的是 * ,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

示例

redis> XADD mystream * name Sara surname OConnor
"1575742009761-0"

redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1575742009762-0"

redis> XLEN mystream
(integer) 2

#- 是最小,+ 是最大

redis> XRANGE mystream - +
1) 1) "1575742009761-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1575742009762-0"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

测试

第一步,插入数据 stm 是streams的 key , * 代表的是redis帮生成序列 id, stream的一条记录可以有多个字段及值,所以后面跟了 url和typo.

# xiaorui.cc

redis> xadd stm * url xiaorui.cc typo 1
redis> xadd stm * url xiaorui.cc typo 2
redis> xadd stm * url xiaorui.cc typo 3
redis> xadd stm * url xiaorui.cc typo 4
redis> xadd stm * url xiaorui.cc typo 5
redis> xadd stm * url xiaorui.cc typo 6
redis> xadd stm * url xiaorui.cc typo 7
redis> xadd stm * url xiaorui.cc typo 8
redis> xadd stm * url xiaorui.cc typo 9
redis> xadd stm * url xiaorui.cc typo 10

第二步,查询最小到最大的数据, - 是最小, + 是最大

redis> xrange stm - +
 1) 1) 1528271957975-0
    2) 1) "url"
       2) "xiaorui.cc"
       3) "typo"
       4) "1"
 2) 1) 1528271958716-0
    2) 1) "url"
       2) "xiaorui.cc"
       3) "typo"
       4) "2"

第三步,创建一个消费组

redis> xgroup create stm cg1 0-0
OK

第四步,创建一个消费者并且消费

redis> xreadgroup GROUP cg1 c1 count 1 streams stm >
1) 1) "stm"
   2) 1) 1) 1528271957975-0
         2) 1) "url"
            2) "xiaorui.cc"
            3) "typo"
            4) "1"

第五步,查看redis里的stream消费状态,可以看到当前只有一个消费者c1, pending状态的id只有一个。

redis> xinfo groups stm
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 1
redis> xinfo consumers stm cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 46018

第六步,使用xack来确认该消息id,之后我们在通过xinfo groups发现pending为0了。

redis> xack stm cg1 1528271957975-0
(integer) 1
redis> xinfo groups stm
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

上面说了基本的用法,我们再试试在一个consumer group里,多个消费者并发去消费请求会出现了什么?

# xiaorui.cc

redis> xreadgroup GROUP cg1 c2 count 1 streams stm >
1) 1) "stm"
   2) 1) 1) 1528271958716-0
         2) 1) "url"
            2) "xiaorui.cc"
            3) "typo"
            4) "2"
redis> xreadgroup GROUP cg1 c1 count 1 streams stm >
1) 1) "stm"
   2) 1) 1) 1528271960027-0
         2) 1) "url"
            2) "xiaorui.cc"
            3) "typo"
            4) "3"

通过上面的 result 可以看到,两个消费者不会消费到同一个消息 id, > 表示从当前消费组的last_delivered_id后面开始读,每当消费者读取一条消息,last_delivered_id变量就会前进。这个get消息和set偏移量的过程是原子的,redis会帮你保证该组合指令的原子性。既然是消费队列,当消费端没有数据的时候,肯定不能忙轮询吧?所以说redis streams也支持类似long poll事件唤醒机制。 block 0 等于一直等待读事件,block 10 等待10s的意思。

下篇: xtrim 命令