Files
tougao/application/common/QueueJob.php
chengxl 7b78fab4e1 调整
2025-08-21 10:37:08 +08:00

285 lines
11 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
use think\Cache;
use app\common\QueueRedis;
class QueueJob
{
// 必填参数
protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];
private $logPath;
private $QueueRedis;
private $maxRetries = 3;//最大重试次数
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
private $warningThreshold = 120; // 进程超时预警阈值(秒)
// 进程最大运行时间(秒)
protected $maxRunTime = 21600; // 6个小时短于数据库wait_timeout
private $lockExpire = 180; // 锁过期时间(3分钟根据任务实际耗时调整)
private $maxDelay = 300; // 最大重试延迟(5分钟)
// 静态变量:记录进程启动时间(跨任务共享,每个进程仅初始化一次)
protected static $processStartTime = null;
private $startTime;
public function __construct()
{
$this->QueueRedis = QueueRedis::getInstance();
// 初始化进程启动时间(仅在进程首次启动时执行)
if (is_null(self::$processStartTime)) {
self::$processStartTime = time();
// 增加进程ID标识
$pid = getmypid();
$this->log("队列进程启动 [PID:{$pid}],启动时间:" . date('Y-m-d H:i:s', self::$processStartTime));
}
//任务开始时间
$this->startTime = microtime(true);
}
/**
* 任务初始化验证
* @return bool
*/
public function init($job){
// 检查进程是否已超时,提前退出
if ($this->isProcessTimeout(self::$processStartTime)) {
$this->log("进程已超时,放弃处理任务");
$job->release(15); // 短延迟后重新入队
return;
}
// 进程超时预警
$this->checkProcessTimeoutWarning(self::$processStartTime);
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->log("Redis连接失败10秒后重试");
$job->release(15);
return;
}
}
/**
* 任务结束
* @return bool
*/
public function finnal(){
$executionTime = microtime(true) - $this->startTime;
$this->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "");
gc_collect_cycles();
// 任务完成后,检查进程是否超时
if ($this->isProcessTimeout(self::$processStartTime)) {
$this->log("进程已运行超过{$this->maxRunTime}秒,任务完成后自动退出以刷新连接");
exit(1); // 退出进程触发supervisor重启
}
}
/**
* 写入日志到缓冲区
* @param string $message
*/
public function log($message)
{
$log = date("Y-m-d H:i:s") . " " . $message . "\n";
echo $log;
}
/**
* 获取重试延迟时间
* @param string $errorMsg
* @return int
*/
public function getRetryDelay($errorMsg)
{
$delayMap = [
'MySQL server has gone away' => 60,
'timeout' => 30,
'OpenAI' => 45,
'network' => 60
];
foreach ($delayMap as $keyword => $delay) {
if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配
return $delay;
}
}
return 10;
}
/**
* 处理可重试异常
* @param \Exception $e
* @param string $sRedisKey
* @param string $sRedisValue
* @param \think\queue\Job $job
*/
public function handleRetryableException($e, $sRedisKey, $sRedisValue, $job)
{
$sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}");
if ($this->isFatalDatabaseError($e)) {
$this->handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job);
return;
}
// 原子化更新任务状态
$this->QueueRedis->atomicJobUpdate($sRedisKey, 'failed', 3600, $sRedisValue);
$attempts = $job->attempts();
// 双重限制:次数
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}");
$job->delete();
} else {
$delay = $this->getRetryDelay($sMsg, $attempts); // 动态延迟
$this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries})");
$job->release($delay);
}
}
/**
* 处理不可重试异常
* @param \Exception $e
* @param string $sRedisKey
* @param string $sRedisValue
* @param \think\queue\Job $job
*/
public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job)
{
$sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
$this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}");
$job->delete();
}
// 判断是否为需要重启的致命数据库错误
private function isFatalDatabaseError(\Exception $e)
{
// 1. 检查是否为PDO异常
if ($e instanceof \PDOException) {
$fatalCodes = [2006, 2013, 2002]; // 2006=连接断开2013=连接丢失2002=无法连接
$errorCode = (int)$e->getCode();
if (in_array($errorCode, $fatalCodes)) {
return true;
}
}
// 2. 检查错误消息关键词覆盖非PDOException的数据库错误
$errorMsg = strtolower($e->getMessage());
$fatalKeywords = [
'mysql server has gone away',
'lost connection to mysql server',
'error while sending stmt_prepare packet',
'database connection failed',
'sqlstate[hy000]' // 通用数据库错误前缀
];
foreach ($fatalKeywords as $keyword) {
if (strpos($errorMsg, $keyword) !== false) {
return true;
}
}
return false;
}
//处理数据库错误,释放任务后重启(保留任务)
private function handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job)
{
$this->log("检测到致命数据库错误,释放任务后重启队列 | 错误: {$e->getMessage()}");
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("数据库错误重试达上限,标记任务失败");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600, $sRedisValue);
$job->delete();
exit(1);
}
// 1. 释放Redis锁必须先释放否则新进程无法获取锁
if (!empty($sRedisKey) && !empty($sRedisValue)) {
$this->QueueRedis->forceReleaseLock($sRedisKey, $sRedisValue);
}
// 2. 释放任务回队列(设置短延迟,避免重启前被其他进程处理)
$job->release(60); // 60秒后重新入队给进程重启留时间
$this->log("任务已释放回队列,等待新进程处理");
// 3. 强制退出进程触发Supervisor重启
$this->log("数据库错误,重启进程以刷新连接,新进程将处理释放的任务");
exit(1);
}
/**
* 获取分布式锁
* @param string $sRedisKey
* @param string $sRedisValue
* @param Job $job
* @return bool
*/
public function acquireLock($sRedisKey, $sRedisValue, $job)
{
// 1. 前置校验:先检查是否已完成/失败(优先于锁操作)
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->log("任务已终止,删除任务 | 键: {$sRedisKey} | 状态: {$jobStatus}");
$job->delete();
return false;
}
// 2. 尝试获取锁
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
if ($isLocked) {
$this->log("成功获取锁 | 键: {$sRedisKey} | 锁值: {$sRedisValue}");
return true;
}
// 3. 锁竞争处理:二次校验状态+超时抢占
$currentLockValue = $this->QueueRedis->getRedisValue($sRedisKey);
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
// 3.1 若状态已终止,直接删除任务
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->log("任务已终止,删除任务 | 键: {$sRedisKey} | 状态: {$jobStatus}");
$job->delete();
return false;
}
// 3.2 锁已过期值为空或TTL<=0强制抢占
$lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
if (empty($currentLockValue) || $lockTtl <= 0) {
$this->log("锁已过期,强制抢占 | 键: {$sRedisKey} | TTL: {$lockTtl}");
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
if ($isLocked) {
return true;
}
}
// 3.3 未获取到锁,按重试策略处理
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),删除任务 | 键: {$sRedisKey}");
$job->delete();
} else {
// 动态计算延迟时间(基于当前锁剩余时间)
$delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
$delay = min($delay, $this->maxDelay);
$this->log("锁竞争,延迟{$delay}秒重试({$attempts}/{$this->maxRetries}) | 键: {$sRedisKey}");
$job->release($delay);
}
return false;
}
/**
* 检查进程是否超时
* @return bool
*/
public function isProcessTimeout($processStartTime)
{
return time() - $processStartTime > $this->maxRunTime;
}
/**
* 检查进程超时预警
*/
public function checkProcessTimeoutWarning($processStartTime)
{
$remainingTime = $this->maxRunTime - (time() - $processStartTime);
if ($remainingTime > 0 && $remainingTime < $this->warningThreshold) {
$this->log("进程即将超时,剩余时间:{$remainingTime}");
}
}
}