× 警告!旧版文档已经暂停维护,请查看新版文档。点击前往新版文档

在多个协程间共用同一个协程客户端

与同步阻塞程序不同,协程是并发处理请求的,因此同一时间可能会有很多个请求在并行处理,一旦共用客户端连接,就会导致不同协程之间发生数据错乱。

❌错误的代码

$server = new Swoole\Http\Server('127.0.0.1', 9501);

$server->on('Receive', function ($serv, $fd, $rid, $data) {
    $redis  = RedisFactory::getRedis();
    $result = $redis->hgetall('key');
    $resp->end(var_export($result, true));
});

$server->start();

class RedisFactory
{
    private static $_redis  = null;

    public static function getRedis()
    {
        if (null === self::$_redis) {
            $redis  = new \Swoole\Coroutine\Redis();
            $redis->connect('127.0.0.1', 6379);
            self::$_redis   = $redis;
        }
        return self::$_redis;
    }
}

✅正确的代码

基于SplQueue实现协程客户端的连接池,可以复用协程客户端,实现长连接。

$pool = new RedisPool();
$server = new Swoole\Http\Server('127.0.0.1', 9501);
$server->set([
    // 如开启异步安全重启, 需要在workerExit释放连接池资源
    'reload_async' => true
]);
$server->on('start', function (swoole_http_server $server) {
    var_dump($server->master_pid);
});
$server->on('workerExit', function (swoole_http_server $server) use ($pool) {
    $pool->destruct();
});
$server->on('request', function (swoole_http_request $req, swoole_http_response $resp) use ($pool) {
    //从连接池中获取一个Redis协程客户端
    $redis = $pool->get();
    //连接失败
    if ($redis === false) {
        $resp->end("ERROR");
        return;
    }
    $result = $redis->hgetall('key');
    $resp->end(var_export($result, true));
    //释放客户端,其他协程可复用此对象
    $pool->put($redis);
});
$server->start();

class RedisPool
{
    protected $available = true;
    protected $pool;

    public function __construct()
    {
        $this->pool = new SplQueue;
    }

    public function put($redis)
    {
        $this->pool->push($redis);
    }

    /**
     * @return bool|mixed|\Swoole\Coroutine\Redis
     */
    public function get()
    {
        //有空闲连接且连接池处于可用状态
        if ($this->available && count($this->pool) > 0) {
            return $this->pool->pop();
        }

        //无空闲连接,创建新连接
        $redis = new Swoole\Coroutine\Redis();
        $res = $redis->connect('127.0.0.1', 6379);
        if ($res == false) {
            return false;
        } else {
            return $redis;
        }
    }

    public function destruct()
    {
        // 连接池销毁, 置不可用状态, 防止新的客户端进入常驻连接池, 导致服务器无法平滑退出
        $this->available = false;
        while (!$this->pool->isEmpty()) {
            $this->pool->pop();
        }
    }
}

  • 小六HI

    $this->pool = new SplQueue;
    

    不应该使用单例模式吗?