110 lines
3.3 KiB
PHP
110 lines
3.3 KiB
PHP
<?php
|
|
namespace app\common;
|
|
|
|
use think\Db;
|
|
use think\Cache;
|
|
use app\common\QueueRedis;
|
|
use app\common\traits\QueueDbHATrait;
|
|
/**
 * Shared helper for queue job handlers: timestamped logging, retry-delay
 * selection, and failure handling (retryable vs. non-retryable) for a job
 * whose state is tracked through QueueRedis.
 */
class QueueJob
{
    // Pull in the high-availability database management trait.
    use QueueDbHATrait;

    /**
     * JSON flag combination (JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR).
     * Not referenced inside this class body; kept for external users.
     */
    const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;

    // Required parameters.
    protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];

    // Not referenced in this class body; kept because the trait may rely on it — TODO confirm.
    private $logPath;

    // Shared QueueRedis instance, obtained in the constructor.
    private $QueueRedis;

    // Threshold compared against $job->attempts() to decide when to stop retrying.
    private $maxRetries = 2;

    /**
     * Grab the shared QueueRedis instance.
     */
    public function __construct()
    {
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Print a log line to standard output, prefixed with the current
     * "Y-m-d H:i:s" timestamp and terminated by a newline.
     *
     * @param string $message
     */
    public function log($message)
    {
        echo date("Y-m-d H:i:s") . " " . $message . "\n";
    }

    /**
     * Pick the retry delay (in seconds) for an error message.
     *
     * Keywords are matched case-insensitively in declaration order and the
     * first hit wins; when nothing matches the delay is 10 seconds.
     *
     * @param string $errorMsg
     * @return int
     */
    public function getRetryDelay($errorMsg)
    {
        // Normalise once: stripos() with a null haystack is deprecated since PHP 8.1.
        $errorMsg = (string) $errorMsg;

        $delayMap = [
            'MySQL server has gone away' => 60,
            'timeout' => 30,
            'OpenAI' => 45,
            'network' => 60,
        ];

        foreach ($delayMap as $keyword => $delay) {
            // Case-insensitive substring match.
            if (stripos($errorMsg, $keyword) !== false) {
                return $delay;
            }
        }

        return 10;
    }

    /**
     * Handle a retryable exception.
     *
     * Logs the failure, marks the Redis job entry as failed, then either
     * deletes the job (attempts >= maxRetries) or releases it back to the
     * queue after the delay chosen by getRetryDelay().
     *
     * @param \Throwable $e
     * @param string $sRedisKey
     * @param string $sRedisValue
     * @param \think\queue\Job $job
     */
    public function handleRetryableException($e, $sRedisKey, $sRedisValue, $job)
    {
        $sMsg = $this->logAndMarkFailed($e, '可重试异常', $sRedisKey, $sRedisValue);

        // Presumably the number of times this job has been attempted so far — confirm against think-queue.
        $attempts = $job->attempts();

        if ($attempts >= $this->maxRetries) {
            $this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}");
            $job->delete();
            return;
        }

        $delay = $this->getRetryDelay($sMsg);
        $this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries}) | 执行日志:{$sMsg}");
        $job->release($delay);
    }

    /**
     * Handle a non-retryable exception.
     *
     * Logs the failure, marks the Redis job entry as failed and deletes the
     * job without any retry.
     *
     * @param \Throwable $e
     * @param string $sRedisKey
     * @param string $sRedisValue
     * @param \think\queue\Job $job
     */
    public function handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job)
    {
        $sMsg = $this->logAndMarkFailed($e, '不可重试异常', $sRedisKey, $sRedisValue);

        $this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}");
        $job->delete();
    }

    /**
     * Check the database connection (high-availability version), delegating
     * to the trait's checkDbConnectionTrait().
     *
     * Intended to address "MySQL server has gone away" style connection timeouts.
     *
     * NOTE(review): $force is accepted but not forwarded to the trait; the
     * trait's signature is not visible here — confirm whether it should be passed.
     *
     * @param bool $force whether to force the check (ignore the cached check time)
     * @return bool whether the connection is valid
     */
    public function checkDbConnection($force = false)
    {
        return $this->checkDbConnectionTrait();
    }

    /**
     * Shared first step of both exception handlers: log the exception with
     * its stack trace and mark the Redis job entry as failed.
     *
     * @param \Throwable $e
     * @param string $sLabel log prefix, also used as the message when the exception has none
     * @param string $sRedisKey
     * @param string $sRedisValue
     * @return string the message that was logged (exception message or $sLabel)
     */
    private function logAndMarkFailed($e, $sLabel, $sRedisKey, $sRedisValue)
    {
        // Strict emptiness check: empty() would also discard a legitimate "0" message.
        $sMsg = (string) $e->getMessage();
        if ($sMsg === '') {
            $sMsg = $sLabel;
        }

        $sTrace = $e->getTraceAsString();
        $this->log("{$sLabel}: {$sMsg} | 堆栈: {$sTrace}");

        $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600, $sRedisValue);

        return $sMsg;
    }
}