Files
tougao/application/common/traits/QueueDbHATrait.php
2025-07-28 17:53:13 +08:00

266 lines
9.4 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common\traits;
use think\Db;
use think\Exception;
use think\db\PDOException;
use think\facade\Log;
trait QueueDbHATrait
{
// 可配置参数(针对队列场景优化)
private $dbConfig = [
'check_interval' => 20, // 缩短检查间隔,提升连接有效性
'max_attempts' => 4, // 增加重试次数,应对连接波动
'base_wait' => 1, // 基础等待时间(秒)
'reconnect_threshold' => 3, // 连续失败告警阈值
'fatal_error_codes' => [2006, 2013, 1053], // 致命错误码(含服务中断)
];
// 进程内状态记录(避免跨进程干扰)
private $consecutiveFailures = [];
private $lastCheckTime = [];
/**
* 队列任务执行前的连接检查(核心入口)
* @param bool $force 是否强制检查
* @return bool 连接是否可用
*/
public function checkDbConnectionTrait($force = false)
{
$pid = getmypid();
$this->initConsecutiveFailures($pid);
// 非强制检查且未到间隔,直接返回
if (!$force && isset($this->lastCheckTime[$pid])
&& (time() - $this->lastCheckTime[$pid] < $this->dbConfig['check_interval'])) {
return true;
}
$attempt = 0;
$maxAttempts = $this->dbConfig['max_attempts'];
$baseWait = $this->dbConfig['base_wait'];
while ($attempt < $maxAttempts) {
try {
// 执行轻量查询验证使用框架Db方法确保与业务代码一致
$result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
if ($this->isValidResult($result)) {
$this->resetConsecutiveFailures($pid);
$this->lastCheckTime[$pid] = time();
$this->log("进程[{$pid}]数据库连接有效", 'info');
return true;
}
throw new Exception("查询结果无效");
} catch (PDOException $e) {
// 致命错误加速重试
$sCode = empty($e->getCode()) ? 0 : $e->getCode();
$sMsg = empty($e->getMessage()) ? '' : $e->getMessage();
if (in_array($sCode, $this->dbConfig['fatal_error_codes'])) {
$this->log("进程[{$pid}]致命错误({$sCode}{$sMsg}", 'error');
$attempt = $maxAttempts - 1;
}
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
} catch (Exception $e) {
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
} finally {
$attempt++;
}
}
// 达到最大重试次数
$this->incrementConsecutiveFailures($pid);
$this->log("进程[{$pid}]连接异常,已达最大重试次数({$maxAttempts})", 'error');
return false;
}
/**
* 处理连接错误确保后续能正常使用Db::insert()
*/
private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait)
{
$errorMsg = empty($e->getMessage()) ? '未知错误' : $e->getMessage();
$errorCode = empty($e->getCode()) ? 0 : $e->getCode();
$this->log("进程[{$pid}]连接失败(尝试{$attempt}/{$maxAttempts}){$errorMsg}(码:{$errorCode}", 'warning');
// 强制清理当前进程的连接缓存关键确保重建的连接能被Db::insert()使用)
$this->cleanupConnections();
// 最后一次尝试无需等待
if ($attempt + 1 >= $maxAttempts) {
return;
}
// 差异化等待策略
$waitTime = $this->calculateWaitTime($errorMsg, $attempt, $baseWait);
$this->log("进程[{$pid}]将在{$waitTime}秒后重试", 'info');
$this->safeSleep($waitTime);
// 重建连接并验证(使用框架方法,确保与业务代码兼容)
try {
// 强制重建框架连接确保Db::insert()能使用新连接
Db::connect(config('database'), true);
$result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
if ($this->isValidResult($result)) {
$this->resetConsecutiveFailures($pid);
$this->lastCheckTime[$pid] = time();
$this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})", 'info');
$attempt = $maxAttempts; // 退出循环
} else {
throw new Exception("重建连接后查询无效");
}
} catch (Exception $e2) {
$sMsg2 = empty($e2->getMessage()) ? '' : $e2->getMessage();
$this->log("进程[{$pid}]重连失败:{$sMsg2}", 'error');
}
}
/**
* 安全执行查询兼容框架Db方法带超时控制
*/
private function safeQuery($sql, $timeout)
{
$start = microtime(true);
// 使用框架Db::query()确保与业务中Db::insert()使用相同的连接机制
$result = Db::query($sql);
// 代码层面控制超时不依赖database.php配置
if (microtime(true) - $start > $timeout) {
throw new Exception("查询超时({$timeout}秒)");
}
return $result;
}
/**
* 清理连接资源(仅影响当前进程,不干扰现有系统)
*/
private function cleanupConnections()
{
// 关闭当前进程的框架连接(不影响其他进程)
Db::close();
// 清除Db类静态缓存仅当前进程
$this->clearDbInstanceCache();
// 保留系统缓存,避免影响现有业务
}
/**
* 清除Db类实例缓存确保新连接能被正确创建
*/
private function clearDbInstanceCache()
{
static $reflection = null;
static $instanceProp = null;
if (!$reflection) {
try {
$reflection = new \ReflectionClass('\think\Db');
$instanceProp = $reflection->getProperty('instance');
$instanceProp->setAccessible(true);
} catch (\ReflectionException $e) {
$this->log("反射初始化失败:{$e->getMessage()}", 'error');
return;
}
}
try {
// 仅清空当前进程的Db实例缓存不影响其他进程如Web请求
$instanceProp->setValue(null, []);
} catch (\ReflectionException $e) {
$this->log("清除Db缓存失败{$e->getMessage()}", 'error');
}
}
/**
* 计算等待时间(针对队列优化)
*/
private function calculateWaitTime($errorMsg, $attempt, $baseWait)
{
$isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
$isTimeout = stripos($errorMsg, 'timeout') !== false;
if ($isGoneAway) {
return $baseWait * pow(2, $attempt); // 致命错误1→2→4秒
} elseif ($isTimeout) {
return $baseWait * pow(1.5, $attempt); // 超时错误1→1.5→2.25秒
} else {
return $baseWait * pow(1.2, $attempt); // 普通错误1→1.2→1.44秒
}
}
/**
* 验证查询结果有效性
*/
private function isValidResult($result)
{
return is_array($result) && !empty($result)
&& isset(current($result)['1']) && current($result)['1'] == 1;
}
/**
* 连续失败计数管理
*/
private function initConsecutiveFailures($pid)
{
if (!isset($this->consecutiveFailures[$pid])) {
$this->consecutiveFailures[$pid] = 0;
}
}
private function incrementConsecutiveFailures($pid)
{
$this->consecutiveFailures[$pid]++;
if ($this->consecutiveFailures[$pid] >= $this->dbConfig['reconnect_threshold']) {
$this->alert("进程[{$pid}]连续连接失败{$this->consecutiveFailures[$pid]}次,可能存在数据库隐患");
}
}
private function resetConsecutiveFailures($pid)
{
$this->consecutiveFailures[$pid] = 0;
}
/**
* 日志记录(仅输出到队列控制台,不干扰系统日志)
*/
private function log($message, $level = 'info')
{
$logTime = date('Y-m-d H:i:s');
$content = "[{$logTime}] [{$level}] {$message}";
// 仅在队列Worker控制台输出不写入系统日志文件
echo $content . "\n";
}
/**
* 告警通知(独立日志文件,不干扰现有系统)
*/
private function alert($message)
{
$alertFile = RUNTIME_PATH . "log/queue_db_alert_" . date('Ymd') . ".log";
file_put_contents($alertFile, "[".date('Y-m-d H:i:s')."] ALERT: {$message}\n", FILE_APPEND | LOCK_EX);
}
/**
* 安全睡眠支持Worker正常终止
*/
private function safeSleep($seconds)
{
$interval = 1;
while ($seconds > 0) {
if (connection_aborted() || $this->isWorkerStopped()) {
throw new Exception("队列Worker已终止中断睡眠");
}
$sleep = min($interval, $seconds);
sleep($sleep);
$seconds -= $sleep;
}
}
/**
* 检测Worker是否已停止兼容TP5.0机制)
*/
private function isWorkerStopped()
{
$stopFile = RUNTIME_PATH . 'queue/stop_worker';
return file_exists($stopFile);
}
}