您的当前位置:首页>全部文章>文章详情

消息队列Queue

发表于:2025-09-18 14:20:22浏览:11次TAG: #ThinkPHP

一、引言

1、任务的「原子性获取」机制

当消费者进程(queue:listen 或 queue:work)从队列中获取任务时,队列驱动(如 Redis、Database 等)会通过「原子操作」确保一个任务同一时间只会被一个进程获取。

2、以 Redis 驱动为例:

进程从队列取任务时,会执行 BRPOPLPUSH 命令(原子操作),将任务从「待执行队列」移到「临时处理队列」(如 queue:default:reserved),并标记任务的「处理中」状态。
其他进程此时再检查队列,不会看到这个已被取走的任务,自然不会重复执行。

3、重试

以下情况会重试
① 执行失败
② 执行超时
任务重试很关键,必须要做”事务操作”,否则可能同一个数据会被重复写入等问题

二、安装使用

版本查询

默认composer安装会直接安装最新版,我们需要根据自己的php版本或是其他环境选择我们需要的版本进行安装
查询包:https://packagist.org/packages/topthink/think-queue

安装队列

若您的系统中安装多个php版本,需指定版本,例如我这边指定了php8.0版本。
建议是指定版本,后续若是有安装别的php版本就不会影响到

sudo -u www /www/server/php/80/bin/php /usr/bin/composer require topthink/think-queue

配置文件

安装成功后,会自动生成 config\queue.php 配置文件
建议:默认是没有给出执行超时的时间,我们可以手动配置 ‘expire’ => 300 单位:秒

return [
    'default'     => 'redis', // 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动
    'connections' => [
        'sync'     => [ // 同步
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
            'expire' => 600,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
            'expire' => 600,          // 关键:任务超时时间,超过此时长未完成会被重试
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

守护进程

格式

/www/server/php/80/bin/php think queue:listen --queue=队列名1,队列名2,队列名3 --sleep=间隔秒数 --worker=开启进程数

示例

// 启动3个工作进程,同时处理任务,每个任务间隔1秒执行email,order任务
php think queue:listen --queue=email,order --sleep=1 --worker=3

命令解释
--queue 指定队列名,多个队列,以英文逗号隔开
--sleep 间隔几秒执行,若是要毫秒,那么可以配置成小数,例如:0.1=100毫秒
--worker 启动多个个进程,同时处理任务

发布任务(生产者)

格式

单任务-立即执行:

think\facade\Queue::push($job, 参数, $queue = null)

多任务-立即执行:

think\facade\Queue::push($job,['@方法名', 参数], $queue = null)

单任务-延迟执行:

think\facade\Queue::later($delay, $job, 参数, $queue = null)

多任务-延迟执行:

think\facade\Queue::later($delay, $job,['@方法名', 参数], $queue = null)

示例

单任务-立即执行:

think\facade\Queue::push(\app\job\EmailJob::class, ['to'=>'test@qq.com','content'=>'测试邮件'], 'email')

多任务-立即执行

think\facade\Queue::push(\app\job\OrderJob::class,['@settled', ['sn'=>'P2tu9asdnfli2u0asmkdf8203','status'=>'SUCCESS']], 'order')

单任务-延迟执行

think\facade\Queue::later(3, \app\job\EmailJob::class, ['to'=>'test@qq.com','content'=>'测试邮件'], 'email')

多任务-延迟执行

think\facade\Queue::later(3, \app\job\OrderJob::class,['@settled',['sn'=>'P2tu9asdnfli2u0asmkdf8203','status'=>'SUCCESS']], 'order')

执行任务(消费者)

消费者意思则是执行任务的各种类,例如:EmailJobOrderJob

Job基类

<?php
namespace app\job;

use think\Exception;
use think\queue\Job;
use think\facade\Log;

/**
 * 队列任务基类
 * 方法内必须返回true,否则会重复执行
 * 支持:
 * 1. 单任务模式(继承该类,则必须实现handle方法)
 *  Queue::push(OrderJob::class,参数(可以是任何数据类型), '任务名');
 * Queue::push(OrderJob::class, ['order_id'=>100,'pay_status'=>'SUCCESS'], 'order');
 * 2. 多任务模式(子类定义多个业务方法)
 * Queue::push(OrderJob::class, ['@方法名称',参数(可以是任何数据类型)], '任务名');
 * Queue::push(OrderJob::class, ['@settled',['order_id'=>100,'pay_status'=>'SUCCESS']], 'order');
 */
abstract class BaseJob
{
    /** @var Job 队列任务对象 */
    protected $job;

    /** @var array 任务数据 */
    protected $data;

    /** @var int 最大重试次数 */
    protected $maxAttempts = 3;

    /** @var string 当前执行的方法名(多任务模式用) */
    private $method = 'handle';

    /**
     * 运行消息队列
     * @param Job $job
     * @param $input
     */
    public function fire(Job $job, $input): void
    {
        try {
            // 赋值
            $this->job = $job;
            // 解析参数
            [$this->method,$this->data] = $this->parse($input);
            // 开始
            $this->log('START');
            // 执行
            if (!method_exists($this, $this->method)) {
                // 删除任务
                $this->log('ERROR',"消息队列被删除了:方法不存在");
                $job->delete();
            }
            if ($this->{$this->method}($this->data)) {
                //删除任务
                $this->log('SUCCESS',"消息队列被删除了:执行成功");
                $job->delete();
            } else {
                if ($job->attempts() >= $this->maxAttempts) {
                    //删除任务
                    $this->log('FAIL',"消息队列被删除了:超过最大错误次数{$this->maxAttempts}次");
                    $job->delete();
                } else {
                    //从新放入队列
                    $this->log('INFO',"从新放入队列");
                    $job->release();
                }
            }

        } catch (\Throwable $e) {
            // 记入文件日志
            $this->log('ERROR',"消息队列被删除了:".$e->getMessage());
            $job->delete();
        }
    }

    /**
     * 任务彻底失败回调(子类可重写)
     * @param array $data
     * @param Exception $e
     */
    protected function failed(array $data, Exception $e)
    {
        // 子类可实现此方法处理最终失败(如发送告警)
    }


    /**
     * 日志记录
     * @param string $status 状态:START/SUCCESS/FAIL/ERROR/INFO
     * @param string $message 附加信息
     */
    private function log(string $status, string $message = '')
    {
        $jobClass = get_called_class();
        $taskInfo = [
            'class' => $jobClass,
            'method' => $this->method ?: 'handle',
            'queue' => $this->job->getQueue(),
            'attempts' => $this->job->attempts(),
            'data' => $this->data,
            'message' => $message
        ];
        $logMsg = "[队列任务-{$status}] " . json_encode($taskInfo, JSON_UNESCAPED_UNICODE);
        Log::error($logMsg);
    }
    /*
     * 解析参数
     */
    private function parse($input)
    {
        $method = $this->method;
        // 处理非数组输入(如字符串、数字等)
        if (!is_array($input)) {
            return [$method,$input];
        }

        // 数组为空时直接返回默认值
        if (empty($input)) {
            return [$method,$input];
        }

        // 获取数组第一个元素(不依赖索引)
        $first = reset($input);

        // 情况1:第一个元素是字符串且以@开头
        if (is_string($first) && strpos($first, '@') === 0) {
            // 提取method(去除@)
            $method = ltrim($first, '@');

            // 处理data:取剩余元素
            $remaining = $input[1]??[];
            return [$method,$remaining];
        }
        // 情况2:第一个元素不以@开头
        else {
            return [$method,$input];
        }
    }
}