redis 实现消息队列的4种方案

Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。Redis具有内置复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过哨兵模式(Sentinel)和集群模式(Cluster)自动分区。

  • 支持数据结构:字符串(string)、哈希散列(hash)、列表(list)、集合(set)、有序集合(sorted sets)、数据流(stream)。
  • 支持查询方式:位图(bitmaps),超级日志(hyperloglogs),地理信息(geo)。

MQ

消息队列(Message Queue,简称 MQ)。简单理解,生产者先将消息投递一个叫做【队列】的容器中,然后再从这个容器中取出消息,最后再转发给消费者。


消息队列使用场景

系统解耦

在分布式环境下,系统间的相互依赖,最终会会导致整个依赖关系混乱,特别在微服务环境下,会出现相互依赖,甚至是循环依赖的情况,对后期系统的拆分和优化都带来极大负担。那么我们就可以用MQ来进行处理。上游系统将数据投递到MQ,下游系统取MQ的数据进行消费,投递和消费可以用同步的方式处理,不会影响上游系统的性能。

异步处理

如果采用同步的方式,系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列,将不必要的业务逻辑异步处理。

流量削峰

消息中间件的存储能力能够有效的帮助消费者进行缓冲。试想下,正常流量下消费者能够愉快的进行消费,瞬时高峰流量来的时候,消费者消费能力跟不上,刚好阻塞在消息中间件,等峰值过后,消费者又能很快的将阻塞的消息进行消费。流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛!

数据分发

大部分开源的MQ中间件基本都支持一对多或者广播的模式,而且都可以根据规则选择分发的对象。这样上游的一份数据,众多下游系统中,可以根据规则选择是否接收这些数据,这样扩展性就很强了。

消息队列特点

先进先出

不能先进先出,都不能说是队列了。消息队列的顺序在入队的时候就基本已经确定了,一般是不需人工干预的。而且,最重要的是,数据是只有一条数据在使用中。这也是MQ在诸多场景被使用的原因。

发布/订阅

发布/订阅是一种很高效的处理方式,如果不发生阻塞,基本可以当做是同步操作。这种处理方式能非常有效的提升服务器利用率,这样的应用场景非常广泛。

持久化

持久化确保MQ的使用不只是一个部分场景的辅助工具,而是让MQ能像数据库一样存储核心的数据。

分布式

在现在大流量、大数据的使用场景下,只支持单体应用的服务器软件基本是无法使用的,支持分布式的部署,才能被广泛使用。而且,MQ的定位就是一个高性能的中间件。

消息队列模式

在JMS标准中,有两种消息模型P2P(Point toPoint)和Publish/Sub(Pub/Sub)。

P2P

点对点,一个发,一个消费。涉及到的角色发布者(Publisher)、消费者(Consumer)、消息队列(Queue)。

  • 一个消息只能被一个消费者消费,消费后会从队列里移除。
  • 发布者和消费者无关系,发布者发送消息的行为不会随消费者而改变。
  • 消费者消费完成消息,需要向队列Ack,消息队列发现消息消费成功即做消息移除。

Pub/Sub

发布订阅模式,一个发布,多方订阅。涉及到的角色有发布者(Publisher)、主题(Topic)、订阅者(Subscriber)。

  • 每个消息可以有多个消费者。
  • 针对某个主题(Topic)的订阅者,必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。


Redis消息队列

MQ应用有很多,比如 ActiveMQ、RabbitMQ、RocketMQ、Kafka等,但是也可以基于redis来实现。

  • 基于 Stream (流)数据类型。
  • 基于 List (列表)数据类型。
  • 基于 Sorted Set (有序集合)类型。
  • 使用 pub/sub (订阅/发布)模式。


基于stream

Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,消息是持久化的,Redis重启后,内容还在。

  • 每个 stream 都有唯一的名称,它就是 Redis的 key ,使用 xadd 指令追加消息时创建。
  • 每个消息都有唯一的 ID ,使用 xadd 指令追加消息时创建。
  • 每个 stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 stream 之上往前移动,表示当前消费组已经消费到哪条消息了。消费组使用 xgroup create 创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化 last_delivered_id 变量。每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者者有一个组内唯一名称。
  • 每个消费者(Consumer)内部会有个状态变量 pending_ids ,它记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个 pending_ids 变量在Redis官方被称之为 PEL(Pending Entries List),这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。


增删改查

  • xadd :追加消息。
  • xdel :删除消息。这里的删除仅仅是设置了标志位,不影响消息总长度
  • xrange :获取消息列表,会自动过滤已经删除的消息。
  • xlen :消息长度。
  • del :删除Stream。


独立消费

我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令 xread ,可以将Stream当成普通的消息队列(list)来使用,可以完全忽略消费组(Consumer Group)的存在。


创建消费组

Stream通过 xgroup create 指令创建消费组(Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。


消费

Stream提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。

Stream消息太多怎么办

读者很容易想到,要是消息积累太多,Stream的链表岂不是很长,内容会不会爆掉就是个问题了。xdel指令又不会删除消息,它只是给消息做了个标志位。Redis自然考虑到了这一点,所以它提供了一个定长Stream功能。在 xadd 的指令提供一个定长长度 maxlen ,就可以将老的消息干掉,确保最多不超过指定长度。

127.0.0.1:6379> xlen codehole
(integer)5

127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0

127.0.0.1:6379> xlen codehole
(integer)3

我们看到Stream的长度被砍掉了。


消息如果忘记ACK会怎样?

Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。


PEL如何避免消息丢失?

在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的PEL消息以及自last_delivered_id之后的新消息。


分区Partition

Redis没有原生支持分区的能力,想要使用分区,需要分配多个Stream,然后在客户端使用一定的策略来讲消息放入不同的stream。


<?php 

    //连接reids
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);

    //xadd:追加消息
    //xdel:删除消息,删除标志位,不影响消息总长度
    //xrange:消息列表,过滤已删除的消息
    //xlen:消息长度
    //del: 删除所有消息

    $redis->rawCommand('del','codehole');

    // 星号表示自动生成id,后面参数key,value
    $redis->rawCommand('xadd','codehole','*','name','user1','age','20');
    $redis->rawCommand('xadd','codehole','*','name','user2','age','18');
    $redis->rawCommand('xadd','codehole','*','name','user3','age','19');
    $redis->rawCommand('xadd','codehole','*','name','user4','age','19');

    //maxlen 定长长度,将老消息干掉,确保链表不会超过指定长度
    $redis->rawCommand('xadd','codehole','maxlen','3','*','name','user5','age','19');

    //XDEL codehole  id
    //$redis->rawCommand('xdel','codehole','1538561700640-0');

    $res = $redis->rawCommand('xlen','codehole');
    echo "<pre>";
    var_dump($res);
    echo '<br />';

    // -最小值 +最大值
    $res = $redis->rawCommand('xrange','codehole','-','+');
    print_r($res);
    echo '<br />';

    $id = $res[1][0];

    // 指定最小消息列表
    $res = $redis->rawCommand('xrange','codehole',$id,'+');
    // var_dump($res);
    // echo '<br />';

    // 指定最大消息列表
    $res = $redis->rawCommand('xrange','codehole','-',$id);
    // var_dump($res);
    // echo '<br />';
    
    // 指定最大消息列表
    $res = $redis->rawCommand('xrange','codehole','-',$id);
    // var_dump($res);
    // echo '<br />';

    /************************独立消费************************/

    //从stream中头部读取两条消息
    $res = $redis->rawCommand('xread','count','2','streams','codehole','0-0');
    // var_dump($res);
    // echo '<br />';

    //从尾部读取一条消息,这里不会返回任何消息
    $res = $redis->rawCommand('xread','count','1','streams','codehole','$');
    // var_dump($res);
    // echo '<br />';

    //block 0 表示永久阻塞,直到消息到来,block 1000表示阻塞1秒,如果1秒没新消息,返回null
    //从尾部阻塞等待消息到来,然后新开一个窗口塞消息,这时候阻塞解除返回新消息内容
    // $res = $redis->rawCommand('xread','block','0','count','1','streams','codehole','$');
    // var_dump($res);
    // echo '<br />';


    /************************消费组************************/


  // 星号表示自动生成id,后面参数key,value
    $redis->rawCommand('xadd','mq','*','msg','1');
    $redis->rawCommand('xadd','mq','*','msg','2');
    $redis->rawCommand('xadd','mq','*','msg','3');
    $redis->rawCommand('xadd','mq','*','msg','4');
    $redis->rawCommand('xadd','mq','*','msg','5');
    

    //创建消费组mqGroup  为消息队列 mq 从第一条开始消费
    $redis->rawCommand('xgroup','create','mq','mqGroup','0');

    //从从尾部开始消费
    //$redis->rawCommand('xgroup','create','mq','mqGroup','$');

    //消费者A,消费第1条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者A,消费第2条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者B,消费第3条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerB', 'count', '1' ,'streams', 'mq', '>');

    //消费者A,消费第4条
    $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');

    //消费者c,消费第5条
    $res = $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerC', 'count', '1' ,'streams', 'mq', '>');

    //获取strarm信息
    $res = $redis->rawCommand('xinfo','stream','mq');
    echo "<pre>";
    print_r($res);
    echo '<br />';

    //获取strarm消费组信息
    $res = $redis->rawCommand('xinfo','groups','mq');
    print_r($res);
    echo '<br />';

    //同一个消费组有多个消费者,观察每个消费者的状态
    $res = $redis->rawCommand('xinfo','consumers','mq','mqGroup');
    print_r($res);
    echo '<br />';

    //mpGroup的Pending等待列表情况  + 0 10
    //使用 -:start +:end 10:count 选项可以获取详细信息
    //$res = $redis->rawCommand('xpending','mq','mqGroup');
    //$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10');
    $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
    print_r($res);
    echo '<br />';

    //通知消息处理结束,用消息ID标识
    $msg_id = $res[0][0];
    $res = $redis->rawCommand('xack','mq','mqGroup',$msg_id);
    print_r($res);
    echo '<br />';

    //再次查看Pending列表
    $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
    print_r($res);
    echo '<br />';

    //转移超过36s的消息id到消费者B的Pending列表
    $redis->rawCommand('xclaim','mq','mqGroup','consumerB','36000',$msg_id);
   
  


基于List

使用 rpush lpush 操作入队列, lpop rpop 操作出队列。List支持多个生产者和消费者并发进出消息,每个消费者拿到都是 不同 的列表元素。

但是当队列为空时,lpop和rpop会一直空轮训,消耗资源;所以引入阻塞读 blpop brpop (b代表blocking),阻塞读在队列没有数据的时候进入休眠状态。一旦数据到来则立刻醒过来,消息延迟几乎为零。


空闲连接

如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。


缺点:

  • 做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认。
  • 不能做广播模式,如pub/sub,消息发布/订阅模型
  • 不能重复消费,一旦消费就会被删除
  • 不支持分组消费


$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//发送消息
$redis->lPush($list, $value);

//消费消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //业务处理
     
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

上面代码会有个问题如果队列长时间是空的,那pop就不会不断的循环,这样会导致redis的QPS升高,影响性能。所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。还有一个需要注意的点是我们需要是用try/catch来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用。


使用pub/sub

  • subscribe :用于订阅信道。
  • publish :向信道发送消息。
  • unsubscribe :取消订阅。

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

优点

  • 典型的广播模式,一个消息可以发布到多个消费者。
  • 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息。
  • 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。

缺点

  • 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。
  • 不能保证每个消费者接收的时间是一致的。
  • 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。
  • 可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。


    基于 Sorted Set

    Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面她是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的

    排序权重。内部实现是“跳跃表”。

    有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据

    Score排序的特征,就可以制作一个有序的消息队列了。

    优点

    就是可以自定义消息ID,在消息ID有意义时,比较重要。

    缺点

    缺点也明显,不允许重复消息(因为是集合),同时消息ID确定有错误会导致消息的顺序出错。