Files
tougao/application/common/QueueJob.php
2025-07-28 15:08:43 +08:00

269 lines
10 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
use think\Cache;
use app\common\QueueRedis;
class QueueJob
{
// 必填参数
protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];
private $logPath;
private $QueueRedis;
private $maxRetries = 2;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
$this->QueueRedis = QueueRedis::getInstance();
}
/**
* 写入日志到缓冲区
* @param string $message
*/
public function log($message)
{
$log = date("Y-m-d H:i:s") . " " . $message . "\n";
echo $log;
}
/**
* 获取重试延迟时间
* @param string $errorMsg
* @return int
*/
public function getRetryDelay($errorMsg)
{
$delayMap = [
'MySQL server has gone away' => 60,
'timeout' => 30,
'OpenAI' => 45,
'network' => 60
];
foreach ($delayMap as $keyword => $delay) {
if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配
return $delay;
}
}
return 10;
}
/**
* 处理可重试异常
* @param \Exception $e
* @param string $sRedisKey
* @param string $sRedisValue
* @param \think\queue\Job $job
*/
public function handleRetryableException($e,$sRedisKey,$sRedisValue,$job)
{
$sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}");
$job->delete();
} else {
$delay = $this->getRetryDelay($sMsg);
$this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries}) | 执行日志:{$sMsg}");
$job->release($delay);
}
}
/**
* 处理不可重试异常
* @param \Exception $e
* @param string $sRedisKey
* @param string $sRedisValue
* @param \think\queue\Job $job
*/
public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job)
{
$sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}");
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
$this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}");
$job->delete();
}
/**
* 检查并重建数据库连接
*/
public function checkDbConnectionBak()
{
return true;
$maxAttempts = 2; // 最大重试次数
$attempt = 0;
while ($attempt < $maxAttempts) {
try {
// 尝试查询以验证连接
Db::query('SELECT 1');
return true; // 连接有效,直接返回
} catch (\Exception $e) {
$attempt++;
$waitTime = pow(2, $attempt); // 指数退避2s, 4s, 8s...
// 记录连接失败日志
$sMsg = empty($e->getMessage()) ? '检查失败' : $e->getMessage();
$this->log("数据库连接检查失败(尝试{$attempt}/{$maxAttempts}): {$sMsg}");
// 关闭所有连接
Db::close();
// 最后一次尝试不需要等待
if ($attempt < $maxAttempts) {
$this->log("{$waitTime}秒后尝试重新连接...");
sleep($waitTime); // 等待一段时间再重试
}
// 尝试重新连接
try {
Db::connect();
Db::query('SELECT 1'); // 验证重连成功
$this->log("数据库连接已重建(尝试{$attempt}/{$maxAttempts})");
return true;
} catch (\Exception $e2) {
$this->log("数据库重连尝试{$attempt}/{$maxAttempts}失败: {$e2->getMessage()}");
}
}
}
// 所有重试都失败
$this->log("数据库连接异常,已达到最大重试次数({$maxAttempts})");
return false;
}
/**
* 数据库连接检查与重建(高可用版)
* 解决 MySQL server has gone away 等连接超时问题
* @param bool $force 是否强制检查(忽略缓存时间)
* @return bool 连接是否有效
*/
public function checkDbConnection($force = false)
{
return true;
// 1. 用进程ID隔离静态变量避免多Worker进程互相干扰
// 每个队列Worker是独立进程静态变量需进程隔离
static $lastCheckTime = [];
$pid = getmypid(); // 获取当前进程ID
$checkInterval = 60; // 自动检查间隔(秒)
// 非强制检查且未到间隔时间,直接返回有效(减少性能消耗)
if (!$force && isset($lastCheckTime[$pid]) && (time() - $lastCheckTime[$pid] < $checkInterval)) {
return true;
}
// 2. 配置重试参数
$maxAttempts = 3; // 最大重试次数
$attempt = 0; // 当前尝试次数
$baseWait = 2; // 基础等待时间(秒)
// 3. 循环重试连接
while ($attempt < $maxAttempts) {
try {
// 执行轻量查询验证连接DUAL是MySQL伪表效率极高
$result = Db::query('SELECT 1 FROM DUAL');
// 验证查询结果是否有效
if (is_array($result) && !empty($result)) {
$lastCheckTime[$pid] = time();
$this->log("进程[{$pid}]数据库连接有效");
return true;
} else {
throw new Exception("连接验证失败:查询结果异常");
}
}
// 优先捕获PDO底层异常数据库连接错误多为此类
catch (PDOException $e) {
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime);
}
// 捕获框架层异常
catch (Exception $e) {
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime);
} finally {
$attempt++; // 无论成功失败,计数+1
}
}
// 4. 达到最大重试次数,返回失败
$this->log("进程[{$pid}]数据库连接异常,已达最大重试次数({$maxAttempts})");
return false;
}
/**
* 处理连接错误的统一逻辑
* @param Exception $e 异常对象
* @param int $pid 进程ID
* @param int $attempt 当前尝试次数
* @param int $maxAttempts 最大尝试次数
* @param int $baseWait 基础等待时间
* @param array $lastCheckTime 检查时间记录
*/
private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait, &$lastCheckTime)
{
$errorMsg = empty($e->getMessage()) ? '未知数据库错误' : $e->getMessage();
$errorCode = empty($e->getCode()) ? 0 : $e->getCode();
// 记录错误详情含进程ID便于排查多进程问题
$this->log("进程[{$pid}]连接检查失败(尝试{$attempt}/{$maxAttempts}){$errorMsg}(错误码:{$errorCode}");
// 5. 强制清理当前进程的无效连接
Db::close(); // 关闭框架层面的连接
$this->clearDbInstanceCache(); // 清除框架连接缓存(关键步骤)
cache('db_connection_status', null);
// 最后一次尝试无需等待,直接重试
if ($attempt + 1 >= $maxAttempts) {
return;
}
// 6. 差异化等待策略(针对特定错误延长等待)
$isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
// 普通错误2^1=2s → 2^2=4s致命错误3^1=3s → 3^2=9s
$waitTime = $isGoneAway ? $baseWait * pow(3, $attempt) : $baseWait * pow(2, $attempt);
$this->log("进程[{$pid}]将在{$waitTime}秒后重试...");
sleep($waitTime); // 等待指定时间
// 7. 尝试重建连接并二次验证
try {
// 强制重建连接第二个参数true表示忽略缓存
Db::connect(config('database'), true);
// 执行验证查询
$result = Db::query('SELECT 1 FROM DUAL');
// 检查结果是否有效
if (is_array($result) && !empty($result)) {
$lastCheckTime[$pid] = time();
$this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})");
$attempt = $maxAttempts; // 标记成功并退出循环
} else {
throw new Exception("重建连接后查询结果异常");
}
} catch (Exception $e2) {
$this->log("进程[{$pid}]重连失败(尝试{$attempt}/{$maxAttempts}){$e2->getMessage()}");
}
}
/**
* 清除ThinkPHP5的Db类实例缓存解决框架连接缓存问题
* 核心原理:通过反射突破私有属性访问限制
*/
private function clearDbInstanceCache()
{
try {
$reflection = new \ReflectionClass('\think\Db');
$instanceProp = $reflection->getProperty('instance'); // 获取Db类的instance属性
$instanceProp->setAccessible(true); // 设为可访问
$instanceProp->setValue(null, []); // 清空静态缓存的连接实例
} catch (\ReflectionException $e) {
$this->log("清除数据库实例缓存失败:{$e->getMessage()}");
}
}
}