分析Laravel队列实现原理解决问题

问题

公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis。在使用laravel中的队列时,产生冲突干扰。

查找问题原因

在laravel 队列的操作类Illuminate\Queue\RedisQueue.php中可以看到pushRaw()方法:

// 将一任务推入队列中
public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->rpush($this->getQueue($queue), $payload);

        return Arr::get(json_decode($payload, true), 'id');
    }
`</pre>

从该方法中可以看出Lrarvel队列的redis实现是通过list结构实现的,`rpush(key, value)`是将value推入键值为key的redis队列,key的值则是通过`$this-&gt;getQueue($queue)` 获取到的

<pre class="line-numbers prism-highlight" data-start="1">`protected function getQueue($queue)
    {
        return 'queues:'.($queue ?: $this-&gt;default);
    }
`</pre>

所以的redis中list中的key是 `'queues:'.($queue ?: $this-&gt;default);`拼接的,`$this-&gt;default` 的值是 `RedisQueue` 实例化的时候从`config\queue.php`配置中加载的 `'queue'      =&gt; 'default'`,`$queue` 是添加队列时`$this-&gt;dispatch( new jobClass()-&gt;onQueue($queue) )`传入的。

<pre class="line-numbers prism-highlight" data-start="1">`// config\queue.php 文件中的redis配置部分
'redis' =&gt; [
            'driver'     =&gt; 'redis',
            'connection' =&gt; 'default',
            'queue'      =&gt; 'default',
            'expire'     =&gt; 60,
        ],
`</pre>

至此,两个项目的队列冲突原因就找到了。因为redis队列配置中 `'queue'      =&gt; 'default'` 都使用的默认的default,所以当共用redis时,默认的队列list 都是'queue:default',所以导致了冲突。

因为队列监听 监听的队列名称是由 --queue参数决定的,如果不传就是我们上面设置的默认值,若传了就会根据传入的队列名从前往后优先依次处理,具体见代码`Illuminate\Queue\Worker.php`中:

<pre class="line-numbers prism-highlight" data-start="1">`protected function getNextJob($connection, $queue)
    {
        if (is_null($queue)) {
            return $connection-&gt;pop();
        }

        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection-&gt;pop($queue))) {
                return $job;
            }
        }
    }
`</pre>

`$queue`就是`--queue=`传入的参数,当 $queue不存在是直接调用`$connection-&gt;pop()`当参数存在时会将参数解析,优先处理排在前面的队列名称,将队列名称传入`pop($queue)`, `pop()`会尝试从指定队列或默认队列中获取队列任务

<pre class="line-numbers prism-highlight" data-start="1">`// Illuminate\Queue\RedisQueue.php
public function pop($queue = null)
    {
        $original = $queue ?: $this-&gt;default;

        $queue = $this-&gt;getQueue($queue);

        if (! is_null($this-&gt;expire)) {
            $this-&gt;migrateAllExpiredJobs($queue);
        }

        $job = $this-&gt;getConnection()-&gt;lpop($queue);

        if (! is_null($job)) {
            $this-&gt;getConnection()-&gt;zadd($queue.':reserved', $this-&gt;getTime() + $this-&gt;expire, $job);

            return new RedisJob($this-&gt;container, $this, $job, $original);
        }
    }

至此搞清了队列执行的原理。

解决方法

将queue的配置文件中默认队列修改为不同的名称,比如: ‘queue’ => laravel1’,’queue’ => laravel2’。

队列监听 php artisan queue:listen redis --queue=laravel1,syncExpress

最后

遇到问题,莫要病急乱投医。从代码入手,分析理解实现原理,找对点,解决方法也许很简单,^_^。

欢迎关注我的博客