Files
tougao/application/common/QueueJob.php
2025-07-28 17:55:09 +08:00

110 lines
3.3 KiB
PHP

<?php
namespace app\common;
use think\Db;
use think\Cache;
use app\common\QueueRedis;
use app\common\traits\QueueDbHATrait;
/**
 * Shared helpers for queue job handlers: timestamped logging, retry delay
 * selection, failure bookkeeping through QueueRedis, and a database
 * connection check delegated to QueueDbHATrait.
 */
class QueueJob
{
    // High-availability database helpers; checkDbConnectionTrait() is not
    // defined in this class, so it is expected to come from this trait.
    use QueueDbHATrait;

    /** json_encode()/json_decode() flags: keep Unicode unescaped and throw on failure. */
    const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;

    // Required fields
    protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];

    // Not referenced in this class; kept because the trait may read it.
    private $logPath;

    // QueueRedis singleton used to record the final state of a job.
    private $QueueRedis;

    // Attempt count at (or above) which a retryable failure is no longer released.
    private $maxRetries = 2;

    public function __construct()
    {
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Echo a log line prefixed with the current "Y-m-d H:i:s" timestamp.
     *
     * @param string $message Text to log; a trailing newline is appended.
     */
    public function log($message)
    {
        echo date("Y-m-d H:i:s") . " " . $message . "\n";
    }

    /**
     * Pick the retry delay (in seconds) for an error message.
     *
     * Keywords are matched case-insensitively in declaration order and the
     * first match wins; messages matching no keyword get 10 seconds.
     *
     * @param string $errorMsg Error message to inspect.
     * @return int Delay in seconds.
     */
    public function getRetryDelay($errorMsg)
    {
        // stripos() is deprecated for null haystacks on PHP 8.1+; normalise first.
        $errorMsg = (string) $errorMsg;

        $delayMap = [
            'MySQL server has gone away' => 60,
            'timeout' => 30,
            'OpenAI' => 45,
            'network' => 60
        ];
        foreach ($delayMap as $keyword => $delay) {
            if (stripos($errorMsg, $keyword) !== false) {
                return $delay;
            }
        }
        return 10;
    }

    /**
     * Handle an exception that may be retried.
     *
     * Logs the failure and records the job as 'failed' in Redis, then either
     * deletes the job (attempts >= maxRetries) or releases it back to the
     * queue after the delay chosen by getRetryDelay().
     *
     * NOTE(review): the Redis state is set to 'failed' even when the job is
     * released for another attempt — confirm this is what finishJob() expects.
     *
     * @param \Throwable|\Exception $e Exception raised by the job.
     * @param string $sRedisKey Redis key passed to finishJob().
     * @param string $sRedisValue Redis value passed to finishJob().
     * @param \think\queue\Job $job Queue job being processed.
     */
    public function handleRetryableException($e, $sRedisKey, $sRedisValue, $job)
    {
        $sMsg = $this->logAndMarkFailed($e, '可重试异常', $sRedisKey, $sRedisValue);

        $attempts = $job->attempts();
        if ($attempts >= $this->maxRetries) {
            $this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}");
            $job->delete();
            return;
        }

        $delay = $this->getRetryDelay($sMsg);
        $this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries}) | 执行日志:{$sMsg}");
        $job->release($delay);
    }

    /**
     * Handle an exception that must not be retried.
     *
     * Logs the failure, records the job as 'failed' in Redis and deletes the
     * job from the queue.
     *
     * @param \Throwable|\Exception $e Exception raised by the job.
     * @param string $sRedisKey Redis key passed to finishJob().
     * @param string $sRedisValue Redis value passed to finishJob().
     * @param \think\queue\Job $job Queue job being processed.
     */
    public function handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job)
    {
        $sMsg = $this->logAndMarkFailed($e, '不可重试异常', $sRedisKey, $sRedisValue);

        $this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}");
        $job->delete();
    }

    /**
     * Check the database connection.
     *
     * Delegates to checkDbConnectionTrait() and returns its result unchanged.
     *
     * NOTE(review): $force is accepted but never forwarded to the trait
     * method, so it currently has no effect here — confirm the trait's
     * signature before wiring it through.
     *
     * @param bool $force Documented as "force the check"; currently unused.
     * @return bool Documented as whether the connection is valid; the actual
     *              value is whatever checkDbConnectionTrait() returns.
     */
    public function checkDbConnection($force = false)
    {
        return $this->checkDbConnectionTrait();
    }

    /**
     * Log an exception with its stack trace and mark the job as failed in Redis.
     *
     * Shared by both exception handlers so the two paths stay in sync.
     *
     * @param \Throwable|\Exception $e Exception raised by the job.
     * @param string $sLabel Log prefix, also used as the message when the exception has none.
     * @param string $sRedisKey Redis key passed to finishJob().
     * @param string $sRedisValue Redis value passed to finishJob().
     * @return string The message that was logged (exception message or $sLabel).
     */
    private function logAndMarkFailed($e, $sLabel, $sRedisKey, $sRedisValue)
    {
        // Compare against '' rather than using empty(): empty() would also
        // discard a legitimate message such as "0".
        $sMsg = (string) $e->getMessage();
        if ($sMsg === '') {
            $sMsg = $sLabel;
        }
        $sTrace = $e->getTraceAsString();

        $this->log("{$sLabel}: {$sMsg} | 堆栈: {$sTrace}");
        $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600, $sRedisValue);

        return $sMsg;
    }
}