oQueueJob = new QueueJob;
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Queue entry point: sends one e-mail for an article and records the outcome.
     *
     * Flow, as implemented below:
     *  1. Run the shared start-of-job checks and log the job id / attempt count.
     *  2. Read the mail parameters from $data; delete the job when article_id
     *     or email is empty.
     *  3. Try to take a lock keyed by handler class, article, related article,
     *     author and e-mail address; stop when the lock is not obtained.
     *  4. If no successful send log exists for this article/author/related
     *     article, call sendEmail() and store a log row with the result.
     *  5. Mark the lock key as completed, delete the job and log the result.
     *
     * @param Job   $job  The queue job being processed.
     * @param mixed $data Job payload. Keys read: article_id, email, title,
     *                    from_name, content, memail, mpassword,
     *                    article_author_id, related_article_id, journal_id,
     *                    journal_issn. Missing/empty keys fall back to 0 or ''.
     *
     * @return void
     */
    public function fire(Job $job, $data)
    {
        // Start-of-job checks performed by the shared queue helper.
        $this->oQueueJob->init($job);

        // The raw payload is decoded only to recover the job id for logging.
        $rawBody = $job->getRawBody();
        $decoded = empty($rawBody) ? [] : json_decode($rawBody, true);
        $jobId = (is_array($decoded) && !empty($decoded['id'])) ? $decoded['id'] : 'unknown';

        $this->oQueueJob->log("-----------队列任务开始-----------");
        $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}");

        // Initialised before the try block so that every catch handler receives
        // defined values even when the failure happens before the lock key is built.
        $sRedisKey = '';
        $sRedisValue = '';

        try {
            // Article ID
            $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
            // Recipient (author) e-mail address
            $email = empty($data['email']) ? '' : $data['email'];
            // Mail subject
            $title = empty($data['title']) ? '' : $data['title'];
            // Sender display name
            $from_name = empty($data['from_name']) ? '' : $data['from_name'];
            // Mail body
            $content = empty($data['content']) ? '' : $data['content'];
            // Mailbox and password handed to sendEmail()
            // (presumably the sending account's credentials; confirm against sendEmail).
            $memail = empty($data['memail']) ? '' : $data['memail'];
            $mpassword = empty($data['mpassword']) ? '' : $data['mpassword'];
            // Article author ID
            $article_author_id = empty($data['article_author_id']) ? 0 : $data['article_author_id'];
            // Related article ID
            $related_article_id = empty($data['related_article_id']) ? 0 : $data['related_article_id'];
            // Journal ID
            $journal_id = empty($data['journal_id']) ? '' : $data['journal_id'];
            // Journal ISSN
            $journal_issn = empty($data['journal_issn']) ? '' : $data['journal_issn'];

            // Nothing can be sent without an article and a recipient: drop the job.
            if (empty($iArticleId) || empty($email)) {
                $this->oQueueJob->log("无效的article_id,删除任务");
                $job->delete();
                return;
            }

            // Build the Redis key and try to acquire the lock.
            $sClassName = get_class($this);
            $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$related_article_id}:{$article_author_id}:{$email}";
            $sRedisValue = uniqid() . '_' . getmypid();
            if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) {
                return; // Lock not acquired; acquireLock has already dealt with the job.
            }

            // Core task: check whether this mail was already sent successfully.
            // NOTE(review): getLog is invoked statically through an instance; the
            // instance is kept in case the constructor performs setup. Confirm and
            // simplify to JournalArticle::getLog() if it does not.
            $oJournalArticle = new JournalArticle;
            $aLog = json_decode(
                $oJournalArticle::getLog([
                    'article_id' => $iArticleId,
                    'article_author_id' => $article_author_id,
                    'related_article_id' => $related_article_id,
                    'is_success' => 1,
                ]),
                true
            );

            $sMsg = '邮件已发送';
            if (empty($aLog['data'])) {
                $aResult = sendEmail($email, $title, $from_name, $content, $memail, $mpassword);

                // NOTE(review): a missing or falsy 'status' is treated as 1 (success).
                // Verify this matches what sendEmail() returns on failure.
                $iStatus = empty($aResult['status']) ? 1 : $aResult['status'];
                if ($iStatus == 1) {
                    $iIsSuccess = 1;
                    $sMsg = '成功';
                } else {
                    $iIsSuccess = 2;
                    $sMsg = empty($aResult['data']) ? '失败' : $aResult['data'];
                }

                // Record the send attempt (successful or not) in the e-mail log.
                JournalArticle::addLog([
                    'article_id' => $iArticleId,
                    'article_author_id' => $article_author_id,
                    'related_article_id' => $related_article_id,
                    'email' => $email,
                    'content' => $content,
                    'create_time' => time(),
                    'is_success' => $iIsSuccess,
                    'journal_id' => $journal_id,
                    'journal_issn' => $journal_issn,
                    'msg' => $sMsg,
                ]);
            }

            // Update the completion flag for this key, then remove the job from the queue.
            $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue);
            $job->delete();
            $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
        } catch (RuntimeException $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (Exception $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } finally {
            // Always run the helper's end-of-job routine, whatever the outcome.
            $this->oQueueJob->finnal();
        }
    }
}