263 lines
9.2 KiB
PHP
263 lines
9.2 KiB
PHP
<?php
|
||
namespace app\common\traits;
|
||
|
||
use think\Db;
|
||
use think\Exception;
|
||
use think\db\PDOException;
|
||
use think\facade\Log;
|
||
|
||
trait QueueDbHATraitBak
|
||
{
|
||
// 可配置参数(针对队列场景优化)
|
||
private $dbConfig = [
|
||
'check_interval' => 20, // 缩短检查间隔,提升连接有效性
|
||
'max_attempts' => 4, // 增加重试次数,应对连接波动
|
||
'base_wait' => 1, // 基础等待时间(秒)
|
||
'reconnect_threshold' => 3, // 连续失败告警阈值
|
||
'fatal_error_codes' => [2006, 2013, 1053], // 致命错误码(含服务中断)
|
||
];
|
||
|
||
// 进程内状态记录(避免跨进程干扰)
|
||
private $consecutiveFailures = [];
|
||
private $lastCheckTime = [];
|
||
|
||
/**
|
||
* 队列任务执行前的连接检查(核心入口)
|
||
* @param bool $force 是否强制检查
|
||
* @return bool 连接是否可用
|
||
*/
|
||
public function checkDbConnectionTrait($force = false)
|
||
{
|
||
$pid = getmypid();
|
||
$this->initConsecutiveFailures($pid);
|
||
|
||
// 非强制检查且未到间隔,直接返回
|
||
if (!$force && isset($this->lastCheckTime[$pid])
|
||
&& (time() - $this->lastCheckTime[$pid] < $this->dbConfig['check_interval'])) {
|
||
return true;
|
||
}
|
||
|
||
$attempt = 0;
|
||
$maxAttempts = $this->dbConfig['max_attempts'];
|
||
$baseWait = $this->dbConfig['base_wait'];
|
||
|
||
while ($attempt < $maxAttempts) {
|
||
try {
|
||
// 执行轻量查询验证(使用框架Db方法,确保与业务代码一致)
|
||
$result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
|
||
if ($this->isValidResult($result)) {
|
||
$this->resetConsecutiveFailures($pid);
|
||
$this->lastCheckTime[$pid] = time();
|
||
$this->log("进程[{$pid}]数据库连接有效", 'info');
|
||
return true;
|
||
}
|
||
throw new Exception("查询结果无效");
|
||
} catch (PDOException $e) {
|
||
// 致命错误加速重试
|
||
if (in_array($e->getCode(), $this->dbConfig['fatal_error_codes'])) {
|
||
$this->log("进程[{$pid}]致命错误({$e->getCode()}):{$e->getMessage()}", 'error');
|
||
$attempt = $maxAttempts - 1;
|
||
}
|
||
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
|
||
} catch (Exception $e) {
|
||
$this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
|
||
} finally {
|
||
$attempt++;
|
||
}
|
||
}
|
||
|
||
// 达到最大重试次数
|
||
$this->incrementConsecutiveFailures($pid);
|
||
$this->log("进程[{$pid}]连接异常,已达最大重试次数({$maxAttempts})", 'error');
|
||
return false;
|
||
}
|
||
|
||
/**
|
||
* 处理连接错误(确保后续能正常使用Db::insert())
|
||
*/
|
||
private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait)
|
||
{
|
||
$errorMsg = $e->getMessage() ?: '未知错误';
|
||
$errorCode = $e->getCode() ?: 0;
|
||
$this->log("进程[{$pid}]连接失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(码:{$errorCode})", 'warning');
|
||
|
||
// 强制清理当前进程的连接缓存(关键:确保重建的连接能被Db::insert()使用)
|
||
$this->cleanupConnections();
|
||
|
||
// 最后一次尝试无需等待
|
||
if ($attempt + 1 >= $maxAttempts) {
|
||
return;
|
||
}
|
||
|
||
// 差异化等待策略
|
||
$waitTime = $this->calculateWaitTime($errorMsg, $attempt, $baseWait);
|
||
$this->log("进程[{$pid}]将在{$waitTime}秒后重试", 'info');
|
||
$this->safeSleep($waitTime);
|
||
|
||
// 重建连接并验证(使用框架方法,确保与业务代码兼容)
|
||
try {
|
||
// 强制重建框架连接,确保Db::insert()能使用新连接
|
||
Db::connect(config('database'), true);
|
||
$result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
|
||
if ($this->isValidResult($result)) {
|
||
$this->resetConsecutiveFailures($pid);
|
||
$this->lastCheckTime[$pid] = time();
|
||
$this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})", 'info');
|
||
$attempt = $maxAttempts; // 退出循环
|
||
} else {
|
||
throw new Exception("重建连接后查询无效");
|
||
}
|
||
} catch (Exception $e2) {
|
||
$this->log("进程[{$pid}]重连失败:{$e2->getMessage()}", 'error');
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 安全执行查询(兼容框架Db方法,带超时控制)
|
||
*/
|
||
private function safeQuery($sql, $timeout)
|
||
{
|
||
$start = microtime(true);
|
||
// 使用框架Db::query(),确保与业务中Db::insert()使用相同的连接机制
|
||
$result = Db::query($sql);
|
||
|
||
// 代码层面控制超时,不依赖database.php配置
|
||
if (microtime(true) - $start > $timeout) {
|
||
throw new Exception("查询超时({$timeout}秒)");
|
||
}
|
||
return $result;
|
||
}
|
||
|
||
/**
|
||
* 清理连接资源(仅影响当前进程,不干扰现有系统)
|
||
*/
|
||
private function cleanupConnections()
|
||
{
|
||
// 关闭当前进程的框架连接(不影响其他进程)
|
||
Db::close();
|
||
// 清除Db类静态缓存(仅当前进程)
|
||
$this->clearDbInstanceCache();
|
||
// 保留系统缓存,避免影响现有业务
|
||
}
|
||
|
||
/**
|
||
* 清除Db类实例缓存(确保新连接能被正确创建)
|
||
*/
|
||
private function clearDbInstanceCache()
|
||
{
|
||
static $reflection = null;
|
||
static $instanceProp = null;
|
||
|
||
if (!$reflection) {
|
||
try {
|
||
$reflection = new \ReflectionClass('\think\Db');
|
||
$instanceProp = $reflection->getProperty('instance');
|
||
$instanceProp->setAccessible(true);
|
||
} catch (\ReflectionException $e) {
|
||
$this->log("反射初始化失败:{$e->getMessage()}", 'error');
|
||
return;
|
||
}
|
||
}
|
||
|
||
try {
|
||
// 仅清空当前进程的Db实例缓存,不影响其他进程(如Web请求)
|
||
$instanceProp->setValue(null, []);
|
||
} catch (\ReflectionException $e) {
|
||
$this->log("清除Db缓存失败:{$e->getMessage()}", 'error');
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 计算等待时间(针对队列优化)
|
||
*/
|
||
private function calculateWaitTime($errorMsg, $attempt, $baseWait)
|
||
{
|
||
$isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
|
||
$isTimeout = stripos($errorMsg, 'timeout') !== false;
|
||
|
||
if ($isGoneAway) {
|
||
return $baseWait * pow(2, $attempt); // 致命错误:1→2→4秒
|
||
} elseif ($isTimeout) {
|
||
return $baseWait * pow(1.5, $attempt); // 超时错误:1→1.5→2.25秒
|
||
} else {
|
||
return $baseWait * pow(1.2, $attempt); // 普通错误:1→1.2→1.44秒
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 验证查询结果有效性
|
||
*/
|
||
private function isValidResult($result)
|
||
{
|
||
return is_array($result) && !empty($result)
|
||
&& isset(current($result)['1']) && current($result)['1'] == 1;
|
||
}
|
||
|
||
/**
|
||
* 连续失败计数管理
|
||
*/
|
||
private function initConsecutiveFailures($pid)
|
||
{
|
||
if (!isset($this->consecutiveFailures[$pid])) {
|
||
$this->consecutiveFailures[$pid] = 0;
|
||
}
|
||
}
|
||
|
||
private function incrementConsecutiveFailures($pid)
|
||
{
|
||
$this->consecutiveFailures[$pid]++;
|
||
if ($this->consecutiveFailures[$pid] >= $this->dbConfig['reconnect_threshold']) {
|
||
$this->alert("进程[{$pid}]连续连接失败{$this->consecutiveFailures[$pid]}次,可能存在数据库隐患");
|
||
}
|
||
}
|
||
|
||
private function resetConsecutiveFailures($pid)
|
||
{
|
||
$this->consecutiveFailures[$pid] = 0;
|
||
}
|
||
|
||
/**
|
||
* 日志记录(仅输出到队列控制台,不干扰系统日志)
|
||
*/
|
||
private function log($message, $level = 'info')
|
||
{
|
||
$logTime = date('Y-m-d H:i:s');
|
||
$content = "[{$logTime}] [{$level}] {$message}";
|
||
// 仅在队列Worker控制台输出,不写入系统日志文件
|
||
echo $content . "\n";
|
||
}
|
||
|
||
/**
|
||
* 告警通知(独立日志文件,不干扰现有系统)
|
||
*/
|
||
private function alert($message)
|
||
{
|
||
$alertFile = RUNTIME_PATH . "log/queue_db_alert_" . date('Ymd') . ".log";
|
||
file_put_contents($alertFile, "[".date('Y-m-d H:i:s')."] ALERT: {$message}\n", FILE_APPEND | LOCK_EX);
|
||
}
|
||
|
||
/**
|
||
* 安全睡眠(支持Worker正常终止)
|
||
*/
|
||
private function safeSleep($seconds)
|
||
{
|
||
$interval = 1;
|
||
while ($seconds > 0) {
|
||
if (connection_aborted() || $this->isWorkerStopped()) {
|
||
throw new Exception("队列Worker已终止,中断睡眠");
|
||
}
|
||
$sleep = min($interval, $seconds);
|
||
sleep($sleep);
|
||
$seconds -= $sleep;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 检测Worker是否已停止(兼容TP5.0机制)
|
||
*/
|
||
private function isWorkerStopped()
|
||
{
|
||
$stopFile = RUNTIME_PATH . 'queue/stop_worker';
|
||
return file_exists($stopFile);
|
||
}
|
||
} |