From 7d43e1a4dd66268fb9632944a4211827cf6890a4 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:08:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E9=85=8D=E7=BD=AE=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/QueueJob.php | 428 +++++++++++++------------------- 1 file changed, 176 insertions(+), 252 deletions(-) diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php index d4b194f..756bc7e 100644 --- a/application/common/QueueJob.php +++ b/application/common/QueueJob.php @@ -2,6 +2,7 @@ namespace app\common; use think\Db; +use think\Cache; use app\common\QueueRedis; class QueueJob @@ -11,116 +12,11 @@ class QueueJob private $logPath; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - } - - /** - * 记录任务开始 - * @param array $aParam - * @return int 日志ID,失败返回0 - */ - public function addLog($aParam = []) - { - - $sJobId = empty($aParam['job_id']) ? uniqid() : $aParam['job_id']; - return $sJobId; - - // 数据过滤(只保留必填字段) - $aInsert = []; - foreach ($this->aField as $field) { - if (isset($aParam[$field])) { - $aInsert[$field] = $aParam[$field]; - } - } - - // 补充默认值 - if (!isset($aInsert['create_time'])) { - $aInsert['create_time'] = time(); - } - if (!isset($aInsert['update_time'])) { - $aInsert['update_time'] = $aInsert['create_time']; - } - - try { - return Db::name('wechat_queue_logs')->insertGetId($aInsert); - } catch (\Exception $e) { - $this->log("添加任务日志失败: " . $e->getMessage() . " | 参数: " . json_encode($aInsert, self::JSON_OPTIONS)); - return 0; - } - } - - /** - * 记录任务状态更新 - * @param array $aParam - * @return bool - */ - public function updateLog($aParam = []) - { - return true; - $iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id']; - if (empty($iLogId)) { - $this->log("更新日志失败: 缺少log_id"); - return false; - } - - // 数据过滤 - $aUpdate = []; - foreach ($this->aField as $field) { - if (isset($aParam[$field])) { - $aUpdate[$field] = $aParam[$field]; - } - } - - // 强制更新时间 - $aUpdate['update_time'] = time(); - - try { - return Db::name('wechat_queue_logs') - ->where('log_id', $iLogId) - ->limit(1) - ->update($aUpdate) > 0; - } catch (\Exception $e) { - $this->log("更新任务日志失败 [ID:{$iLogId}]: " . $e->getMessage()); - return false; - } - } - - /** - * 设置日志路径并确保目录存在 - * @param string $logPath - * @throws \RuntimeException - */ - public function ensureLogDirExists($logPath = '') - { - if (empty($logPath)) { - $error = "日志路径不能为空"; - $this->log($error); - return $error; - } - - $this->logPath = $logPath; - $logDir = dirname($this->logPath); - - // 检查并创建目录(处理权限问题) - if (!is_dir($logDir)) { - $oldUmask = umask(0); - $created = mkdir($logDir, 0755, true); - umask($oldUmask); - - if (!$created || !is_dir($logDir)) { - $error = "无法创建日志目录: {$logDir} (权限不足)"; - $this->log($error); - return $error; - } - } } /** @@ -129,127 +25,8 @@ class QueueJob */ public function log($message) { - // 防止缓冲区溢出 - if (count($this->logBuffer) >= 1000) { - $this->flushLog(); - } - - $time = date('H:i:s'); - $this->logBuffer[] = "[$time] $message\n"; - - // 缓冲区满或超时则刷新 - if (count($this->logBuffer) >= 50 || time() - $this->lastLogTime > 10) { - $this->flushLog(); - } - } - - /** - * 刷新日志缓冲区到文件 - */ - public function flushLog() - { - if (empty($this->logBuffer)) { - return; - } - - // 检查日志路径是否设置 - if (empty($this->logPath)) { - $this->logBuffer = []; - return; - } - - // 检查文件大小并处理 - $this->checkAndTruncateLog(); - - $fp = fopen($this->logPath, 'a'); - if ($fp === false) { - // 紧急写入失败日志(避免递归) - $errorMsg = "[" . date('H:i:s') . "] 错误: 无法打开日志文件 {$this->logPath}\n"; - error_log($errorMsg); // 写入系统日志 - $this->logBuffer = []; - return; - } - - try { - // 尝试获取文件锁 - if (flock($fp, LOCK_EX)) { - fwrite($fp, implode('', $this->logBuffer)); - flock($fp, LOCK_UN); - } else { - // 无锁情况下尝试写入 - fwrite($fp, implode('', $this->logBuffer)); - $this->logBuffer[] = "[" . date('H:i:s') . "] 警告: 日志写入未加锁,可能存在冲突风险\n"; - } - } catch (\Exception $e) { - $errorMsg = "[" . date('H:i:s') . "] 错误: 写入日志失败: {$e->getMessage()}\n"; - fwrite($fp, $errorMsg); - } finally { - fclose($fp); - } - - $this->logBuffer = []; - $this->lastLogTime = time(); - } - - /** - * 检查日志文件大小,超过限制则清空 - */ - public function checkAndTruncateLog() - { - if (empty($this->logPath) || !file_exists($this->logPath)) { - return; - } - - // 清除文件状态缓存并获取大小 - clearstatcache(true, $this->logPath); - $fileSize = @filesize($this->logPath); - - if ($fileSize === false) { - $this->log("错误: 无法获取日志文件大小 {$this->logPath}"); - return; - } - - if ($fileSize >= $this->logMaxSize) { - $fp = fopen($this->logPath, 'w'); - if ($fp === false) { - $this->log("错误: 无法清空日志文件 {$this->logPath}"); - return; - } - - try { - if (flock($fp, LOCK_EX)) { - // 二次检查文件大小(避免竞态条件) - clearstatcache(true, $this->logPath); - if (filesize($this->logPath) >= $this->logMaxSize) { - ftruncate($fp, 0); - $this->log("日志文件超过" . $this->formatFileSize($this->logMaxSize) . ",已清空"); - } - flock($fp, LOCK_UN); - } - } catch (\Exception $e) { - $this->log("错误: 清空日志文件失败: {$e->getMessage()}"); - } finally { - fclose($fp); - } - } - } - - /** - * 格式化文件大小(字节转人类可读格式) - * @param int $bytes - * @return string - */ - public function formatFileSize($bytes) - { - if ($bytes <= 0) { - return '0 B'; - } - - $units = ['B', 'KB', 'MB', 'GB', 'TB']; - $unitIndex = min(floor(log($bytes, 1024)), count($units) - 1); - $size = $bytes / pow(1024, $unitIndex); - - return number_format($size, 2) . ' ' . $units[$unitIndex]; + $log = date("Y-m-d H:i:s") . " " . $message . "\n"; + echo $log; } /** @@ -278,25 +55,17 @@ class QueueJob /** * 处理可重试异常 * @param \Exception $e - * @param int $iLogId * @param string $sRedisKey + * @param string $sRedisValue * @param \think\queue\Job $job */ - public function handleRetryableException($e, $iLogId, $sRedisKey, $job) + public function handleRetryableException($e,$sRedisKey,$sRedisValue,$job) { $sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}"); - if ($iLogId > 0) { - $this->updateLog([ - 'log_id' => $iLogId, - 'status' => 2, - 'error' => $sMsg . ':' . $sTrace, - ]); - } - - $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue); $attempts = $job->attempts(); if ($attempts >= $this->maxRetries) { @@ -312,34 +81,189 @@ class QueueJob /** * 处理不可重试异常 * @param \Exception $e - * @param int $iLogId * @param string $sRedisKey + * @param string $sRedisValue * @param \think\queue\Job $job */ - public function handleNonRetryableException($e, $iLogId, $sRedisKey, $job) + public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job) { $sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}"); - - if ($iLogId > 0) { - $this->updateLog([ - 'log_id' => $iLogId, - 'status' => 3, // 3:不可重试失败 - 'error' => $sMsg . ':' . $sTrace, - ]); - } - - $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue); $this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}"); $job->delete(); } /** - * 析构函数:确保最后日志被写入 + * 检查并重建数据库连接 */ - public function __destruct() + public function checkDbConnectionBak() { - $this->flushLog(); + return true; + $maxAttempts = 2; // 最大重试次数 + $attempt = 0; + while ($attempt < $maxAttempts) { + try { + // 尝试查询以验证连接 + Db::query('SELECT 1'); + return true; // 连接有效,直接返回 + } catch (\Exception $e) { + $attempt++; + $waitTime = pow(2, $attempt); // 指数退避:2s, 4s, 8s... + // 记录连接失败日志 + $sMsg = empty($e->getMessage()) ? '检查失败' : $e->getMessage(); + $this->log("数据库连接检查失败(尝试{$attempt}/{$maxAttempts}): {$sMsg}"); + // 关闭所有连接 + Db::close(); + + // 最后一次尝试不需要等待 + if ($attempt < $maxAttempts) { + $this->log("{$waitTime}秒后尝试重新连接..."); + sleep($waitTime); // 等待一段时间再重试 + } + // 尝试重新连接 + try { + Db::connect(); + Db::query('SELECT 1'); // 验证重连成功 + $this->log("数据库连接已重建(尝试{$attempt}/{$maxAttempts})"); + return true; + } catch (\Exception $e2) { + $this->log("数据库重连尝试{$attempt}/{$maxAttempts}失败: {$e2->getMessage()}"); + } + } + } + + // 所有重试都失败 + $this->log("数据库连接异常,已达到最大重试次数({$maxAttempts})"); + return false; + } + + + /** + * 数据库连接检查与重建(高可用版) + * 解决 MySQL server has gone away 等连接超时问题 + * @param bool $force 是否强制检查(忽略缓存时间) + * @return bool 连接是否有效 + */ + public function checkDbConnection($force = false) + { + return true; + // 1. 用进程ID隔离静态变量,避免多Worker进程互相干扰 + // 每个队列Worker是独立进程,静态变量需进程隔离 + static $lastCheckTime = []; + $pid = getmypid(); // 获取当前进程ID + $checkInterval = 60; // 自动检查间隔(秒) + + // 非强制检查且未到间隔时间,直接返回有效(减少性能消耗) + if (!$force && isset($lastCheckTime[$pid]) && (time() - $lastCheckTime[$pid] < $checkInterval)) { + return true; + } + + // 2. 配置重试参数 + $maxAttempts = 3; // 最大重试次数 + $attempt = 0; // 当前尝试次数 + $baseWait = 2; // 基础等待时间(秒) + + // 3. 循环重试连接 + while ($attempt < $maxAttempts) { + try { + // 执行轻量查询验证连接(DUAL是MySQL伪表,效率极高) + $result = Db::query('SELECT 1 FROM DUAL'); + // 验证查询结果是否有效 + if (is_array($result) && !empty($result)) { + $lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]数据库连接有效"); + return true; + } else { + throw new Exception("连接验证失败:查询结果异常"); + } + } + // 优先捕获PDO底层异常(数据库连接错误多为此类) + catch (PDOException $e) { + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); + } + // 捕获框架层异常 + catch (Exception $e) { + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); + } finally { + $attempt++; // 无论成功失败,计数+1 + } + } + + // 4. 达到最大重试次数,返回失败 + $this->log("进程[{$pid}]数据库连接异常,已达最大重试次数({$maxAttempts})"); + return false; + } + + /** + * 处理连接错误的统一逻辑 + * @param Exception $e 异常对象 + * @param int $pid 进程ID + * @param int $attempt 当前尝试次数 + * @param int $maxAttempts 最大尝试次数 + * @param int $baseWait 基础等待时间 + * @param array $lastCheckTime 检查时间记录 + */ + private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait, &$lastCheckTime) + { + $errorMsg = empty($e->getMessage()) ? '未知数据库错误' : $e->getMessage(); + $errorCode = empty($e->getCode()) ? 0 : $e->getCode(); + + // 记录错误详情(含进程ID便于排查多进程问题) + $this->log("进程[{$pid}]连接检查失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(错误码:{$errorCode})"); + + // 5. 强制清理当前进程的无效连接 + Db::close(); // 关闭框架层面的连接 + $this->clearDbInstanceCache(); // 清除框架连接缓存(关键步骤) + cache('db_connection_status', null); + + // 最后一次尝试无需等待,直接重试 + if ($attempt + 1 >= $maxAttempts) { + return; + } + + // 6. 差异化等待策略(针对特定错误延长等待) + $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false; + // 普通错误:2^1=2s → 2^2=4s;致命错误:3^1=3s → 3^2=9s + $waitTime = $isGoneAway ? $baseWait * pow(3, $attempt) : $baseWait * pow(2, $attempt); + + $this->log("进程[{$pid}]将在{$waitTime}秒后重试..."); + sleep($waitTime); // 等待指定时间 + + // 7. 尝试重建连接并二次验证 + try { + // 强制重建连接(第二个参数true表示忽略缓存) + Db::connect(config('database'), true); + + // 执行验证查询 + $result = Db::query('SELECT 1 FROM DUAL'); + // 检查结果是否有效 + if (is_array($result) && !empty($result)) { + $lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})"); + $attempt = $maxAttempts; // 标记成功并退出循环 + } else { + throw new Exception("重建连接后查询结果异常"); + } + } catch (Exception $e2) { + $this->log("进程[{$pid}]重连失败(尝试{$attempt}/{$maxAttempts}):{$e2->getMessage()}"); + } + } + + /** + * 清除ThinkPHP5的Db类实例缓存(解决框架连接缓存问题) + * 核心原理:通过反射突破私有属性访问限制 + */ + private function clearDbInstanceCache() + { + try { + $reflection = new \ReflectionClass('\think\Db'); + $instanceProp = $reflection->getProperty('instance'); // 获取Db类的instance属性 + $instanceProp->setAccessible(true); // 设为可访问 + $instanceProp->setValue(null, []); // 清空静态缓存的连接实例 + } catch (\ReflectionException $e) { + $this->log("清除数据库实例缓存失败:{$e->getMessage()}"); + } } } \ No newline at end of file