logPath = ROOT_PATH . 'public/queue_log/SendRelatedArticleEmail_' . date('Ymd') . '.log';
        $this->oQueueJob = new QueueJob;
        $this->QueueRedis = QueueRedis::getInstance();
        $this->lastLogTime = time();

        // Ensure the log directory exists.
        $this->oQueueJob->ensureLogDirExists($this->logPath);
    }

    /**
     * Queue entry point: sends the related-article e-mail described by $data.
     *
     * Flow:
     *  1. If Redis reports no connection, release the job for a retry in 10 seconds.
     *  2. If the payload has no article_id or no email, delete the job.
     *  3. Try to take a Redis lock keyed by class / article / related article / author / email.
     *     When the lock is not obtained, the job is deleted if its recorded status is
     *     'completed' or 'failed' or if the attempt limit is reached; otherwise it is
     *     released again after the remaining lock TTL (+5s), or 30s when no TTL is reported.
     *  4. Create a queue-log row, send the mail unless a successful send is already logged,
     *     record the outcome, mark the Redis job 'completed' and delete the queue job.
     *
     * Every exit taken before the try block flushes the log buffer explicitly, because
     * the finally clause (which also flushes) is not reached on those paths.
     *
     * @param Job   $job  Queue job handle; delete(), release() and attempts() are called on it.
     * @param mixed $data Job payload, read as an array with the keys article_id, email, title,
     *                    from_name, content, memail, mpassword, article_author_id,
     *                    related_article_id, journal_id and journal_issn (all optional).
     *
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $startTime = microtime(true);
        $this->oQueueJob->log("-----------队列任务开始-----------");

        // Check the Redis connection state; without Redis no lock can be taken.
        if (!$this->QueueRedis->getConnectionStatus()) {
            $this->oQueueJob->log("Redis连接失败,10秒后重试");
            $job->release(10);
            $this->oQueueJob->flushLog();
            return;
        }

        // Article ID
        $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
        // Recipient (author) e-mail address
        $email = empty($data['email']) ? '' : $data['email'];
        // Mail subject
        $title = empty($data['title']) ? '' : $data['title'];
        // Sender display name
        $from_name = empty($data['from_name']) ? '' : $data['from_name'];
        // Mail body
        $content = empty($data['content']) ? '' : $data['content'];
        // Sending mailbox account
        $memail = empty($data['memail']) ? '' : $data['memail'];
        // Sending mailbox password
        $mpassword = empty($data['mpassword']) ? '' : $data['mpassword'];
        // Article author ID
        $article_author_id = empty($data['article_author_id']) ? 0 : $data['article_author_id'];
        // Related article ID
        $related_article_id = empty($data['related_article_id']) ? 0 : $data['related_article_id'];
        // Journal ID
        $journal_id = empty($data['journal_id']) ? '' : $data['journal_id'];
        // Journal ISSN
        $journal_issn = empty($data['journal_issn']) ? '' : $data['journal_issn'];

        // Without an article ID or a recipient there is nothing to send; drop the job.
        if (empty($iArticleId) || empty($email)) {
            $this->oQueueJob->log("无效的article_id或email,删除任务");
            $job->delete();
            // Flush here as well, like every other early exit of this method.
            $this->oQueueJob->flushLog();
            return;
        }

        // Build the unique task key and a per-run lock value.
        $sClassName = get_class($this);
        $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$related_article_id}:{$article_author_id}:{$email}";
        $sRedisValue = uniqid() . '_' . getmypid();
        $lockExpire = $this->lockExpire;

        $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire);
        if (!$isLocked) {
            $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
            // Strict comparison so that only the exact status strings match.
            if (in_array($jobStatus, ['completed', 'failed'], true)) {
                $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
                $job->delete();
            } else {
                $attempts = $job->attempts();
                if ($attempts >= $this->maxRetries) {
                    $this->oQueueJob->log("超过最大重试次数,停止重试");
                    $job->delete();
                } else {
                    // Retry shortly after the reported lock TTL; fall back to 30s when none is reported.
                    $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
                    $delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
                    $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})");
                    $job->release($delay);
                }
            }
            $this->oQueueJob->flushLog();
            return;
        }

        // Create the queue-log row for this run.
        $aParam = [
            'job_id' => $sRedisKey,
            'job_class' => $sClassName,
            'status' => 0,
            'create_time' => time(),
            'params' => json_encode($data, self::JSON_OPTIONS)
        ];
        $iLogId = $this->oQueueJob->addLog($aParam);
        if (!$iLogId) {
            // No log row could be created: give the lock back and drop the job.
            $this->oQueueJob->log("日志创建失败,释放锁并删除任务:" . json_encode($data, self::JSON_OPTIONS));
            $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
            $job->delete();
            $this->oQueueJob->flushLog();
            return;
        }

        try {
            // Look for an already-successful send for this article / author / related article.
            $oJournalArticle = new JournalArticle;
            $aLog = json_decode($oJournalArticle::getLog([
                'article_id' => $iArticleId,
                'article_author_id' => $article_author_id,
                'related_article_id' => $related_article_id,
                'is_success' => 1
            ]), true);

            $sMsg = '邮件已发送';
            if (empty($aLog['data'])) {
                $aResult = sendEmail($email, $title, $from_name, $content, $memail, $mpassword);
                // NOTE(review): a missing/empty status is treated as 1 (success) here —
                // confirm against sendEmail()'s return contract.
                $iStatus = empty($aResult['status']) ? 1 : $aResult['status'];
                $iIsSuccess = 2;
                $sMsg = empty($aResult['data']) ? '失败' : $aResult['data'];
                if ($iStatus == 1) {
                    $iIsSuccess = 1;
                    $sMsg = '成功';
                }

                // Record the outcome of this send attempt.
                $aEmailLog = [
                    'article_id' => $iArticleId,
                    'article_author_id' => $article_author_id,
                    'related_article_id' => $related_article_id,
                    'email' => $email,
                    'content' => $content,
                    'create_time' => time(),
                    'is_success' => $iIsSuccess,
                    'journal_id' => $journal_id,
                    'journal_issn' => $journal_issn,
                    'msg' => $sMsg
                ];
                JournalArticle::addLog($aEmailLog);
            }

            // Update the queue-log row; the outcome message is stored in the 'error' field.
            $this->oQueueJob->updateLog([
                'log_id' => $iLogId,
                'status' => 1,
                'update_time' => time(),
                'error' => $sMsg
            ]);
            $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
            $job->delete();
            $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}");
        } catch (\RuntimeException $e) {
            $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
        } catch (\Exception $e) {
            // Any other exception type goes to the retryable handler as well.
            $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
        } finally {
            $executionTime = microtime(true) - $startTime;
            $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
            $this->oQueueJob->flushLog();
            gc_collect_cycles();
        }
    }
}