1.7.2 task进程使用消息队列

1.7.2新增特性,可将task进程单独设置为消息队列。带来的好处是:

任务排队容量增加

在维持worker进程异步的前提下,task进程可使用消息队列提升任务排队的容量,unix sock受到缓存区尺寸限制,而消息队列不受限制,可以利用到操作系统所有的内存。 如你的机器有32G内存,如果是unix sock一般缓冲区只有8M。如果你的任务很多,会堆积在socket缓存区中。当超过缓冲区时就会无法再投递新的任务。 而消息队列,只要操作系统有剩余内存,那一直可以投递新的任务到队列中。

支持外部程序投递任务

当前的swoole使用Unix Socket,只允许程序内部进行通信。采用消息队列后,拿到消息队列的key。其他程序就可以向此队列投递数据了。

实例

class SwooleTask
{
    protected $queueId;
    protected $workerId;
    protected $taskId = 1;

    const     SW_TASK_TMPFILE = 1;  //tmp file
    const   SW_TASK_SERIALIZE = 2;  //php serialize
    const    SW_TASK_NONBLOCK = 4;  //task

    const SW_EVENT_TASK = 7;

    /**
     * SwooleTask constructor.
     * @param $key
     * @param int $workerId
     * @throws Exception
     */
    function __construct($key, $workerId = 0)
    {
        $this->queueId = msg_get_queue($key);
        if ($this->queueId === false)
        {
            throw new \Exception("msg_get_queue() failed.");
        }
        $this->workerId = $workerId;
    }

    protected static function pack($taskId, $data)
    {
        $flags = self::SW_TASK_NONBLOCK;
        $type = self::SW_EVENT_TASK;
        if (!is_string($data))
        {
            $data = serialize($data);
            $flags |= self::SW_TASK_SERIALIZE;
        }
        if (strlen($data) >= 8180)
        {
            $tmpFile = tempnam('/tmp/', 'swoole.task');
            file_put_contents($tmpFile, $data);
            $data = pack('l', strlen($data)) . $tmpFile . "\0";
            $flags |= self::SW_TASK_TMPFILE;
            $len = 128 + 24;
        }
        else
        {
            $len = strlen($data);
        }

        return pack('lSsCCS', $taskId, $len, 0, $type, 0, $flags) . $data;
    }

    function dispatch($data)
    {
        $taskId = $this->taskId++;
        if (!msg_send($this->queueId, 2, self::pack($taskId, $data), false))
        {
            return false;
        }
        else
        {
            return $taskId;
        }
    }
}

echo "Sending text to msg queue.\n";
$task = new SwooleTask(0x70001002, 1);
//普通字符串
$task->dispatch("Hello from PHP!");

task进程是可以与swoole_server所有的客户端连接进行通信的,所以外部程序使用消息队列作为IPC,就可以与所有客户端连接进行通信。

使用方法

只需设置swoole_server::set参数即可。新增的参数如下:

  • task_ipc_mode => 1 | 2 | 3,默认为1就是普通的unix socket通信方式,2, 3就是使用消息队列模式。模式2和模式3的不同之处是,模式2支持定向投递,$serv->task($data, $task_worker_id) 这里可以指定投递到哪个task进程。模式3是完全争抢模式,task进程会争抢队列,将无法使用定向投递,即使指定了$task_worker_id,在模式3下也是无效的。

  • message_queue_key => 0x72000100 ,指定一个消息队列key。如果需要运行多个swoole_server的实例,务必指定。否则会发生数据错乱



  • ibrahim

    Task使用消息队列 使用场景是什么?

  • 第七星尘

    我想到的一个场景是:假如我需要按顺序地异步发邮件,那我就可以在我的client中提交异步任务,然后server就按顺序发出去

  • qhc39

    task执行时间较长,会大量堆积的场景吧;

  • 追求人生

    设置了以后用命令Ipcs -q看不到系统有这样的消息队列,调用stats(),也看不到task_queue_num' => 10, 'task_queue_bytes' => 65536这样的信息,感觉swoole的文档写的不严谨