消息队列Queue
一、引言
1、任务的「原子性获取」机制
当消费者进程(queue:listen 或 queue:work)从队列中获取任务时,队列驱动(如 Redis、Database 等)会通过「原子操作」确保一个任务同一时间只会被一个进程获取。
2、以 Redis 驱动为例:
进程从队列取任务时,会执行 BRPOPLPUSH 命令(原子操作),将任务从「待执行队列」移到「临时处理队列」(如 queue:default:reserved),并标记任务的「处理中」状态。
其他进程此时再检查队列,不会看到这个已被取走的任务,自然不会重复执行。
3、重试
以下情况会重试
① 执行失败
② 执行超时
任务重试很关键,必须要做”事务操作”,否则可能同一个数据会被重复写入等问题
二、安装使用
版本查询
默认composer安装会直接安装最新版,我们需要根据自己的php版本或是其他环境选择我们需要的版本进行安装
查询包:https://packagist.org/packages/topthink/think-queue
安装队列
若您的系统中安装多个php版本,需指定版本,例如我这边指定了php8.0版本。
建议是指定版本,后续若是有安装别的php版本就不会影响到
sudo -u www /www/server/php/80/bin/php /usr/bin/composer require topthink/think-queue
配置文件
安装成功后,会自动生成 config\queue.php
配置文件
建议:默认是没有给出执行超时的时间,我们可以手动配置 ‘expire’ => 300 单位:秒
return [
'default' => 'redis', // 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动
'connections' => [
'sync' => [ // 同步
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
'expire' => 600,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
'expire' => 600, // 关键:任务超时时间,超过此时长未完成会被重试
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
守护进程
格式
/www/server/php/80/bin/php think queue:listen --queue=队列名1,队列名2,队列名3 --sleep=间隔秒数 --worker=开启进程数
示例
// 启动3个工作进程,同时处理任务,每个任务间隔1秒执行email,order任务
php think queue:listen --queue=email,order --sleep=1 --worker=3
命令解释--queue
指定队列名,多个队列,以英文逗号隔开--sleep
间隔几秒执行,若是要毫秒,那么可以配置成小数,例如:0.1=100毫秒--worker
启动多个个进程,同时处理任务
发布任务(生产者)
格式
单任务-立即执行:
think\facade\Queue::push($job, 参数, $queue = null)
多任务-立即执行:
think\facade\Queue::push($job,['@方法名', 参数], $queue = null)
单任务-延迟执行:
think\facade\Queue::later($delay, $job, 参数, $queue = null)
多任务-延迟执行:
think\facade\Queue::later($delay, $job,['@方法名', 参数], $queue = null)
示例
单任务-立即执行:
think\facade\Queue::push(\app\job\EmailJob::class, ['to'=>'test@qq.com','content'=>'测试邮件'], 'email')
多任务-立即执行
think\facade\Queue::push(\app\job\OrderJob::class,['@settled', ['sn'=>'P2tu9asdnfli2u0asmkdf8203','status'=>'SUCCESS']], 'order')
单任务-延迟执行
think\facade\Queue::later(3, \app\job\EmailJob::class, ['to'=>'test@qq.com','content'=>'测试邮件'], 'email')
多任务-延迟执行
think\facade\Queue::later(3, \app\job\OrderJob::class,['@settled',['sn'=>'P2tu9asdnfli2u0asmkdf8203','status'=>'SUCCESS']], 'order')
执行任务(消费者)
消费者意思则是执行任务的各种类,例如:EmailJob
,OrderJob
Job基类
<?php
namespace app\job;
use think\Exception;
use think\queue\Job;
use think\facade\Log;
/**
* 队列任务基类
* 方法内必须返回true,否则会重复执行
* 支持:
* 1. 单任务模式(继承该类,则必须实现handle方法)
* Queue::push(OrderJob::class,参数(可以是任何数据类型), '任务名');
* Queue::push(OrderJob::class, ['order_id'=>100,'pay_status'=>'SUCCESS'], 'order');
* 2. 多任务模式(子类定义多个业务方法)
* Queue::push(OrderJob::class, ['@方法名称',参数(可以是任何数据类型)], '任务名');
* Queue::push(OrderJob::class, ['@settled',['order_id'=>100,'pay_status'=>'SUCCESS']], 'order');
*/
abstract class BaseJob
{
/** @var Job 队列任务对象 */
protected $job;
/** @var array 任务数据 */
protected $data;
/** @var int 最大重试次数 */
protected $maxAttempts = 3;
/** @var string 当前执行的方法名(多任务模式用) */
private $method = 'handle';
/**
* 运行消息队列
* @param Job $job
* @param $input
*/
public function fire(Job $job, $input): void
{
try {
// 赋值
$this->job = $job;
// 解析参数
[$this->method,$this->data] = $this->parse($input);
// 开始
$this->log('START');
// 执行
if (!method_exists($this, $this->method)) {
// 删除任务
$this->log('ERROR',"消息队列被删除了:方法不存在");
$job->delete();
}
if ($this->{$this->method}($this->data)) {
//删除任务
$this->log('SUCCESS',"消息队列被删除了:执行成功");
$job->delete();
} else {
if ($job->attempts() >= $this->maxAttempts) {
//删除任务
$this->log('FAIL',"消息队列被删除了:超过最大错误次数{$this->maxAttempts}次");
$job->delete();
} else {
//从新放入队列
$this->log('INFO',"从新放入队列");
$job->release();
}
}
} catch (\Throwable $e) {
// 记入文件日志
$this->log('ERROR',"消息队列被删除了:".$e->getMessage());
$job->delete();
}
}
/**
* 任务彻底失败回调(子类可重写)
* @param array $data
* @param Exception $e
*/
protected function failed(array $data, Exception $e)
{
// 子类可实现此方法处理最终失败(如发送告警)
}
/**
* 日志记录
* @param string $status 状态:START/SUCCESS/FAIL/ERROR/INFO
* @param string $message 附加信息
*/
private function log(string $status, string $message = '')
{
$jobClass = get_called_class();
$taskInfo = [
'class' => $jobClass,
'method' => $this->method ?: 'handle',
'queue' => $this->job->getQueue(),
'attempts' => $this->job->attempts(),
'data' => $this->data,
'message' => $message
];
$logMsg = "[队列任务-{$status}] " . json_encode($taskInfo, JSON_UNESCAPED_UNICODE);
Log::error($logMsg);
}
/*
* 解析参数
*/
private function parse($input)
{
$method = $this->method;
// 处理非数组输入(如字符串、数字等)
if (!is_array($input)) {
return [$method,$input];
}
// 数组为空时直接返回默认值
if (empty($input)) {
return [$method,$input];
}
// 获取数组第一个元素(不依赖索引)
$first = reset($input);
// 情况1:第一个元素是字符串且以@开头
if (is_string($first) && strpos($first, '@') === 0) {
// 提取method(去除@)
$method = ltrim($first, '@');
// 处理data:取剩余元素
$remaining = $input[1]??[];
return [$method,$remaining];
}
// 情况2:第一个元素不以@开头
else {
return [$method,$input];
}
}
}