From 55dca7ce79e1e0fbe9ec8f2a96924dc8fb434b87 Mon Sep 17 00:00:00 2001 From: chengxl Date: Fri, 22 Aug 2025 13:16:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=A2=E9=97=AEAI=E8=AF=9D=E6=9C=AF=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/traits/QueueDbHATraitBak.php | 263 ++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 application/common/traits/QueueDbHATraitBak.php diff --git a/application/common/traits/QueueDbHATraitBak.php b/application/common/traits/QueueDbHATraitBak.php new file mode 100644 index 0000000..84e4a89 --- /dev/null +++ b/application/common/traits/QueueDbHATraitBak.php @@ -0,0 +1,263 @@ + 20, // 缩短检查间隔,提升连接有效性 + 'max_attempts' => 4, // 增加重试次数,应对连接波动 + 'base_wait' => 1, // 基础等待时间(秒) + 'reconnect_threshold' => 3, // 连续失败告警阈值 + 'fatal_error_codes' => [2006, 2013, 1053], // 致命错误码(含服务中断) + ]; + + // 进程内状态记录(避免跨进程干扰) + private $consecutiveFailures = []; + private $lastCheckTime = []; + + /** + * 队列任务执行前的连接检查(核心入口) + * @param bool $force 是否强制检查 + * @return bool 连接是否可用 + */ + public function checkDbConnectionTrait($force = false) + { + $pid = getmypid(); + $this->initConsecutiveFailures($pid); + + // 非强制检查且未到间隔,直接返回 + if (!$force && isset($this->lastCheckTime[$pid]) + && (time() - $this->lastCheckTime[$pid] < $this->dbConfig['check_interval'])) { + return true; + } + + $attempt = 0; + $maxAttempts = $this->dbConfig['max_attempts']; + $baseWait = $this->dbConfig['base_wait']; + + while ($attempt < $maxAttempts) { + try { + // 执行轻量查询验证(使用框架Db方法,确保与业务代码一致) + $result = $this->safeQuery('SELECT 1 FROM DUAL', 2); + if ($this->isValidResult($result)) { + $this->resetConsecutiveFailures($pid); + $this->lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]数据库连接有效", 'info'); + return true; + } + throw new Exception("查询结果无效"); + } catch (PDOException $e) { + // 致命错误加速重试 + if (in_array($e->getCode(), $this->dbConfig['fatal_error_codes'])) { + $this->log("进程[{$pid}]致命错误({$e->getCode()}):{$e->getMessage()}", 'error'); + $attempt = $maxAttempts - 1; + } + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait); + } catch (Exception $e) { + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait); + } finally { + $attempt++; + } + } + + // 达到最大重试次数 + $this->incrementConsecutiveFailures($pid); + $this->log("进程[{$pid}]连接异常,已达最大重试次数({$maxAttempts})", 'error'); + return false; + } + + /** + * 处理连接错误(确保后续能正常使用Db::insert()) + */ + private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait) + { + $errorMsg = $e->getMessage() ?: '未知错误'; + $errorCode = $e->getCode() ?: 0; + $this->log("进程[{$pid}]连接失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(码:{$errorCode})", 'warning'); + + // 强制清理当前进程的连接缓存(关键:确保重建的连接能被Db::insert()使用) + $this->cleanupConnections(); + + // 最后一次尝试无需等待 + if ($attempt + 1 >= $maxAttempts) { + return; + } + + // 差异化等待策略 + $waitTime = $this->calculateWaitTime($errorMsg, $attempt, $baseWait); + $this->log("进程[{$pid}]将在{$waitTime}秒后重试", 'info'); + $this->safeSleep($waitTime); + + // 重建连接并验证(使用框架方法,确保与业务代码兼容) + try { + // 强制重建框架连接,确保Db::insert()能使用新连接 + Db::connect(config('database'), true); + $result = $this->safeQuery('SELECT 1 FROM DUAL', 2); + if ($this->isValidResult($result)) { + $this->resetConsecutiveFailures($pid); + $this->lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})", 'info'); + $attempt = $maxAttempts; // 退出循环 + } else { + throw new Exception("重建连接后查询无效"); + } + } catch (Exception $e2) { + $this->log("进程[{$pid}]重连失败:{$e2->getMessage()}", 'error'); + } + } + + /** + * 安全执行查询(兼容框架Db方法,带超时控制) + */ + private function safeQuery($sql, $timeout) + { + $start = microtime(true); + // 使用框架Db::query(),确保与业务中Db::insert()使用相同的连接机制 + $result = Db::query($sql); + + // 代码层面控制超时,不依赖database.php配置 + if (microtime(true) - $start > $timeout) { + throw new Exception("查询超时({$timeout}秒)"); + } + return $result; + } + + /** + * 清理连接资源(仅影响当前进程,不干扰现有系统) + */ + private function cleanupConnections() + { + // 关闭当前进程的框架连接(不影响其他进程) + Db::close(); + // 清除Db类静态缓存(仅当前进程) + $this->clearDbInstanceCache(); + // 保留系统缓存,避免影响现有业务 + } + + /** + * 清除Db类实例缓存(确保新连接能被正确创建) + */ + private function clearDbInstanceCache() + { + static $reflection = null; + static $instanceProp = null; + + if (!$reflection) { + try { + $reflection = new \ReflectionClass('\think\Db'); + $instanceProp = $reflection->getProperty('instance'); + $instanceProp->setAccessible(true); + } catch (\ReflectionException $e) { + $this->log("反射初始化失败:{$e->getMessage()}", 'error'); + return; + } + } + + try { + // 仅清空当前进程的Db实例缓存,不影响其他进程(如Web请求) + $instanceProp->setValue(null, []); + } catch (\ReflectionException $e) { + $this->log("清除Db缓存失败:{$e->getMessage()}", 'error'); + } + } + + /** + * 计算等待时间(针对队列优化) + */ + private function calculateWaitTime($errorMsg, $attempt, $baseWait) + { + $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false; + $isTimeout = stripos($errorMsg, 'timeout') !== false; + + if ($isGoneAway) { + return $baseWait * pow(2, $attempt); // 致命错误:1→2→4秒 + } elseif ($isTimeout) { + return $baseWait * pow(1.5, $attempt); // 超时错误:1→1.5→2.25秒 + } else { + return $baseWait * pow(1.2, $attempt); // 普通错误:1→1.2→1.44秒 + } + } + + /** + * 验证查询结果有效性 + */ + private function isValidResult($result) + { + return is_array($result) && !empty($result) + && isset(current($result)['1']) && current($result)['1'] == 1; + } + + /** + * 连续失败计数管理 + */ + private function initConsecutiveFailures($pid) + { + if (!isset($this->consecutiveFailures[$pid])) { + $this->consecutiveFailures[$pid] = 0; + } + } + + private function incrementConsecutiveFailures($pid) + { + $this->consecutiveFailures[$pid]++; + if ($this->consecutiveFailures[$pid] >= $this->dbConfig['reconnect_threshold']) { + $this->alert("进程[{$pid}]连续连接失败{$this->consecutiveFailures[$pid]}次,可能存在数据库隐患"); + } + } + + private function resetConsecutiveFailures($pid) + { + $this->consecutiveFailures[$pid] = 0; + } + + /** + * 日志记录(仅输出到队列控制台,不干扰系统日志) + */ + private function log($message, $level = 'info') + { + $logTime = date('Y-m-d H:i:s'); + $content = "[{$logTime}] [{$level}] {$message}"; + // 仅在队列Worker控制台输出,不写入系统日志文件 + echo $content . "\n"; + } + + /** + * 告警通知(独立日志文件,不干扰现有系统) + */ + private function alert($message) + { + $alertFile = RUNTIME_PATH . "log/queue_db_alert_" . date('Ymd') . ".log"; + file_put_contents($alertFile, "[".date('Y-m-d H:i:s')."] ALERT: {$message}\n", FILE_APPEND | LOCK_EX); + } + + /** + * 安全睡眠(支持Worker正常终止) + */ + private function safeSleep($seconds) + { + $interval = 1; + while ($seconds > 0) { + if (connection_aborted() || $this->isWorkerStopped()) { + throw new Exception("队列Worker已终止,中断睡眠"); + } + $sleep = min($interval, $seconds); + sleep($sleep); + $seconds -= $sleep; + } + } + + /** + * 检测Worker是否已停止(兼容TP5.0机制) + */ + private function isWorkerStopped() + { + $stopFile = RUNTIME_PATH . 'queue/stop_worker'; + return file_exists($stopFile); + } +} \ No newline at end of file