× 警告!旧版文档已经暂停维护,请查看新版文档。点击前往新版文档

Process->useQueue

启用消息队列作为进程间通信。

bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2, int $capacity = 8192);

useQueue方法接受2个可选参数。

  • $msgkey是消息队列的key,默认会使用ftok(__FILE__, 1)作为KEY
  • $mode通信模式,默认为2,表示争抢模式,所有创建的子进程都会从队列中取数据
  • $capacity单个消息长度,长度受限于操作系统内核参数的限制,默认为8192,最大不超过65536
  • 如果创建消息队列失败,会返回false。可使用swoole_strerror(swoole_errno()) 得到错误码和错误信息。

使用模式2后,创建的子进程无法进行单独通信,比如发给特定子进程。
$process对象并未执行start,也可以执行push/pop向队列推送/提取数据
消息队列通信方式与管道不可共用。消息队列不支持EventLoop,使用消息队列后只能使用同步阻塞模式

CygWin 环境不支持消息队列,请勿在此环境下使用

非阻塞

1.9.2或更高版本中增加了swoole_process::IPC_NOWAIT的支持,可将队列设置为非阻塞。在非阻塞模式下,队列已满调用push方法、队列已空调用pop方法时将不再阻塞立即返回。

//设置为非阻塞模式
$process->useQueue($key, $mode | swoole_process::IPC_NOWAIT);

示例

<?php
$worker_num = 2;
$process_pool = [];

$process= null;
$pid = posix_getpid();

function sub_process(swoole_process $worker)
{
    sleep(1); //防止父进程还未往消息队列中加入内容直接退出
    echo "worker ".$worker->pid." started".PHP_EOL;
    while($msg = $worker->pop()){
        if ($msg === false) {
            break;
        }
        $sub_pid = $worker->pid;
        echo "[$sub_pid] msg : $msg".PHP_EOL;
        sleep(1);//这里的sleep模拟任务耗时,否则可能1个worker就把所有信息全接受了
    }
    echo "worker ".$worker->pid." exit".PHP_EOL;
    $worker->exit(0);
}

$customMsgKey = 1;
$mod = 2 | swoole_process::IPC_NOWAIT;//这里设置消息队列为非阻塞模式

//创建worker进程
for($i=0;$i<$worker_num; $i++) {
    $process=new swoole_process('sub_process');
    $process->useQueue($customMsgKey, $mod);
    $process->start();
    $pid = $process->pid;
    $process_pool[$pid] = $process;
}

$messages = [
    "Hello World!",
    "Hello Cat!",
    "Hello King",
    "Hello Leon",
    "Hello Rose"
];
//由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可
$process = current($process_pool);
foreach ($messages as $msg) {
    $process->push($msg);
}

swoole_process::wait();
swoole_process::wait();

echo "master exit".PHP_EOL;

  • NewbMiao

    参数列表有没有。。。

  • 小eyes

    swoole_process->useQueue,消息队列可以与事件循环结合? swoole_event_add($worker->pipe, function($pipe) use($worker) { $recv= $worker->pop(); $res = getInfo($recv); // send data to master $worker->push("{$recv} result is:".json_encode($res).PHP_EOL); }); 这样写不起作用

  • 左树文

    在下一节

  • jonliu

    发现swoole消息队列无法清空,请问有没有清空的函数。

  • 妖杀者

    freeQueue

  • hyb

    $this->process=new swoole_process([$this,'run'],false,true);

    if(!$this->process->useQueue(123)){
        print_r(swoole_strerror(swoole_errno()));exit;
    }
    $this->process->start();
    

    启动服务报错

    [2017-12-12 20:40:03 @9156.0] ERROR swMsgQueue_create(:48): msgget() failed. Error: Function not implemented[88]

  • 一场白雪

    我也报这个错误 ,什么解决的 ?

  • 15013532319

    swoole_process::wait(); 这句为什么要写两次?

  • 新用户(手机注册)

    我想问给为老铁你们看懂没

  • 15093565100

    swoole_process::wait(); 这句为什么要写两次?因为有两个子进程,$worker->exit(0);结束后需要回收2个子进程

  • 15072771268

    看的不太懂