diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php index 756bc7e..c83e558 100644 --- a/application/common/QueueJob.php +++ b/application/common/QueueJob.php @@ -4,7 +4,7 @@ namespace app\common; use think\Db; use think\Cache; use app\common\QueueRedis; - +use app\common\traits\QueueDbHATrait; class QueueJob { // 必填参数 @@ -13,7 +13,8 @@ class QueueJob private $QueueRedis; private $maxRetries = 2; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; - + // 引入高可用数据库管理 trait + use QueueDbHATrait; public function __construct() { $this->QueueRedis = QueueRedis::getInstance(); @@ -95,50 +96,6 @@ class QueueJob $job->delete(); } - /** - * 检查并重建数据库连接 - */ - public function checkDbConnectionBak() - { - return true; - $maxAttempts = 2; // 最大重试次数 - $attempt = 0; - while ($attempt < $maxAttempts) { - try { - // 尝试查询以验证连接 - Db::query('SELECT 1'); - return true; // 连接有效,直接返回 - } catch (\Exception $e) { - $attempt++; - $waitTime = pow(2, $attempt); // 指数退避:2s, 4s, 8s... - // 记录连接失败日志 - $sMsg = empty($e->getMessage()) ? '检查失败' : $e->getMessage(); - $this->log("数据库连接检查失败(尝试{$attempt}/{$maxAttempts}): {$sMsg}"); - // 关闭所有连接 - Db::close(); - - // 最后一次尝试不需要等待 - if ($attempt < $maxAttempts) { - $this->log("{$waitTime}秒后尝试重新连接..."); - sleep($waitTime); // 等待一段时间再重试 - } - // 尝试重新连接 - try { - Db::connect(); - Db::query('SELECT 1'); // 验证重连成功 - $this->log("数据库连接已重建(尝试{$attempt}/{$maxAttempts})"); - return true; - } catch (\Exception $e2) { - $this->log("数据库重连尝试{$attempt}/{$maxAttempts}失败: {$e2->getMessage()}"); - } - } - } - - // 所有重试都失败 - $this->log("数据库连接异常,已达到最大重试次数({$maxAttempts})"); - return false; - } - /** * 数据库连接检查与重建(高可用版) @@ -148,122 +105,6 @@ class QueueJob */ public function checkDbConnection($force = false) { - return true; - // 1. 用进程ID隔离静态变量,避免多Worker进程互相干扰 - // 每个队列Worker是独立进程,静态变量需进程隔离 - static $lastCheckTime = []; - $pid = getmypid(); // 获取当前进程ID - $checkInterval = 60; // 自动检查间隔(秒) - - // 非强制检查且未到间隔时间,直接返回有效(减少性能消耗) - if (!$force && isset($lastCheckTime[$pid]) && (time() - $lastCheckTime[$pid] < $checkInterval)) { - return true; - } - - // 2. 配置重试参数 - $maxAttempts = 3; // 最大重试次数 - $attempt = 0; // 当前尝试次数 - $baseWait = 2; // 基础等待时间(秒) - - // 3. 循环重试连接 - while ($attempt < $maxAttempts) { - try { - // 执行轻量查询验证连接(DUAL是MySQL伪表,效率极高) - $result = Db::query('SELECT 1 FROM DUAL'); - // 验证查询结果是否有效 - if (is_array($result) && !empty($result)) { - $lastCheckTime[$pid] = time(); - $this->log("进程[{$pid}]数据库连接有效"); - return true; - } else { - throw new Exception("连接验证失败:查询结果异常"); - } - } - // 优先捕获PDO底层异常(数据库连接错误多为此类) - catch (PDOException $e) { - $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); - } - // 捕获框架层异常 - catch (Exception $e) { - $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); - } finally { - $attempt++; // 无论成功失败,计数+1 - } - } - - // 4. 达到最大重试次数,返回失败 - $this->log("进程[{$pid}]数据库连接异常,已达最大重试次数({$maxAttempts})"); - return false; - } - - /** - * 处理连接错误的统一逻辑 - * @param Exception $e 异常对象 - * @param int $pid 进程ID - * @param int $attempt 当前尝试次数 - * @param int $maxAttempts 最大尝试次数 - * @param int $baseWait 基础等待时间 - * @param array $lastCheckTime 检查时间记录 - */ - private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait, &$lastCheckTime) - { - $errorMsg = empty($e->getMessage()) ? '未知数据库错误' : $e->getMessage(); - $errorCode = empty($e->getCode()) ? 0 : $e->getCode(); - - // 记录错误详情(含进程ID便于排查多进程问题) - $this->log("进程[{$pid}]连接检查失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(错误码:{$errorCode})"); - - // 5. 强制清理当前进程的无效连接 - Db::close(); // 关闭框架层面的连接 - $this->clearDbInstanceCache(); // 清除框架连接缓存(关键步骤) - cache('db_connection_status', null); - - // 最后一次尝试无需等待,直接重试 - if ($attempt + 1 >= $maxAttempts) { - return; - } - - // 6. 差异化等待策略(针对特定错误延长等待) - $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false; - // 普通错误:2^1=2s → 2^2=4s;致命错误:3^1=3s → 3^2=9s - $waitTime = $isGoneAway ? $baseWait * pow(3, $attempt) : $baseWait * pow(2, $attempt); - - $this->log("进程[{$pid}]将在{$waitTime}秒后重试..."); - sleep($waitTime); // 等待指定时间 - - // 7. 尝试重建连接并二次验证 - try { - // 强制重建连接(第二个参数true表示忽略缓存) - Db::connect(config('database'), true); - - // 执行验证查询 - $result = Db::query('SELECT 1 FROM DUAL'); - // 检查结果是否有效 - if (is_array($result) && !empty($result)) { - $lastCheckTime[$pid] = time(); - $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})"); - $attempt = $maxAttempts; // 标记成功并退出循环 - } else { - throw new Exception("重建连接后查询结果异常"); - } - } catch (Exception $e2) { - $this->log("进程[{$pid}]重连失败(尝试{$attempt}/{$maxAttempts}):{$e2->getMessage()}"); - } - } - - /** - * 清除ThinkPHP5的Db类实例缓存(解决框架连接缓存问题) - * 核心原理:通过反射突破私有属性访问限制 - */ - private function clearDbInstanceCache() - { - try { - $reflection = new \ReflectionClass('\think\Db'); - $instanceProp = $reflection->getProperty('instance'); // 获取Db类的instance属性 - $instanceProp->setAccessible(true); // 设为可访问 - $instanceProp->setValue(null, []); // 清空静态缓存的连接实例 - } catch (\ReflectionException $e) { - $this->log("清除数据库实例缓存失败:{$e->getMessage()}"); - } + return $this->checkDbConnectionTrait(); } } \ No newline at end of file