AI相关调整

This commit is contained in:
chengxl
2025-08-15 15:17:52 +08:00
parent 603deadbb4
commit f16ac88ec4
6 changed files with 942 additions and 647 deletions

View File

@@ -4,22 +4,73 @@ namespace app\common;
use think\Db;
use think\Cache;
use app\common\QueueRedis;
use app\common\traits\QueueDbHATrait;
class QueueJob
{
// 必填参数
protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];
private $logPath;
private $QueueRedis;
private $maxRetries = 2;
private $maxRetries = 3;//最大重试次数
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
// 引入高可用数据库管理 trait
use QueueDbHATrait;
private $warningThreshold = 120; // 进程超时预警阈值(秒)
// 进程最大运行时间(秒)
protected $maxRunTime = 21600; // 6个小时短于数据库wait_timeout
private $lockExpire = 180; // 锁过期时间(3分钟根据任务实际耗时调整)
private $maxDelay = 300; // 最大重试延迟(5分钟)
// 静态变量:记录进程启动时间(跨任务共享,每个进程仅初始化一次)
protected static $processStartTime = null;
private $startTime;
public function __construct()
{
$this->QueueRedis = QueueRedis::getInstance();
// 初始化进程启动时间(仅在进程首次启动时执行)
if (is_null(self::$processStartTime)) {
self::$processStartTime = time();
// 增加进程ID标识
$pid = getmypid();
$this->log("队列进程启动 [PID:{$pid}],启动时间:" . date('Y-m-d H:i:s', self::$processStartTime));
}
//任务开始时间
$this->startTime = microtime(true);
}
/**
* 任务初始化验证
* @return bool
*/
public function init($job){
// 检查进程是否已超时,提前退出
if ($this->isProcessTimeout(self::$processStartTime)) {
$this->log("进程已超时,放弃处理任务");
$job->release(15); // 短延迟后重新入队
return;
}
// 进程超时预警
$this->checkProcessTimeoutWarning(self::$processStartTime);
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->log("Redis连接失败10秒后重试");
$job->release(15);
return;
}
}
/**
* 任务结束
* @return bool
*/
public function finnal(){
$executionTime = microtime(true) - $this->startTime;
$this->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "");
gc_collect_cycles();
// 任务完成后,检查进程是否超时
if ($this->isProcessTimeout(self::$processStartTime)) {
$this->log("进程已运行超过{$this->maxRunTime}秒,任务完成后自动退出以刷新连接");
exit(1); // 退出进程触发supervisor重启
}
}
/**
* 写入日志到缓冲区
* @param string $message
@@ -43,13 +94,11 @@ class QueueJob
'OpenAI' => 45,
'network' => 60
];
foreach ($delayMap as $keyword => $delay) {
if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配
return $delay;
}
}
return 10;
}
@@ -60,25 +109,32 @@ class QueueJob
* @param string $sRedisValue
* @param \think\queue\Job $job
*/
public function handleRetryableException($e,$sRedisKey,$sRedisValue,$job)
public function handleRetryableException($e, $sRedisKey, $sRedisValue, $job)
{
$sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
if ($this->isFatalDatabaseError($e)) {
$this->handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job);
return;
}
// 原子化更新任务状态
$this->QueueRedis->atomicJobUpdate($sRedisKey, 'failed', 3600, $sRedisValue);
$attempts = $job->attempts();
// 双重限制:次数
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}");
$job->delete();
} else {
$delay = $this->getRetryDelay($sMsg);
$this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries}) | 执行日志:{$sMsg}");
$delay = $this->getRetryDelay($sMsg, $attempts); // 动态延迟
$this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries})");
$job->release($delay);
}
}
/**
* 处理不可重试异常
* @param \Exception $e
@@ -96,15 +152,118 @@ class QueueJob
$job->delete();
}
// 判断是否为需要重启的致命数据库错误
private function isFatalDatabaseError(\Exception $e)
{
// 1. 检查是否为PDO异常
if ($e instanceof \PDOException) {
$fatalCodes = [2006, 2013, 2002]; // 2006=连接断开2013=连接丢失2002=无法连接
$errorCode = (int)$e->getCode();
if (in_array($errorCode, $fatalCodes)) {
return true;
}
}
// 2. 检查错误消息关键词覆盖非PDOException的数据库错误
$errorMsg = strtolower($e->getMessage());
$fatalKeywords = [
'mysql server has gone away',
'lost connection to mysql server',
'error while sending stmt_prepare packet',
'database connection failed',
'sqlstate[hy000]' // 通用数据库错误前缀
];
foreach ($fatalKeywords as $keyword) {
if (strpos($errorMsg, $keyword) !== false) {
return true;
}
}
return false;
}
//处理数据库错误,释放任务后重启(保留任务)
private function handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job)
{
$this->log("检测到致命数据库错误,释放任务后重启队列 | 错误: {$e->getMessage()}");
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("数据库错误重试达上限,标记任务失败");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600, $sRedisValue);
$job->delete();
exit(1);
}
// 1. 释放Redis锁必须先释放否则新进程无法获取锁
if (!empty($sRedisKey) && !empty($sRedisValue)) {
$this->QueueRedis->forceReleaseLock($sRedisKey, $sRedisValue);
}
// 2. 释放任务回队列(设置短延迟,避免重启前被其他进程处理)
$job->release(60); // 60秒后重新入队给进程重启留时间
$this->log("任务已释放回队列,等待新进程处理");
// 3. 强制退出进程触发Supervisor重启
$this->log("数据库错误,重启进程以刷新连接,新进程将处理释放的任务");
exit(1);
}
/**
* 数据库连接检查与重建(高可用版)
* 解决 MySQL server has gone away 等连接超时问题
* @param bool $force 是否强制检查(忽略缓存时间)
* @return bool 连接是否有效
* 获取分布式锁
* @param string $sRedisKey
* @param string $sRedisValue
* @param Job $job
* @return bool
*/
public function checkDbConnection($force = false)
public function acquireLock($sRedisKey, $sRedisValue, $job)
{
return $this->checkDbConnectionTrait();
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
if (!$isLocked) {
$currentLockValue = $this->QueueRedis->getRedisValue($sRedisKey); // 获取当前锁值
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
// 若锁值为空或过期,强制抢占锁
if (empty($currentLockValue) || $jobStatus === false) {
$this->log("数据为空 | 状态: {$currentLockValue} | 键: {$jobStatus}");
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
if ($isLocked) return true;
}
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->log("任务已完成或失败,删除任务 | 状态: {$jobStatus} | 键: {$sRedisKey}");
$job->delete();
} else {
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 键: {$sRedisKey}");
$job->delete();
} else {
$lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
$delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
// 限制最大延迟时间
$delay = min($delay, $this->maxDelay);
$this->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries}) | 键: {$sRedisKey}");
$job->release($delay);
}
}
return false;
}
$this->log("写入成功 | 状态: {$sRedisKey} | 键: {$sRedisValue}");
return true;
}
/**
* 检查进程是否超时
* @return bool
*/
public function isProcessTimeout($processStartTime)
{
return time() - $processStartTime > $this->maxRunTime;
}
/**
* 检查进程超时预警
*/
public function checkProcessTimeoutWarning($processStartTime)
{
$remainingTime = $this->maxRunTime - (time() - $processStartTime);
if ($remainingTime > 0 && $remainingTime < $this->warningThreshold) {
$this->log("进程即将超时,剩余时间:{$remainingTime}");
}
}
}