QueueRedis = QueueRedis::getInstance(); // 初始化进程启动时间(仅在进程首次启动时执行) if (is_null(self::$processStartTime)) { self::$processStartTime = time(); // 增加进程ID标识 $pid = getmypid(); $this->log("队列进程启动 [PID:{$pid}],启动时间:" . date('Y-m-d H:i:s', self::$processStartTime)); } //任务开始时间 $this->startTime = microtime(true); } /** * 任务初始化验证 * @return bool */ public function init($job){ // 检查进程是否已超时,提前退出 if ($this->isProcessTimeout(self::$processStartTime)) { $this->log("进程已超时,放弃处理任务"); $job->release(15); // 短延迟后重新入队 return; } // 进程超时预警 $this->checkProcessTimeoutWarning(self::$processStartTime); // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->log("Redis连接失败,10秒后重试"); $job->release(15); return; } } /** * 任务结束 * @return bool */ public function finnal(){ $executionTime = microtime(true) - $this->startTime; $this->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); gc_collect_cycles(); // 任务完成后,检查进程是否超时 if ($this->isProcessTimeout(self::$processStartTime)) { $this->log("进程已运行超过{$this->maxRunTime}秒,任务完成后自动退出以刷新连接"); exit(1); // 退出进程,触发supervisor重启 } } /** * 写入日志到缓冲区 * @param string $message */ public function log($message) { $log = date("Y-m-d H:i:s") . " " . $message . "\n"; echo $log; } /** * 获取重试延迟时间 * @param string $errorMsg * @return int */ public function getRetryDelay($errorMsg) { $delayMap = [ 'MySQL server has gone away' => 60, 'timeout' => 30, 'OpenAI' => 45, 'network' => 60 ]; foreach ($delayMap as $keyword => $delay) { if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配 return $delay; } } return 10; } /** * 处理可重试异常 * @param \Exception $e * @param string $sRedisKey * @param string $sRedisValue * @param \think\queue\Job $job */ public function handleRetryableException($e, $sRedisKey, $sRedisValue, $job) { $sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}"); if ($this->isFatalDatabaseError($e)) { $this->handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job); return; } // 原子化更新任务状态 $this->QueueRedis->atomicJobUpdate($sRedisKey, 'failed', 3600, $sRedisValue); $attempts = $job->attempts(); // 双重限制:次数 if ($attempts >= $this->maxRetries) { $this->log("超过最大重试次数({$this->maxRetries}),停止重试 | 执行日志:{$sMsg}"); $job->delete(); } else { $delay = $this->getRetryDelay($sMsg, $attempts); // 动态延迟 $this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries})"); $job->release($delay); } } /** * 处理不可重试异常 * @param \Exception $e * @param string $sRedisKey * @param string $sRedisValue * @param \think\queue\Job $job */ public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job) { $sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}"); $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue); $this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}"); $job->delete(); } // 判断是否为需要重启的致命数据库错误 private function isFatalDatabaseError(\Exception $e) { // 1. 检查是否为PDO异常 if ($e instanceof \PDOException) { $fatalCodes = [2006, 2013, 2002]; // 2006=连接断开,2013=连接丢失,2002=无法连接 $errorCode = (int)$e->getCode(); if (in_array($errorCode, $fatalCodes)) { return true; } } // 2. 检查错误消息关键词(覆盖非PDOException的数据库错误) $errorMsg = strtolower($e->getMessage()); $fatalKeywords = [ 'mysql server has gone away', 'lost connection to mysql server', 'error while sending stmt_prepare packet', 'database connection failed', 'sqlstate[hy000]' // 通用数据库错误前缀 ]; foreach ($fatalKeywords as $keyword) { if (strpos($errorMsg, $keyword) !== false) { return true; } } return false; } //处理数据库错误,释放任务后重启(保留任务) private function handleDatabaseErrorAndRestartWithRetry($e, $sRedisKey, $sRedisValue, $job) { $this->log("检测到致命数据库错误,释放任务后重启队列 | 错误: {$e->getMessage()}"); $attempts = $job->attempts(); if ($attempts >= $this->maxRetries) { $this->log("数据库错误重试达上限,标记任务失败"); $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600, $sRedisValue); $job->delete(); exit(1); } // 1. 释放Redis锁(必须先释放,否则新进程无法获取锁) if (!empty($sRedisKey) && !empty($sRedisValue)) { $this->QueueRedis->forceReleaseLock($sRedisKey, $sRedisValue); } // 2. 释放任务回队列(设置短延迟,避免重启前被其他进程处理) $job->release(60); // 60秒后重新入队,给进程重启留时间 $this->log("任务已释放回队列,等待新进程处理"); // 3. 强制退出进程,触发Supervisor重启 $this->log("数据库错误,重启进程以刷新连接,新进程将处理释放的任务"); exit(1); } /** * 获取分布式锁 * @param string $sRedisKey * @param string $sRedisValue * @param Job $job * @return bool */ public function acquireLock($sRedisKey, $sRedisValue, $job) { // 1. 前置校验:先检查是否已完成/失败(优先于锁操作) $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); if (in_array($jobStatus, ['completed', 'failed'])) { $this->log("任务已终止,删除任务 | 键: {$sRedisKey} | 状态: {$jobStatus}"); $job->delete(); return false; } // 2. 尝试获取锁 $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire); if ($isLocked) { $this->log("成功获取锁 | 键: {$sRedisKey} | 锁值: {$sRedisValue}"); return true; } // 3. 锁竞争处理:二次校验状态+超时抢占 $currentLockValue = $this->QueueRedis->getRedisValue($sRedisKey); $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); // 3.1 若状态已终止,直接删除任务 if (in_array($jobStatus, ['completed', 'failed'])) { $this->log("任务已终止,删除任务 | 键: {$sRedisKey} | 状态: {$jobStatus}"); $job->delete(); return false; } // 3.2 锁已过期(值为空或TTL<=0),强制抢占 $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); if (empty($currentLockValue) || $lockTtl <= 0) { $this->log("锁已过期,强制抢占 | 键: {$sRedisKey} | TTL: {$lockTtl}"); $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire); if ($isLocked) { return true; } } // 3.3 未获取到锁,按重试策略处理 $attempts = $job->attempts(); if ($attempts >= $this->maxRetries) { $this->log("超过最大重试次数({$this->maxRetries}),删除任务 | 键: {$sRedisKey}"); $job->delete(); } else { // 动态计算延迟时间(基于当前锁剩余时间) $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; $delay = min($delay, $this->maxDelay); $this->log("锁竞争,延迟{$delay}秒重试({$attempts}/{$this->maxRetries}) | 键: {$sRedisKey}"); $job->release($delay); } return false; } /** * 检查进程是否超时 * @return bool */ public function isProcessTimeout($processStartTime) { return time() - $processStartTime > $this->maxRunTime; } /** * 检查进程超时预警 */ public function checkProcessTimeoutWarning($processStartTime) { $remainingTime = $this->maxRunTime - (time() - $processStartTime); if ($remainingTime > 0 && $remainingTime < $this->warningThreshold) { $this->log("进程即将超时,剩余时间:{$remainingTime}秒"); } } }