From 207fce28e43fe36a9dc4d8f3bc40602eba07b9af Mon Sep 17 00:00:00 2001 From: chengxl Date: Fri, 25 Jul 2025 17:26:33 +0800 Subject: [PATCH 01/21] =?UTF-8?q?AI=E7=94=9F=E6=88=90=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/Aiarticle.php | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/application/api/controller/Aiarticle.php b/application/api/controller/Aiarticle.php index ed5d4e6..bdfde04 100644 --- a/application/api/controller/Aiarticle.php +++ b/application/api/controller/Aiarticle.php @@ -85,10 +85,8 @@ class Aiarticle extends Base } //查询AI内容是否生成 - $aAiArticle = json_decode($this->getAiArticle(['article_id' => $iArticleId]),true); - $aAiArticleContent = empty($aAiArticle['data']) ? [] : $aAiArticle['data']; - $aAiArticle = empty($aAiArticleContent['ai_article']) ? [] : $aAiArticleContent['ai_article']; - + $aWhere = ['is_delete' => 2,'article_id' => $iArticleId]; + $aAiArticle = Db::name('ai_article')->where($aWhere)->find(); $iId = empty($aAiArticle['ai_article_id']) ? 0 : $aAiArticle['ai_article_id']; if(empty($aAiArticle)){ //插入t_ai_article数据 @@ -100,6 +98,7 @@ class Aiarticle extends Base } $aAiArticle = array_merge(['ai_article_id' => $iId,'article_id' => $iArticleId,'is_generate' => 2],$aInsert); } + //判断是否生成 if(!empty($aAiArticle['is_generate']) && $aAiArticle['is_generate'] == 1){ return json_encode(['status' => 5,'msg' => 'The data has been generated, please proceed with the next steps']); From eaea167b214fcee5f0966b07eb4008fd9a4d6f23 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 14:29:51 +0800 Subject: [PATCH 02/21] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/Reviewer.php | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/application/api/controller/Reviewer.php b/application/api/controller/Reviewer.php index 8a337bb..7c732fd 100644 --- a/application/api/controller/Reviewer.php +++ b/application/api/controller/Reviewer.php @@ -2182,11 +2182,19 @@ class Reviewer extends Base if (isset($data['major_id'])&&$data['major_id']!=0){ $where['t_user_reviewer_info.major'] = ['in',$this->majorids($data['major_id'])]; } + + // 计算10天之后的时间戳(10天 = 10 * 24 * 60 * 60秒) + $iTeenDaysLater = strtotime('-10 days'); + + //获取邀请时间超过10天的 chengxiaoling 20250728 start //获取总条数 $count = $this->reviewer_to_journal_obj ->join("t_user", "t_user.user_id = t_reviewer_to_journal.reviewer_id", "left") ->join("t_user_reviewer_info", "t_user_reviewer_info.reviewer_id = t_reviewer_to_journal.reviewer_id", "left") - ->where($where) + ->where($where)->where(function($query) use ($iTeenDaysLater) { + $query->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater) + ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0); + }) ->count(); if(empty($count)){ return jsonSuccess(['reviewers' => [],'count' => 0]); @@ -2197,10 +2205,14 @@ class Reviewer extends Base ->join("t_user", "t_user.user_id = t_reviewer_to_journal.reviewer_id", "left") ->join("t_user_reviewer_info", "t_user_reviewer_info.reviewer_id = t_reviewer_to_journal.reviewer_id", "left") ->field('t_user.account,t_user.email,t_user.realname,t_user_reviewer_info.company,t_user_reviewer_info.field,t_user.user_id,t_user.rs_num') - ->where($where) + ->where($where)->where(function($query) use ($iTeenDaysLater) { + $query->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater) + ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0); + }) ->order('t_user.rs_num desc') ->limit($limit_start, $data['pageSize']) ->select(); + //获取邀请时间超过10天的 chengxiaoling 20250728 end if(!empty($list)){ $aUserId = array_column($list, 'user_id'); $aWhere = ['state' => 0,'reviewer_id' => ['in',$aUserId]]; From 3b00b48aef82ade1283a15fa8a7834cdbbe9ca9e Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 14:30:03 +0800 Subject: [PATCH 03/21] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/Article.php | 55 ++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/application/api/controller/Article.php b/application/api/controller/Article.php index 6061ea5..9e0b370 100644 --- a/application/api/controller/Article.php +++ b/application/api/controller/Article.php @@ -2005,6 +2005,21 @@ class Article extends Base return jsonError("The article can only be added in state with editor at least"); } + //获取审稿人最后一次邀请时间 chengxiaoling 20250724 start + $iUserId = empty($data['uid']) ? 0 : $data['uid']; + if(empty($iUserId)){ + return jsonError("Please select the reviewers to invite!"); + } + //判断距离上次邀请审稿是否超过10天 + $aWhere = ['reviewer_id' => $iUserId]; + $iTeenDaysLater = strtotime('-10 days');// 计算10天之前的时间戳 + $aUserInfo = Db::name('user_reviewer_info')->field('last_invite_time')->where($aWhere)->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater) + ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0)->find(); + if(empty($aUserInfo)){ + return jsonError("The time since the last invitation for review by the reviewer has not exceeded 10 days!"); + } + //获取审稿人最后一次邀请时间 chengxiaoling 20250724 end + //增加信息到文章审稿表 $insert_data['reviewer_id'] = $data['uid']; $insert_data['article_id'] = $data['articleId']; @@ -2013,6 +2028,14 @@ class Article extends Base $insert_data['state'] = 5; $res = $this->article_reviewer_obj->insertGetId($insert_data); + //更新审稿人最后一次审稿时间 chengxiaoling 20250724 start + if(!empty($res) && !empty($insert_data['reviewer_id'])){ + $aUpdate = ['last_invite_time'=>time()]; + $aWhere = ['reviewer_id' => $iUserId]; + $updateResult = Db::name('user_reviewer_info')->where($aWhere)->limit(1)->update($aUpdate); + } + //更新审稿人最后一次审稿时间 chengxiaoling 20250724 end + //修改文章状态->审稿中 $this->article_obj->where('article_id', $data['articleId'])->update(['state' => 2]); @@ -3114,6 +3137,12 @@ class Article extends Base $inset_data['ctime'] = time(); $inset_data['state'] = -1; + + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 start + $inset_data['is_use_ai'] = empty($data['is_use_ai']) ? 2 : $data['is_use_ai']; //是否使用AI1是2否 + $inset_data['use_ai_explain'] = isset($data['use_ai_explain']) ? $data['use_ai_explain'] : ''; + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 end + $article_id = $this->article_obj->insertGetId($inset_data); } else { @@ -3141,9 +3170,20 @@ class Article extends Base $up["approval"] = 0; $up['approval_content'] = isset($data["approval_content"]) ? $data["approval_content"] : '';//trim($data['approval_content']); } + + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 start + $up['is_use_ai'] = empty($data['is_use_ai']) ? 2 : $data['is_use_ai']; //是否使用AI1是2否 + $up['use_ai_explain'] = isset($data['use_ai_explain']) ? $data['use_ai_explain'] : '';//使用AI说明 + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 end + $this->article_obj->where('article_id', $article_id)->update($up); } - changeArticleMajor($article_id, $data['major']); + //注释文章筛选领域添加修改为AI推荐领域,在第四步可以查看修改 chengxiaoling 20250722 + // changeArticleMajor($article_id,$data['major']); + if(!empty($article_id)){//AI推荐领域队列执行 + $sQueueId = \think\Queue::push('app\api\job\RecommendArticleField@fire',['article_id' => $article_id], 'RecommendArticleField'); + } + //注释文章筛选领域添加修改为AI推荐领域,在第四步可以查看修改 chengxiaoling 20250722 return jsonSuccess(['article_id' => $article_id]); } @@ -3451,6 +3491,15 @@ class Article extends Base } $this->article_obj->where('article_id', $data['article_id'])->update($update_l); $this->ai_scor($data['article_id']); + + //判断是否有文章领域 进行更新操作 chengxiaoling 20250722 start + $sMajorData = empty($data['article_field']) ? '' : $data['article_field'];//文章领域 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];//文章ID + if(!empty($sMajorData) && !empty($iArticleId)){ + $this->updateArticleField(['article_id' => $iArticleId,'article_field' => $sMajorData]); + } + //判断是否有文章领域 进行更新操作 chengxiaoling 20250722 end + return json(['code' => 0]); } @@ -4614,7 +4663,7 @@ class Article extends Base public function getArticleField($aParam = []) { - $aParam = empty($aParam) ? $this->request->post() : $this->request->post(); + $aParam = empty($aParam) ? $this->request->post() : $aParam; if (empty($aParam['article_id'])) { return json_encode(['status' => 2, 'msg' => 'Please select a Article']); @@ -4643,7 +4692,7 @@ class Article extends Base public function updateArticleField($aParam = []) { - $aParam = empty($aParam) ? $this->request->post() : $this->request->post(); + $aParam = empty($aParam) ? $this->request->post() : $aParam; $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; if (empty($iArticleId)) { return json_encode(['status' => 2, 'msg' => 'Please select a article']); From d7ef8cb57ddd756cd541a30813102e3a49de12a7 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 14:37:22 +0800 Subject: [PATCH 04/21] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/Article.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/api/controller/Article.php b/application/api/controller/Article.php index 9e0b370..2bc6e34 100644 --- a/application/api/controller/Article.php +++ b/application/api/controller/Article.php @@ -3086,7 +3086,7 @@ class Article extends Base 'journal' => 'require', 'title' => 'require', 'type' => 'require', - 'major' => 'require', + // 'major' => 'require', 'abstrart' => 'require' ]); if (!$rule->check($data)) { From 3efcab006aa2edf5a9c67e05b4dfb121d3673d0b Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:06:21 +0800 Subject: [PATCH 05/21] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/OpenAi.php | 114 +++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 51 deletions(-) diff --git a/application/common/OpenAi.php b/application/common/OpenAi.php index 7d8e2bd..e2219b6 100644 --- a/application/common/OpenAi.php +++ b/application/common/OpenAi.php @@ -24,7 +24,7 @@ class OpenAi protected $sTmrUrl = "http://journalapi.tmrjournals.com/public/index.php";//"http://zmzm.journal.dev.com/";//; protected $aArticleImportantPrompt = [ "journal_scope" => [ - 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 + 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{ "journal_scope": { "assessment": "是/否", @@ -34,19 +34,19 @@ class OpenAi "criteria" => "根据文章的标题:{title};摘要:{abstrart}以及期刊范围:{scope}来判断文章是否符合目标期刊{journal_name}" ], "attribute" => [ - 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 + 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{ "attribute": { "assessment": "是/否", "explanation": "请总结归纳分析" } }', - "criteria" => "请结合以下几点【研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释】评估文章内容{content}是否有科学前沿性和创新性?" + "criteria" => "请结合以下几点【研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释】评估文章内容{content}是否有科学前沿性和创新性?" ] //, // "contradiction" => [ - // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 + // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 // 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{ // "contradiction": { // "assessment": "是/否", @@ -57,7 +57,7 @@ class OpenAi // ], // "unreasonable" => [ - // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 + // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。 // 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{ // "unreasonable": { // "assessment": "是/否", @@ -70,33 +70,33 @@ class OpenAi //公微问题模版 protected $aWechatQuestion = [ - 'system_message' => '您是一位医学期刊的医学科普转化专家,严格遵循用户要求的结构、语言和专业约束,不编造数据,不夸大结论,擅长将复杂的医学研究论文转化为适合微信公众号推送的专业科普内容。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:', + 'system_message' => '您是一位医学期刊的医学科普转化专家,严格遵循用户要求的结构、语言和专业约束,不编造数据,不夸大结论,擅长将复杂的医学研究论文转化为适合微信公众号推送的专业科普内容。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:', 'public_message' => [ - "covered" => "[列出文章涵盖的学科及研究方法,总字数不超过100字,学科和方法之间用逗号分隔,例如:肿瘤学,分子生物学,基因组测序,生物信息学分析]", + "covered" => "[列出文章涵盖的学科及研究方法,总字数不超过100字,学科和方法之间用逗号分隔,例如:肿瘤学,分子生物学,基因组测序,生物信息学分析]", "title_chinese" => "[将标题翻译成中文:内容需自然流畅、口语化、连贯性、学术性]" // , - // "content" => "将内容翻译成中文,需自然流畅、口语化、连贯性、学术性,保留原文的章节结构和图表编号" + // "content" => "将内容翻译成中文,需自然流畅、口语化、连贯性、学术性,保留原文的章节结构和图表编号" ], 'default' => [ - "digest" => "[学术规范翻译并提炼摘要,强调逻辑性、科学术语准确性和表达严谨性,采用段落形式,总字数不超过500字]", - "research_background" => "[提炼研究背景,采用连贯的段落形式,总字数超过200字]", - "discussion_results" => "[针对文章简单总结讨论和结果,采用连贯的段落形式,总字数超过450字]", - "research_method" => "[总结文章的研究方法,采用连贯的段落形式,总字数超过300字]", - "prospect" => "[针对稿件内容进行展望撰写,采用连贯的段落形式]", - "highlights" => "[总结归纳亮点,至少3点,每点用分号分隔]" + "digest" => "[学术规范翻译并提炼摘要,强调逻辑性、科学术语准确性和表达严谨性,采用段落形式,注意内容不要和文章内容有严重重复,总字数不超过500字]", + "research_background" => "[提炼研究背景,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过200字]", + "discussion_results" => "[针对文章简单总结讨论和结果,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过450字]", + "research_method" => "[总结文章的研究方法,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过300字]", + "prospect" => "[针对稿件内容进行展望撰写,注意内容不要和文章内容有严重重复,采用连贯的段落形式]", + "highlights" => "[总结归纳亮点,至少3点,每点用分号分隔]" ], 'review' => [ - "overview" => "按照学术规范翻译并提炼文章概述,整体内容应大于1200字,其中应包含文章背景(不少于400字),其他内容提炼更强调逻辑性、科学术语准确性和表达的严谨性,注意内容不要有严重重复,采用连贯的段落形式", - "summary" => "针对文章结论生成一个简单总结,内容不要和文章概述重复,字数150以内", + "overview" => "按照学术规范翻译并提炼文章概述,整体内容应大于1200字,其中应包含文章背景(不少于400字),其他内容提炼更强调逻辑性、科学术语准确性和表达的严谨性,注意内容不要和文章内容有严重重复,采用连贯的段落形式", + "summary" => "针对文章结论生成一个简单总结,内容不要和文章概述重复,字数150以内", ] ]; //AI审稿提示词 protected $aReviewQuestion = [ - 'system_message' => '您是一位资深的医学期刊学术评审专家,请负责严谨、客观地评估学术文章。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:', + 'system_message' => '您是一位资深的医学期刊学术评审专家,请负责严谨、客观地评估学术文章。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:', 'default' => [ "journal_scope" => "结合标题和摘要以及期刊范围来判断文章是否符合目标期刊?", - "attribute" => "内容是否有科学前沿性和创新性?参照维度[研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释]", + "attribute" => "内容是否有科学前沿性和创新性?参照维度[研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释]", "contradiction" => "内容是否前后矛盾或存在逻辑不一致的问题?", "unreasonable" => "内容是否有明显的不合理之处?", "ethics" => "内容是否存在伦理号缺失或明显伦理问题?", @@ -188,8 +188,8 @@ class OpenAi if(empty($aSearch)){ return []; } - $sSysMessagePrompt = '你是一位专业的医学翻译专家,请将用户提供的内容准确、流畅地翻译成中文。翻译需自然流畅、口语化、连贯性、学术性,保留原文的专业术语和逻辑结构'; - $sUserPrompt = '"将以下内容翻译为中文,仅返回翻译结果,不要解释:\n {#content#}"'; + $sSysMessagePrompt = '你是一位专业的医学翻译专家,请将用户提供的内容准确、流畅地翻译成中文。翻译需自然流畅、口语化、连贯性、学术性,保留原文的专业术语和逻辑结构'; + $sUserPrompt = '"将以下内容翻译为中文,仅返回翻译结果,不要解释:\n {#content#}"'; $sUserPrompt = str_replace(array_keys($aSearch), array_values($aSearch), $sUserPrompt); $aMessage = [ ['role' => 'system', 'content' => $sSysMessagePrompt], @@ -199,7 +199,7 @@ class OpenAi $aMessage = [ 'model' => empty($aSearch['model']) ? 'gpt-4.1' : $aSearch['model'], 'messages' => $aMessage, - 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) + 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) ]; $aResult = json_decode($this->curlOpenAIStream($aMessage),true); $sJsonData = empty($aResult['data']) ? '' : $aResult['data']; @@ -283,7 +283,7 @@ class OpenAi $data = [ 'model' => $model, 'messages' => $aMessage, - 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) + 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) ]; $this->curl = curl_init(); @@ -357,8 +357,13 @@ class OpenAi $data = [ 'model' => $model, 'messages' => $aMessage, - 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) - 'stream' => true // 关键:启用流式传输,避免超时 + // 'temperature' => 0.2,// 降低随机性(0-1,0为最确定) + 'temperature' => 0.6, // 中等随机性 + // 'max_tokens' => 1000, + 'top_p' => 0.8, + 'frequency_penalty' => 0.3, + 'presence_penalty' => 0.2, + 'stream' => true // 关键:启用流式传输,避免超时 ]; // Curl通用配置 @@ -380,10 +385,10 @@ class OpenAi // === 5. 流式响应处理(核心避免超时) === $streamContent = ''; // 累积流式返回的内容 - // 回调函数:每收到一块数据就处理并保存,避免整段等待 + // 回调函数:每收到一块数据就处理并保存,避免整段等待 curl_setopt($this->curl, CURLOPT_WRITEFUNCTION, function ($curl, $data) use (&$streamContent) { $streamContent .= $data; - return strlen($data); // 必须返回数据长度,否则CURL会中断 + return strlen($data); // 必须返回数据长度,否则CURL会中断 }); //执行请求 @@ -397,11 +402,11 @@ class OpenAi //错误处理 if (!empty($curlErrno)) { - // 超时但已有部分数据:保存进度,下次从该块重试 + // 超时但已有部分数据:保存进度,下次从该块重试 if ($curlErrno == CURLE_OPERATION_TIMEDOUT && !empty($streamContent)) { return json_encode([ 'status' => 3, - 'msg' => "处理超时,已保存进度", + 'msg' => "处理超时,已保存进度", ]); } // 其他错误(如网络问题) @@ -448,27 +453,34 @@ class OpenAi } //记录处理开始 $iNum = count($aMessage); - $sRedisKey = 'ai_create_article_'.$iId; - $this->oQueueRedis->recordQuestionProcessingStart($sRedisKey,$iNum); - //定义空数组 - $aChunkResult = $aFail = []; - $batchId = uniqid(); - $iQueueCount1 = $iQueueCount2 = 0; - foreach ($aMessage as $key => $value) { - $aParam['messages'] = $value; - $aParam['chunkIndex'] = $key; - $aParam['count_num'] = $iNum; - // if($key%2 == 0){ - $aParam['key_name'] = 'queue_1_completed'; - Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueue'); - // }else{ - // $aParam['url'] = $this->sAiUrl; - // $aParam['key_name'] = 'queue_2_completed'; - // Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueueBak'); - // } - + $sRedisKey = 'queue_job:ai_create_article:'.$iId; + $result = $this->oQueueRedis->recordQuestionProcessingStart($sRedisKey,$iNum); + $result = empty($result) ? 0 : $result; + if($result == 1){ + //定义空数组 + foreach ($aMessage as $key => $value) { + $aParam['messages'] = $value; + $aParam['chunkIndex'] = $key; + $aParam['count_num'] = $iNum; + // if($key%2 == 0){ + $aParam['key_name'] = 'queue_1_completed'; + Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueue'); + // }else{ + // $aParam['url'] = $this->sAiUrl; + // $aParam['key_name'] = 'queue_2_completed'; + // Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueueBak'); + // } + + } + return json_encode(['status' => 1, 'msg' => 'Content is being generated, please wait']); } - return json_encode(['status' => 1, 'msg' => 'Content is being generated, please wait']); + if($result == 2){ + return json_encode(['status' => 3, 'msg' => 'The data has been generated, please proceed with the next steps']); + } + if($result == 3){ + return json_encode(['status' => 4, 'msg' => 'Content is being generated, please wait']); + } + return json_encode(['status' => 5, 'msg' => 'Redis write failure']); } /** * 微信公众号-生成内容队列形式 @@ -491,11 +503,11 @@ class OpenAi $aResult = $this->curlOpenAIStream($aParam); //更新处理进度 $iIndex = empty($aParam['chunkIndex']) ? 0 : $aParam['chunkIndex']; - $sRedisKey = 'ai_create_article_'.$iId; + $sRedisKey = 'queue_job:ai_create_article:'.$iId; $sKeyName = empty($aParam['key_name']) ? 'queue_1_completed' : $aParam['key_name']; $iProgress = $this->oQueueRedis->updateQuestionProcessingProgress($sRedisKey,$sKeyName); //保存内容 - $sRedisKey = 'ai_create_article_progress_'.$iId; + $sRedisKey = 'queue_job:ai_create_article_progress:'.$iId; $this->oQueueRedis->saveChunkProgress($sRedisKey, $iIndex,$aResult); //更新入库 @@ -572,7 +584,7 @@ class OpenAi * 从文本中提取被```json```和```包裹的JSON内容并解析 * @param string $text 包含JSON代码块的文本 * @param bool $assoc 是否返回关联数组(默认true) - * @return array|object 解析后的JSON数据,失败时返回null + * @return array|object 解析后的JSON数据,失败时返回null */ public function extractAndParse($text, $assoc = true){ From 7d43e1a4dd66268fb9632944a4211827cf6890a4 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:08:43 +0800 Subject: [PATCH 06/21] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/QueueJob.php | 428 +++++++++++++------------------- 1 file changed, 176 insertions(+), 252 deletions(-) diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php index d4b194f..756bc7e 100644 --- a/application/common/QueueJob.php +++ b/application/common/QueueJob.php @@ -2,6 +2,7 @@ namespace app\common; use think\Db; +use think\Cache; use app\common\QueueRedis; class QueueJob @@ -11,116 +12,11 @@ class QueueJob private $logPath; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - } - - /** - * 记录任务开始 - * @param array $aParam - * @return int 日志ID,失败返回0 - */ - public function addLog($aParam = []) - { - - $sJobId = empty($aParam['job_id']) ? uniqid() : $aParam['job_id']; - return $sJobId; - - // 数据过滤(只保留必填字段) - $aInsert = []; - foreach ($this->aField as $field) { - if (isset($aParam[$field])) { - $aInsert[$field] = $aParam[$field]; - } - } - - // 补充默认值 - if (!isset($aInsert['create_time'])) { - $aInsert['create_time'] = time(); - } - if (!isset($aInsert['update_time'])) { - $aInsert['update_time'] = $aInsert['create_time']; - } - - try { - return Db::name('wechat_queue_logs')->insertGetId($aInsert); - } catch (\Exception $e) { - $this->log("添加任务日志失败: " . $e->getMessage() . " | 参数: " . json_encode($aInsert, self::JSON_OPTIONS)); - return 0; - } - } - - /** - * 记录任务状态更新 - * @param array $aParam - * @return bool - */ - public function updateLog($aParam = []) - { - return true; - $iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id']; - if (empty($iLogId)) { - $this->log("更新日志失败: 缺少log_id"); - return false; - } - - // 数据过滤 - $aUpdate = []; - foreach ($this->aField as $field) { - if (isset($aParam[$field])) { - $aUpdate[$field] = $aParam[$field]; - } - } - - // 强制更新时间 - $aUpdate['update_time'] = time(); - - try { - return Db::name('wechat_queue_logs') - ->where('log_id', $iLogId) - ->limit(1) - ->update($aUpdate) > 0; - } catch (\Exception $e) { - $this->log("更新任务日志失败 [ID:{$iLogId}]: " . $e->getMessage()); - return false; - } - } - - /** - * 设置日志路径并确保目录存在 - * @param string $logPath - * @throws \RuntimeException - */ - public function ensureLogDirExists($logPath = '') - { - if (empty($logPath)) { - $error = "日志路径不能为空"; - $this->log($error); - return $error; - } - - $this->logPath = $logPath; - $logDir = dirname($this->logPath); - - // 检查并创建目录(处理权限问题) - if (!is_dir($logDir)) { - $oldUmask = umask(0); - $created = mkdir($logDir, 0755, true); - umask($oldUmask); - - if (!$created || !is_dir($logDir)) { - $error = "无法创建日志目录: {$logDir} (权限不足)"; - $this->log($error); - return $error; - } - } } /** @@ -129,127 +25,8 @@ class QueueJob */ public function log($message) { - // 防止缓冲区溢出 - if (count($this->logBuffer) >= 1000) { - $this->flushLog(); - } - - $time = date('H:i:s'); - $this->logBuffer[] = "[$time] $message\n"; - - // 缓冲区满或超时则刷新 - if (count($this->logBuffer) >= 50 || time() - $this->lastLogTime > 10) { - $this->flushLog(); - } - } - - /** - * 刷新日志缓冲区到文件 - */ - public function flushLog() - { - if (empty($this->logBuffer)) { - return; - } - - // 检查日志路径是否设置 - if (empty($this->logPath)) { - $this->logBuffer = []; - return; - } - - // 检查文件大小并处理 - $this->checkAndTruncateLog(); - - $fp = fopen($this->logPath, 'a'); - if ($fp === false) { - // 紧急写入失败日志(避免递归) - $errorMsg = "[" . date('H:i:s') . "] 错误: 无法打开日志文件 {$this->logPath}\n"; - error_log($errorMsg); // 写入系统日志 - $this->logBuffer = []; - return; - } - - try { - // 尝试获取文件锁 - if (flock($fp, LOCK_EX)) { - fwrite($fp, implode('', $this->logBuffer)); - flock($fp, LOCK_UN); - } else { - // 无锁情况下尝试写入 - fwrite($fp, implode('', $this->logBuffer)); - $this->logBuffer[] = "[" . date('H:i:s') . "] 警告: 日志写入未加锁,可能存在冲突风险\n"; - } - } catch (\Exception $e) { - $errorMsg = "[" . date('H:i:s') . "] 错误: 写入日志失败: {$e->getMessage()}\n"; - fwrite($fp, $errorMsg); - } finally { - fclose($fp); - } - - $this->logBuffer = []; - $this->lastLogTime = time(); - } - - /** - * 检查日志文件大小,超过限制则清空 - */ - public function checkAndTruncateLog() - { - if (empty($this->logPath) || !file_exists($this->logPath)) { - return; - } - - // 清除文件状态缓存并获取大小 - clearstatcache(true, $this->logPath); - $fileSize = @filesize($this->logPath); - - if ($fileSize === false) { - $this->log("错误: 无法获取日志文件大小 {$this->logPath}"); - return; - } - - if ($fileSize >= $this->logMaxSize) { - $fp = fopen($this->logPath, 'w'); - if ($fp === false) { - $this->log("错误: 无法清空日志文件 {$this->logPath}"); - return; - } - - try { - if (flock($fp, LOCK_EX)) { - // 二次检查文件大小(避免竞态条件) - clearstatcache(true, $this->logPath); - if (filesize($this->logPath) >= $this->logMaxSize) { - ftruncate($fp, 0); - $this->log("日志文件超过" . $this->formatFileSize($this->logMaxSize) . ",已清空"); - } - flock($fp, LOCK_UN); - } - } catch (\Exception $e) { - $this->log("错误: 清空日志文件失败: {$e->getMessage()}"); - } finally { - fclose($fp); - } - } - } - - /** - * 格式化文件大小(字节转人类可读格式) - * @param int $bytes - * @return string - */ - public function formatFileSize($bytes) - { - if ($bytes <= 0) { - return '0 B'; - } - - $units = ['B', 'KB', 'MB', 'GB', 'TB']; - $unitIndex = min(floor(log($bytes, 1024)), count($units) - 1); - $size = $bytes / pow(1024, $unitIndex); - - return number_format($size, 2) . ' ' . $units[$unitIndex]; + $log = date("Y-m-d H:i:s") . " " . $message . "\n"; + echo $log; } /** @@ -278,25 +55,17 @@ class QueueJob /** * 处理可重试异常 * @param \Exception $e - * @param int $iLogId * @param string $sRedisKey + * @param string $sRedisValue * @param \think\queue\Job $job */ - public function handleRetryableException($e, $iLogId, $sRedisKey, $job) + public function handleRetryableException($e,$sRedisKey,$sRedisValue,$job) { $sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}"); - if ($iLogId > 0) { - $this->updateLog([ - 'log_id' => $iLogId, - 'status' => 2, - 'error' => $sMsg . ':' . $sTrace, - ]); - } - - $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue); $attempts = $job->attempts(); if ($attempts >= $this->maxRetries) { @@ -312,34 +81,189 @@ class QueueJob /** * 处理不可重试异常 * @param \Exception $e - * @param int $iLogId * @param string $sRedisKey + * @param string $sRedisValue * @param \think\queue\Job $job */ - public function handleNonRetryableException($e, $iLogId, $sRedisKey, $job) + public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job) { $sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage(); $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); $this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}"); - - if ($iLogId > 0) { - $this->updateLog([ - 'log_id' => $iLogId, - 'status' => 3, // 3:不可重试失败 - 'error' => $sMsg . ':' . $sTrace, - ]); - } - - $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue); $this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}"); $job->delete(); } /** - * 析构函数:确保最后日志被写入 + * 检查并重建数据库连接 */ - public function __destruct() + public function checkDbConnectionBak() { - $this->flushLog(); + return true; + $maxAttempts = 2; // 最大重试次数 + $attempt = 0; + while ($attempt < $maxAttempts) { + try { + // 尝试查询以验证连接 + Db::query('SELECT 1'); + return true; // 连接有效,直接返回 + } catch (\Exception $e) { + $attempt++; + $waitTime = pow(2, $attempt); // 指数退避:2s, 4s, 8s... + // 记录连接失败日志 + $sMsg = empty($e->getMessage()) ? '检查失败' : $e->getMessage(); + $this->log("数据库连接检查失败(尝试{$attempt}/{$maxAttempts}): {$sMsg}"); + // 关闭所有连接 + Db::close(); + + // 最后一次尝试不需要等待 + if ($attempt < $maxAttempts) { + $this->log("{$waitTime}秒后尝试重新连接..."); + sleep($waitTime); // 等待一段时间再重试 + } + // 尝试重新连接 + try { + Db::connect(); + Db::query('SELECT 1'); // 验证重连成功 + $this->log("数据库连接已重建(尝试{$attempt}/{$maxAttempts})"); + return true; + } catch (\Exception $e2) { + $this->log("数据库重连尝试{$attempt}/{$maxAttempts}失败: {$e2->getMessage()}"); + } + } + } + + // 所有重试都失败 + $this->log("数据库连接异常,已达到最大重试次数({$maxAttempts})"); + return false; + } + + + /** + * 数据库连接检查与重建(高可用版) + * 解决 MySQL server has gone away 等连接超时问题 + * @param bool $force 是否强制检查(忽略缓存时间) + * @return bool 连接是否有效 + */ + public function checkDbConnection($force = false) + { + return true; + // 1. 用进程ID隔离静态变量,避免多Worker进程互相干扰 + // 每个队列Worker是独立进程,静态变量需进程隔离 + static $lastCheckTime = []; + $pid = getmypid(); // 获取当前进程ID + $checkInterval = 60; // 自动检查间隔(秒) + + // 非强制检查且未到间隔时间,直接返回有效(减少性能消耗) + if (!$force && isset($lastCheckTime[$pid]) && (time() - $lastCheckTime[$pid] < $checkInterval)) { + return true; + } + + // 2. 配置重试参数 + $maxAttempts = 3; // 最大重试次数 + $attempt = 0; // 当前尝试次数 + $baseWait = 2; // 基础等待时间(秒) + + // 3. 循环重试连接 + while ($attempt < $maxAttempts) { + try { + // 执行轻量查询验证连接(DUAL是MySQL伪表,效率极高) + $result = Db::query('SELECT 1 FROM DUAL'); + // 验证查询结果是否有效 + if (is_array($result) && !empty($result)) { + $lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]数据库连接有效"); + return true; + } else { + throw new Exception("连接验证失败:查询结果异常"); + } + } + // 优先捕获PDO底层异常(数据库连接错误多为此类) + catch (PDOException $e) { + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); + } + // 捕获框架层异常 + catch (Exception $e) { + $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime); + } finally { + $attempt++; // 无论成功失败,计数+1 + } + } + + // 4. 达到最大重试次数,返回失败 + $this->log("进程[{$pid}]数据库连接异常,已达最大重试次数({$maxAttempts})"); + return false; + } + + /** + * 处理连接错误的统一逻辑 + * @param Exception $e 异常对象 + * @param int $pid 进程ID + * @param int $attempt 当前尝试次数 + * @param int $maxAttempts 最大尝试次数 + * @param int $baseWait 基础等待时间 + * @param array $lastCheckTime 检查时间记录 + */ + private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait, &$lastCheckTime) + { + $errorMsg = empty($e->getMessage()) ? '未知数据库错误' : $e->getMessage(); + $errorCode = empty($e->getCode()) ? 0 : $e->getCode(); + + // 记录错误详情(含进程ID便于排查多进程问题) + $this->log("进程[{$pid}]连接检查失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(错误码:{$errorCode})"); + + // 5. 强制清理当前进程的无效连接 + Db::close(); // 关闭框架层面的连接 + $this->clearDbInstanceCache(); // 清除框架连接缓存(关键步骤) + cache('db_connection_status', null); + + // 最后一次尝试无需等待,直接重试 + if ($attempt + 1 >= $maxAttempts) { + return; + } + + // 6. 差异化等待策略(针对特定错误延长等待) + $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false; + // 普通错误:2^1=2s → 2^2=4s;致命错误:3^1=3s → 3^2=9s + $waitTime = $isGoneAway ? $baseWait * pow(3, $attempt) : $baseWait * pow(2, $attempt); + + $this->log("进程[{$pid}]将在{$waitTime}秒后重试..."); + sleep($waitTime); // 等待指定时间 + + // 7. 尝试重建连接并二次验证 + try { + // 强制重建连接(第二个参数true表示忽略缓存) + Db::connect(config('database'), true); + + // 执行验证查询 + $result = Db::query('SELECT 1 FROM DUAL'); + // 检查结果是否有效 + if (is_array($result) && !empty($result)) { + $lastCheckTime[$pid] = time(); + $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})"); + $attempt = $maxAttempts; // 标记成功并退出循环 + } else { + throw new Exception("重建连接后查询结果异常"); + } + } catch (Exception $e2) { + $this->log("进程[{$pid}]重连失败(尝试{$attempt}/{$maxAttempts}):{$e2->getMessage()}"); + } + } + + /** + * 清除ThinkPHP5的Db类实例缓存(解决框架连接缓存问题) + * 核心原理:通过反射突破私有属性访问限制 + */ + private function clearDbInstanceCache() + { + try { + $reflection = new \ReflectionClass('\think\Db'); + $instanceProp = $reflection->getProperty('instance'); // 获取Db类的instance属性 + $instanceProp->setAccessible(true); // 设为可访问 + $instanceProp->setValue(null, []); // 清空静态缓存的连接实例 + } catch (\ReflectionException $e) { + $this->log("清除数据库实例缓存失败:{$e->getMessage()}"); + } } } \ No newline at end of file From 46c56aa4856c8bf684587ecaa4f6cf1d9e34491c Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:11:56 +0800 Subject: [PATCH 07/21] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/QueueRedis.php | 59 +++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php index c28faf7..1aaacbf 100644 --- a/application/common/QueueRedis.php +++ b/application/common/QueueRedis.php @@ -124,23 +124,38 @@ LUA; $redis = $this->connect(); } } - // 任务结束时的批量操作 -public function finishJob($sRedisKey, $status, $expire) -{ - try { - $redis = $this->connect(); - // 使用Lua脚本确保原子性 - $script = <<eval($script, [$sRedisKey, $status, $expire], 1) === 1; - } catch (\Exception $e) { - Log::error("Redis完成任务失败: {$e->getMessage()}"); - return false; - } -} + /** + * 标记任务状态并释放锁(带所有权验证) + * @param string $sRedisKey 任务锁的键名 + * @param string $status 状态(completed/failed) + * @param int $expire 状态的过期时间(秒) + * @param string $sRedisValue 锁的值(用于验证所有权) + * @return bool 是否执行成功 + */ + public function finishJob($sRedisKey, $status, $expire, $sRedisValue){ + try { + $redis = $this->connect(); + // Lua 脚本:先验证锁所有权,再设置状态并删除锁 + $script = <<eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1); + return $result === 1; + } catch (\Exception $e) { + Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}"); + return false; + } + } // 记录处理进度 public function recordProcessingStart($key, $totalQuestions) { @@ -163,6 +178,14 @@ LUA; { try { $redis = $this->connect(); + //判断是否执行 + $sStatus = $redis->hGet($key, 'status'); + if (!empty($sStatus) && $sStatus == 'completed') { + return 2; + } + if (!empty($sStatus) && $sStatus == 'processing') { + return 3; + } $redis->hMSet($key, [ 'status' => 'processing', 'total' => $totalQuestions, @@ -172,9 +195,9 @@ LUA; 'queue_2_completed' => 0 ]); $redis->expire($key, 10800); // 6小时过期 - return true; + return 1; } catch (\Exception $e) { - return false; + return 4; } } // 多问题按条件拆分成两个队列更新日志记录 From 10353ee0e975b1026b0130410d4d1c44be123e31 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:21:03 +0800 Subject: [PATCH 08/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/createFieldForQueue.php | 59 ++++++--------------- 1 file changed, 17 insertions(+), 42 deletions(-) diff --git a/application/api/job/createFieldForQueue.php b/application/api/job/createFieldForQueue.php index 50fa7a2..1c38e7b 100644 --- a/application/api/job/createFieldForQueue.php +++ b/application/api/job/createFieldForQueue.php @@ -8,25 +8,17 @@ use app\common\QueueRedis; class createFieldForQueue { - private $logPath; private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/createFieldForQueue_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -34,11 +26,16 @@ class createFieldForQueue $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -47,7 +44,6 @@ class createFieldForQueue if (empty($iRedisId)) { $this->oQueueJob->log("无效的redis_id,删除任务"); $job->delete(); - $this->oQueueJob->flushLog(); return; } @@ -74,53 +70,32 @@ class createFieldForQueue $job->release($delay); } } - $this->oQueueJob->flushLog(); - return; - } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); return; } try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } $oOpenAi = new OpenAi; $aResult = json_decode($oOpenAi->createFieldForQueue($data), true); $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; - - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 7594e4fa31387dea659acd336bbf128e364f2aca Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:21:19 +0800 Subject: [PATCH 09/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/ArticleAiCreateContent.php | 60 ++++++------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/application/api/job/ArticleAiCreateContent.php b/application/api/job/ArticleAiCreateContent.php index cccd965..0ad128e 100644 --- a/application/api/job/ArticleAiCreateContent.php +++ b/application/api/job/ArticleAiCreateContent.php @@ -7,25 +7,17 @@ use app\common\QueueRedis; use app\api\controller\Aiarticle; class ArticleAiCreateContent { - private $logPath; private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/ArticleAiCreateContent_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +25,17 @@ class ArticleAiCreateContent $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,54 +70,32 @@ class ArticleAiCreateContent $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { - + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } //生成内容 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->create($data),true); $sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg']; - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 0c6fdf01f870b8cb7ca0744a94e735c342d38402 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:24:43 +0800 Subject: [PATCH 10/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/RecommendArticleField.php | 60 ++++++------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/application/api/job/RecommendArticleField.php b/application/api/job/RecommendArticleField.php index 66285a3..c2c7749 100644 --- a/application/api/job/RecommendArticleField.php +++ b/application/api/job/RecommendArticleField.php @@ -7,25 +7,17 @@ use app\common\QueueRedis; use app\common\Article; class RecommendArticleField { - private $logPath; private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/ArticleFiled_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +25,17 @@ class RecommendArticleField $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -45,7 +43,6 @@ class RecommendArticleField if (empty($iArticleId)) { $this->oQueueJob->log("无效的article_id,删除任务"); $job->delete(); - $this->oQueueJob->flushLog(); return; } @@ -72,52 +69,31 @@ class RecommendArticleField $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } $oArticle = new Article; $aResult = json_decode($oArticle->getAiField($data), true); $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}|执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}|执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 5c6709b35e1d33e9f79502874ae3974545073a84 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:27:01 +0800 Subject: [PATCH 11/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/RecommendReviewer.php | 58 +++++++---------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/application/api/job/RecommendReviewer.php b/application/api/job/RecommendReviewer.php index 4986609..965c384 100644 --- a/application/api/job/RecommendReviewer.php +++ b/application/api/job/RecommendReviewer.php @@ -11,21 +11,14 @@ class RecommendReviewer private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/RecommendReviewer_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class RecommendReviewer $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,29 +71,16 @@ class RecommendReviewer $job->release($delay); } } - $this->oQueueJob->flushLog(); - return; - } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); return; } try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } //获取推荐审稿人信息 $aParam = ['article_id' => $iArticleId,'page' => 1,'size' => empty($data['size']) ? 5 : $data['size']]; $oReviewer = new Reviewer; @@ -130,28 +116,20 @@ class RecommendReviewer $sMsg .= empty($aResult['msg']) ? 'Reviewer data insertion failed' : $aResult['msg']; } } - //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 8bd1512e806b02516e260bdd1f84205e3252f72a Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:28:03 +0800 Subject: [PATCH 12/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/ReviewerScore.php | 60 ++++++++------------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/application/api/job/ReviewerScore.php b/application/api/job/ReviewerScore.php index 90ed628..5d672d0 100644 --- a/application/api/job/ReviewerScore.php +++ b/application/api/job/ReviewerScore.php @@ -11,21 +11,14 @@ class ReviewerScore private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/ReviewerScore_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class ReviewerScore $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -76,57 +75,34 @@ class ReviewerScore $job->release($delay); } } - $this->oQueueJob->flushLog(); - return; - } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); return; } try { - // 执行核心任务 + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + $aParam = ['article_id' => $iArticleId,'reviewer_id' => $iReviewerId,'art_rev_id' => $iArtRevId]; $oReviewer = new Reviewer; $aResult = json_decode($oReviewer->score($aParam),true); $sMsg = empty($aResult['msg']) ? '给审稿人评分处理失败' : $aResult['msg']; - - //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 8a77a07e16bf4fb062710c22ea7b6d224695c77c Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:29:12 +0800 Subject: [PATCH 13/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/RevisionReviewer.php | 57 +++++++----------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/application/api/job/RevisionReviewer.php b/application/api/job/RevisionReviewer.php index 5f96bd5..93529d7 100644 --- a/application/api/job/RevisionReviewer.php +++ b/application/api/job/RevisionReviewer.php @@ -11,21 +11,14 @@ class RevisionReviewer private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/RevisionReviewer_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class RevisionReviewer $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,29 +71,15 @@ class RevisionReviewer $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } //获取符合条件的文章审稿人信息 $aParam = ['article_id' => $iArticleId]; $oReviewer = new Reviewer; @@ -102,27 +87,19 @@ class RevisionReviewer $sMsg = empty($aResult['msg']) ? '审稿人同意审稿但超时未审的数据失败' : $aResult['msg']; //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From ba50a1acf797dc1e81d0ccaf72e1d6b3388c6bc5 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:30:08 +0800 Subject: [PATCH 14/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/RelatedArticle.php | 58 ++++++++------------------ 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/application/api/job/RelatedArticle.php b/application/api/job/RelatedArticle.php index 62d0491..f35b877 100644 --- a/application/api/job/RelatedArticle.php +++ b/application/api/job/RelatedArticle.php @@ -11,21 +11,14 @@ class RelatedArticle private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/RelatedArticle_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class RelatedArticle $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,29 +71,16 @@ class RelatedArticle $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //查询文章所关联的文章 $oJournalArticle = new JournalArticle; $aResult = json_decode(JournalArticle::get($data),true); @@ -102,27 +88,19 @@ class RelatedArticle $sMsg = empty($aResult['msg']) ? '获取相关文章信息失败' : $aResult['msg']; //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From a122f55ac268492746de415a9cdce9a3b763ab5f Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:31:34 +0800 Subject: [PATCH 15/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/SendRelatedArticleEmail.php | 57 ++++++------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/application/api/job/SendRelatedArticleEmail.php b/application/api/job/SendRelatedArticleEmail.php index 353178c..90a68e0 100644 --- a/application/api/job/SendRelatedArticleEmail.php +++ b/application/api/job/SendRelatedArticleEmail.php @@ -11,21 +11,14 @@ class SendRelatedArticleEmail private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/SendRelatedArticleEmail_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class SendRelatedArticleEmail $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -92,29 +91,15 @@ class SendRelatedArticleEmail $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } //查询是否发送过邮件 $oJournalArticle = new JournalArticle; $aLog = json_decode($oJournalArticle::getLog(['article_id' => $iArticleId,'article_author_id' => $article_author_id,'related_article_id' => $related_article_id,'is_success' => 1]),true); @@ -136,27 +121,19 @@ class SendRelatedArticleEmail } //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From 2df1f50c5f27f1f7422852f735fdf8d895bd5d1c Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:32:27 +0800 Subject: [PATCH 16/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/SendReviewEmail.php | 62 ++++++++----------------- 1 file changed, 19 insertions(+), 43 deletions(-) diff --git a/application/api/job/SendReviewEmail.php b/application/api/job/SendReviewEmail.php index a140f98..11a1bb2 100644 --- a/application/api/job/SendReviewEmail.php +++ b/application/api/job/SendReviewEmail.php @@ -11,21 +11,14 @@ class SendReviewEmail private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/SendReviewEmail_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,16 +26,21 @@ class SendReviewEmail $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } // 获取文章ID - // 获取文章ID $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; //作者邮箱 $email = empty($data['email']) ? '' : $data['email']; @@ -62,8 +60,8 @@ class SendReviewEmail $reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id']; //邮件类型 $type = empty($data['type']) ? 1 : $data['type']; - if (empty($iArticleId)) { - $this->oQueueJob->log("无效的article_id,删除任务"); + if (empty($iArticleId) || empty($email)) { + $this->oQueueJob->log("无效的article_id/email,删除任务"); $job->delete(); return; } @@ -92,29 +90,15 @@ class SendReviewEmail $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } //查询是否发送过邮件 $oReviewer = new Reviewer; if($type != 3){ @@ -137,27 +121,19 @@ class SendReviewEmail $iId = $oReviewer->addLog($aEmailLog); } //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From e06a31ed051c18fed4e03d6ba8e2f92edab03671 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:33:21 +0800 Subject: [PATCH 17/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/WechatDraft.php | 58 +++++++++-------------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/application/api/job/WechatDraft.php b/application/api/job/WechatDraft.php index 08d70ad..031caf0 100644 --- a/application/api/job/WechatDraft.php +++ b/application/api/job/WechatDraft.php @@ -11,21 +11,14 @@ class WechatDraft private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraft_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatDraft $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,35 @@ class WechatDraft $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //上传草稿箱 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->syncWechat($data),true); $sMsg = empty($aResult['msg']) ? '上传草稿箱失败' : $aResult['msg']; //更新日志 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From e858a21f0bc7e9136f4a6894e1cf48fc57c03080 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:35:59 +0800 Subject: [PATCH 18/21] =?UTF-8?q?job=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/WechatDraftPublish.php | 60 +++++++-------------- application/api/job/WechatMaterial.php | 61 +++++++--------------- application/api/job/WechatQueryStatus.php | 60 +++++++-------------- 3 files changed, 59 insertions(+), 122 deletions(-) diff --git a/application/api/job/WechatDraftPublish.php b/application/api/job/WechatDraftPublish.php index 60b280e..93ef526 100644 --- a/application/api/job/WechatDraftPublish.php +++ b/application/api/job/WechatDraftPublish.php @@ -11,21 +11,14 @@ class WechatDraftPublish private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraftPublish_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatDraftPublish $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,35 @@ class WechatDraftPublish $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //发布草稿箱 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->publishDraft($data),true); $sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } diff --git a/application/api/job/WechatMaterial.php b/application/api/job/WechatMaterial.php index caa098d..2a2cfcf 100644 --- a/application/api/job/WechatMaterial.php +++ b/application/api/job/WechatMaterial.php @@ -11,21 +11,14 @@ class WechatMaterial private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatMaterial_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatMaterial $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,36 @@ class WechatMaterial $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + //上传素材 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->uploadMaterial($data),true); $sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } diff --git a/application/api/job/WechatQueryStatus.php b/application/api/job/WechatQueryStatus.php index aec968f..77d67f9 100644 --- a/application/api/job/WechatQueryStatus.php +++ b/application/api/job/WechatQueryStatus.php @@ -11,21 +11,14 @@ class WechatQueryStatus private $oQueueJob; private $QueueRedis; private $maxRetries = 2; - private $logBuffer = []; - private $lastLogTime = 0; - private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配 private $lockExpire = 1800; private $completedExprie = 3600; const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR; public function __construct() { - $this->logPath = ROOT_PATH . 'public/queue_log/WechatQueryStatus_' . date('Ymd') . '.log'; $this->oQueueJob = new QueueJob; $this->QueueRedis = QueueRedis::getInstance(); - $this->lastLogTime = time(); - // 确保日志目录存在 - $this->oQueueJob->ensureLogDirExists($this->logPath); } public function fire(Job $job, $data) @@ -33,11 +26,17 @@ class WechatQueryStatus $startTime = microtime(true); $this->oQueueJob->log("-----------队列任务开始-----------"); + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 if (!$this->QueueRedis->getConnectionStatus()) { $this->oQueueJob->log("Redis连接失败,10秒后重试"); $job->release(10); - $this->oQueueJob->flushLog(); return; } @@ -72,56 +71,37 @@ class WechatQueryStatus $job->release($delay); } } - $this->oQueueJob->flushLog(); return; } - - $aParam = [ - 'job_id' => $sRedisKey, - 'job_class' => $sClassName, - 'status' => 0, - 'create_time' => time(), - 'params' => json_encode($data, self::JSON_OPTIONS) - ]; - - $iLogId = $this->oQueueJob->addLog($aParam); - if (!$iLogId) { - $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS)); - $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue); - $job->delete(); - $this->oQueueJob->flushLog(); - return; - } - try { + + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + // 查询状态 $oAiarticle = new Aiarticle; $aResult = json_decode($oAiarticle->queryStatus($data),true); $sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg']; - //更新任务状态 - $this->oQueueJob->updateLog([ - 'log_id' => $iLogId, - 'status' => 1, - 'update_time' => time(), - 'error' => $sMsg - ]); - $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie); + //更新日志 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); $job->delete(); - $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}"); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); } catch (\RuntimeException $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\LogicException $e) { - $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job); } catch (\Exception $e) { - $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job); + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job); } finally { $executionTime = microtime(true) - $startTime; $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); - $this->oQueueJob->flushLog(); gc_collect_cycles(); } } From d054768c838c4336323eca58ec1df2c226e32bb0 Mon Sep 17 00:00:00 2001 From: chengxl Date: Mon, 28 Jul 2025 15:41:23 +0800 Subject: [PATCH 19/21] =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/controller/Queueinfo.php | 39 +++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/application/api/controller/Queueinfo.php b/application/api/controller/Queueinfo.php index 6144849..c007f33 100644 --- a/application/api/controller/Queueinfo.php +++ b/application/api/controller/Queueinfo.php @@ -33,7 +33,7 @@ class Queueinfo extends Base $redis->select($config['select']); // 获取不同队列类型的任务数 - $aQueue = empty($aParam['queue_name']) ? ['ArticleAiCreateContent','RecommendReviewer','RelatedArticle','ReviewerScore','RevisionReviewer','SendRelatedArticleEmail','SendReviewEmail','WechatDraft','WechatDraftPublish','WechatMaterial','WechatQueryStatus'] : [$aParam['queue_name']]; + $aQueue = empty($aParam['queue_name']) ? ['ArticleAiCreateContent','RecommendReviewer','RelatedArticle','ReviewerScore','RevisionReviewer','SendRelatedArticleEmail','SendReviewEmail','WechatDraft','WechatDraftPublish','WechatMaterial','WechatQueryStatus','createFieldForQueue','RecommendArticleField'] : [$aParam['queue_name']]; foreach ($aQueue as $key => $value) { $types[$value] = [ 'pending' => 'queues:'.$value, @@ -53,7 +53,8 @@ class Queueinfo extends Base } } - + // 关闭 Redis 连接(关键步骤) + $redis->close(); echo '
';var_dump($counts);
     }
     public function removeKey() {
@@ -99,7 +100,8 @@ echo '
';var_dump($counts);
             } 
             
         }
-
+    // 关闭 Redis 连接(关键步骤)
+        $redis->close();
 echo '
';var_dump($counts);
     }
     // Redis值删除
@@ -142,7 +144,8 @@ echo '
';var_dump($counts);
         }
 
 echo '
';var_dump($sQueueInfo,$sKey,$iNum);exit;
-
+        // 关闭 Redis 连接(关键步骤)
+        $redis->close();
     }
 
 
@@ -171,36 +174,56 @@ echo '
';var_dump($sQueueInfo,$sKey,$iNum);exit;
 
         $sKey = $redis->get($aParam['key_name']);
         var_dump($sKey);
+        // 关闭 Redis 连接(关键步骤)
+        $redis->close();
     }
 
     public function removeKey1(){
 
+        // 获取请求参数
         $aParam = $this->request->post();
-        if(empty($aParam['key_name'])){
+        if (empty($aParam['key_name'])) {
             exit('非法操作');
         }
 
+        // 获取队列配置(Redis 配置)
         $config = \think\Config::get('queue');
 
+        // 队列名称默认值处理
         $sQueueName = empty($aParam['queue_name']) ? 'ArticleAiCreateContent' : $aParam['queue_name'];
+
+        // 初始化 Redis 连接
         $redis = new \Redis();
+        // 连接 Redis 服务器
         $redis->connect($config['host'], $config['port']);
-        
+
+        // 若有密码则认证
         if (!empty($config['password'])) {
             $redis->auth($config['password']);
         }
-        
+
+        // 选择数据库(默认 0)
         $redis->select($config['select']);
 
-        $sKey = $redis->hGetAll($aParam['key_name']);
+        // 操作示例:获取哈希表数据
         echo '我是HgetAll';
+        $sKey = $redis->hGetAll($aParam['key_name']);
         var_dump($sKey);
+
+        // 删除哈希表
         $result = $redis->del($aParam['key_name']);
         var_dump($result);
+
+        // 尝试获取字符串类型的键(验证是否删除)
         echo '我是get';
         $sKey = $redis->get($aParam['key_name']);
         var_dump($sKey);
+
+        // 再次删除(确保删除)
         $result = $redis->del($aParam['key_name']);
         var_dump($result);
+
+        // 关闭 Redis 连接(关键步骤)
+        $redis->close();
     }
 }

From d27c173ec7595a29fd9518103dbb1b0b78b449ab Mon Sep 17 00:00:00 2001
From: chengxl 
Date: Mon, 28 Jul 2025 17:53:13 +0800
Subject: [PATCH 20/21] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E8=BF=9E?=
 =?UTF-8?q?=E6=8E=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 application/common/traits/QueueDbHATrait.php | 266 +++++++++++++++++++
 1 file changed, 266 insertions(+)
 create mode 100644 application/common/traits/QueueDbHATrait.php

diff --git a/application/common/traits/QueueDbHATrait.php b/application/common/traits/QueueDbHATrait.php
new file mode 100644
index 0000000..c7aff68
--- /dev/null
+++ b/application/common/traits/QueueDbHATrait.php
@@ -0,0 +1,266 @@
+ 20,        // 缩短检查间隔,提升连接有效性
+        'max_attempts' => 4,           // 增加重试次数,应对连接波动
+        'base_wait' => 1,              // 基础等待时间(秒)
+        'reconnect_threshold' => 3,    // 连续失败告警阈值
+        'fatal_error_codes' => [2006, 2013, 1053], // 致命错误码(含服务中断)
+    ];
+
+    // 进程内状态记录(避免跨进程干扰)
+    private $consecutiveFailures = [];
+    private $lastCheckTime = [];
+
+    /**
+     * 队列任务执行前的连接检查(核心入口)
+     * @param bool $force 是否强制检查
+     * @return bool 连接是否可用
+     */
+    public function checkDbConnectionTrait($force = false)
+    {
+        $pid = getmypid();
+        $this->initConsecutiveFailures($pid);
+
+        // 非强制检查且未到间隔,直接返回
+        if (!$force && isset($this->lastCheckTime[$pid]) 
+            && (time() - $this->lastCheckTime[$pid] < $this->dbConfig['check_interval'])) {
+            return true;
+        }
+
+        $attempt = 0;
+        $maxAttempts = $this->dbConfig['max_attempts'];
+        $baseWait = $this->dbConfig['base_wait'];
+
+        while ($attempt < $maxAttempts) {
+            try {
+                // 执行轻量查询验证(使用框架Db方法,确保与业务代码一致)
+                $result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
+                if ($this->isValidResult($result)) {
+                    $this->resetConsecutiveFailures($pid);
+                    $this->lastCheckTime[$pid] = time();
+                    $this->log("进程[{$pid}]数据库连接有效", 'info');
+                    return true;
+                }
+                throw new Exception("查询结果无效");
+            } catch (PDOException $e) {
+                // 致命错误加速重试
+                $sCode = empty($e->getCode()) ? 0 : $e->getCode();
+                $sMsg = empty($e->getMessage()) ? '' : $e->getMessage();
+                if (in_array($sCode, $this->dbConfig['fatal_error_codes'])) {
+                    $this->log("进程[{$pid}]致命错误({$sCode}):{$sMsg}", 'error');
+                    $attempt = $maxAttempts - 1;
+                }
+                $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
+            } catch (Exception $e) {
+                $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
+            } finally {
+                $attempt++;
+            }
+        }
+
+        // 达到最大重试次数
+        $this->incrementConsecutiveFailures($pid);
+        $this->log("进程[{$pid}]连接异常,已达最大重试次数({$maxAttempts})", 'error');
+        return false;
+    }
+
+    /**
+     * 处理连接错误(确保后续能正常使用Db::insert())
+     */
+    private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait)
+    {
+        $errorMsg = empty($e->getMessage()) ? '未知错误' : $e->getMessage();
+        $errorCode = empty($e->getCode()) ? 0 : $e->getCode();
+        $this->log("进程[{$pid}]连接失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(码:{$errorCode})", 'warning');
+
+        // 强制清理当前进程的连接缓存(关键:确保重建的连接能被Db::insert()使用)
+        $this->cleanupConnections();
+
+        // 最后一次尝试无需等待
+        if ($attempt + 1 >= $maxAttempts) {
+            return;
+        }
+
+        // 差异化等待策略
+        $waitTime = $this->calculateWaitTime($errorMsg, $attempt, $baseWait);
+        $this->log("进程[{$pid}]将在{$waitTime}秒后重试", 'info');
+        $this->safeSleep($waitTime);
+
+        // 重建连接并验证(使用框架方法,确保与业务代码兼容)
+        try {
+            // 强制重建框架连接,确保Db::insert()能使用新连接
+            Db::connect(config('database'), true);
+            $result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
+            if ($this->isValidResult($result)) {
+                $this->resetConsecutiveFailures($pid);
+                $this->lastCheckTime[$pid] = time();
+                $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})", 'info');
+                $attempt = $maxAttempts; // 退出循环
+            } else {
+                throw new Exception("重建连接后查询无效");
+            }
+        } catch (Exception $e2) {
+            $sMsg2 = empty($e2->getMessage()) ? '' : $e2->getMessage();
+            $this->log("进程[{$pid}]重连失败:{$sMsg2}", 'error');
+        }
+    }
+
+    /**
+     * 安全执行查询(兼容框架Db方法,带超时控制)
+     */
+    private function safeQuery($sql, $timeout)
+    {
+        $start = microtime(true);
+        // 使用框架Db::query(),确保与业务中Db::insert()使用相同的连接机制
+        $result = Db::query($sql);
+
+        // 代码层面控制超时,不依赖database.php配置
+        if (microtime(true) - $start > $timeout) {
+            throw new Exception("查询超时({$timeout}秒)");
+        }
+        return $result;
+    }
+
+    /**
+     * 清理连接资源(仅影响当前进程,不干扰现有系统)
+     */
+    private function cleanupConnections()
+    {
+        // 关闭当前进程的框架连接(不影响其他进程)
+        Db::close();
+        // 清除Db类静态缓存(仅当前进程)
+        $this->clearDbInstanceCache();
+        // 保留系统缓存,避免影响现有业务
+    }
+
+    /**
+     * 清除Db类实例缓存(确保新连接能被正确创建)
+     */
+    private function clearDbInstanceCache()
+    {
+        static $reflection = null;
+        static $instanceProp = null;
+
+        if (!$reflection) {
+            try {
+                $reflection = new \ReflectionClass('\think\Db');
+                $instanceProp = $reflection->getProperty('instance');
+                $instanceProp->setAccessible(true);
+            } catch (\ReflectionException $e) {
+                $this->log("反射初始化失败:{$e->getMessage()}", 'error');
+                return;
+            }
+        }
+
+        try {
+            // 仅清空当前进程的Db实例缓存,不影响其他进程(如Web请求)
+            $instanceProp->setValue(null, []);
+        } catch (\ReflectionException $e) {
+            $this->log("清除Db缓存失败:{$e->getMessage()}", 'error');
+        }
+    }
+
+    /**
+     * 计算等待时间(针对队列优化)
+     */
+    private function calculateWaitTime($errorMsg, $attempt, $baseWait)
+    {
+        $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
+        $isTimeout = stripos($errorMsg, 'timeout') !== false;
+
+        if ($isGoneAway) {
+            return $baseWait * pow(2, $attempt); // 致命错误:1→2→4秒
+        } elseif ($isTimeout) {
+            return $baseWait * pow(1.5, $attempt); // 超时错误:1→1.5→2.25秒
+        } else {
+            return $baseWait * pow(1.2, $attempt); // 普通错误:1→1.2→1.44秒
+        }
+    }
+
+    /**
+     * 验证查询结果有效性
+     */
+    private function isValidResult($result)
+    {
+        return is_array($result) && !empty($result) 
+            && isset(current($result)['1']) && current($result)['1'] == 1;
+    }
+
+    /**
+     * 连续失败计数管理
+     */
+    private function initConsecutiveFailures($pid)
+    {
+        if (!isset($this->consecutiveFailures[$pid])) {
+            $this->consecutiveFailures[$pid] = 0;
+        }
+    }
+
+    private function incrementConsecutiveFailures($pid)
+    {
+        $this->consecutiveFailures[$pid]++;
+        if ($this->consecutiveFailures[$pid] >= $this->dbConfig['reconnect_threshold']) {
+            $this->alert("进程[{$pid}]连续连接失败{$this->consecutiveFailures[$pid]}次,可能存在数据库隐患");
+        }
+    }
+
+    private function resetConsecutiveFailures($pid)
+    {
+        $this->consecutiveFailures[$pid] = 0;
+    }
+
+    /**
+     * 日志记录(仅输出到队列控制台,不干扰系统日志)
+     */
+    private function log($message, $level = 'info')
+    {
+        $logTime = date('Y-m-d H:i:s');
+        $content = "[{$logTime}] [{$level}] {$message}";
+        // 仅在队列Worker控制台输出,不写入系统日志文件
+        echo $content . "\n";
+    }
+
+    /**
+     * 告警通知(独立日志文件,不干扰现有系统)
+     */
+    private function alert($message)
+    {
+        $alertFile = RUNTIME_PATH . "log/queue_db_alert_" . date('Ymd') . ".log";
+        file_put_contents($alertFile, "[".date('Y-m-d H:i:s')."] ALERT: {$message}\n", FILE_APPEND | LOCK_EX);
+    }
+
+    /**
+     * 安全睡眠(支持Worker正常终止)
+     */
+    private function safeSleep($seconds)
+    {
+        $interval = 1;
+        while ($seconds > 0) {
+            if (connection_aborted() || $this->isWorkerStopped()) {
+                throw new Exception("队列Worker已终止,中断睡眠");
+            }
+            $sleep = min($interval, $seconds);
+            sleep($sleep);
+            $seconds -= $sleep;
+        }
+    }
+
+    /**
+     * 检测Worker是否已停止(兼容TP5.0机制)
+     */
+    private function isWorkerStopped()
+    {
+        $stopFile = RUNTIME_PATH . 'queue/stop_worker';
+        return file_exists($stopFile);
+    }
+}
\ No newline at end of file

From 17cf1c7ddd4d8612b8d9ff9db071580ae09dfe7c Mon Sep 17 00:00:00 2001
From: chengxl 
Date: Mon, 28 Jul 2025 17:55:09 +0800
Subject: [PATCH 21/21] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E4=BF=AE=E6=94=B9?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 application/common/QueueJob.php | 167 +-------------------------------
 1 file changed, 4 insertions(+), 163 deletions(-)

diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php
index 756bc7e..c83e558 100644
--- a/application/common/QueueJob.php
+++ b/application/common/QueueJob.php
@@ -4,7 +4,7 @@ namespace app\common;
 use think\Db;
 use think\Cache;
 use app\common\QueueRedis;
-
+use app\common\traits\QueueDbHATrait;
 class QueueJob
 {
     // 必填参数
@@ -13,7 +13,8 @@ class QueueJob
     private $QueueRedis;
     private $maxRetries = 2;
     const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
-
+    // 引入高可用数据库管理 trait
+    use QueueDbHATrait;
     public function __construct()
     {
         $this->QueueRedis = QueueRedis::getInstance();
@@ -95,50 +96,6 @@ class QueueJob
         $job->delete();
     }
 
-    /**
-     * 检查并重建数据库连接
-     */
-    public function checkDbConnectionBak()
-    {
-        return true;
-        $maxAttempts = 2;  // 最大重试次数
-        $attempt = 0;
-        while ($attempt < $maxAttempts) {
-            try {
-                // 尝试查询以验证连接
-                Db::query('SELECT 1');
-                return true;  // 连接有效,直接返回
-            } catch (\Exception $e) {
-                $attempt++;
-                $waitTime = pow(2, $attempt);  // 指数退避:2s, 4s, 8s...
-                // 记录连接失败日志
-                $sMsg = empty($e->getMessage()) ? '检查失败' : $e->getMessage();
-                $this->log("数据库连接检查失败(尝试{$attempt}/{$maxAttempts}): {$sMsg}");
-                // 关闭所有连接
-                Db::close();
-                
-                // 最后一次尝试不需要等待
-                if ($attempt < $maxAttempts) {
-                    $this->log("{$waitTime}秒后尝试重新连接...");
-                    sleep($waitTime);  // 等待一段时间再重试
-                }
-                // 尝试重新连接
-                try {
-                    Db::connect();
-                    Db::query('SELECT 1');  // 验证重连成功
-                    $this->log("数据库连接已重建(尝试{$attempt}/{$maxAttempts})");
-                    return true;
-                } catch (\Exception $e2) {
-                    $this->log("数据库重连尝试{$attempt}/{$maxAttempts}失败: {$e2->getMessage()}");
-                }
-            }
-        }
-        
-        // 所有重试都失败
-        $this->log("数据库连接异常,已达到最大重试次数({$maxAttempts})");
-        return false;
-    }
-
 
     /**
      * 数据库连接检查与重建(高可用版)
@@ -148,122 +105,6 @@ class QueueJob
      */
     public function checkDbConnection($force = false)
     {
-        return true;
-        // 1. 用进程ID隔离静态变量,避免多Worker进程互相干扰
-        // 每个队列Worker是独立进程,静态变量需进程隔离
-        static $lastCheckTime = [];
-        $pid = getmypid(); // 获取当前进程ID
-        $checkInterval = 60; // 自动检查间隔(秒)
-
-        // 非强制检查且未到间隔时间,直接返回有效(减少性能消耗)
-        if (!$force && isset($lastCheckTime[$pid]) && (time() - $lastCheckTime[$pid] < $checkInterval)) {
-            return true;
-        }
-
-        // 2. 配置重试参数
-        $maxAttempts = 3;    // 最大重试次数
-        $attempt = 0;        // 当前尝试次数
-        $baseWait = 2;       // 基础等待时间(秒)
-
-        // 3. 循环重试连接
-        while ($attempt < $maxAttempts) {
-            try {
-                // 执行轻量查询验证连接(DUAL是MySQL伪表,效率极高)
-                $result = Db::query('SELECT 1 FROM DUAL');
-                // 验证查询结果是否有效
-                if (is_array($result) && !empty($result)) {
-                    $lastCheckTime[$pid] = time();
-                    $this->log("进程[{$pid}]数据库连接有效");
-                    return true;
-                } else {
-                    throw new Exception("连接验证失败:查询结果异常");
-                }
-            } 
-            // 优先捕获PDO底层异常(数据库连接错误多为此类)
-            catch (PDOException $e) {
-                $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime);
-            } 
-            // 捕获框架层异常
-            catch (Exception $e) {
-                $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait, $lastCheckTime);
-            } finally {
-                $attempt++; // 无论成功失败,计数+1
-            }
-        }
-
-        // 4. 达到最大重试次数,返回失败
-        $this->log("进程[{$pid}]数据库连接异常,已达最大重试次数({$maxAttempts})");
-        return false;
-    }
-
-    /**
-     * 处理连接错误的统一逻辑
-     * @param Exception $e 异常对象
-     * @param int $pid 进程ID
-     * @param int $attempt 当前尝试次数
-     * @param int $maxAttempts 最大尝试次数
-     * @param int $baseWait 基础等待时间
-     * @param array $lastCheckTime 检查时间记录
-     */
-    private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait, &$lastCheckTime)
-    {
-        $errorMsg = empty($e->getMessage()) ? '未知数据库错误' : $e->getMessage();
-        $errorCode = empty($e->getCode()) ? 0 : $e->getCode();
-        
-        // 记录错误详情(含进程ID便于排查多进程问题)
-        $this->log("进程[{$pid}]连接检查失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(错误码:{$errorCode})");
-
-        // 5. 强制清理当前进程的无效连接
-        Db::close(); // 关闭框架层面的连接
-        $this->clearDbInstanceCache(); // 清除框架连接缓存(关键步骤)
-        cache('db_connection_status', null);
-
-        // 最后一次尝试无需等待,直接重试
-        if ($attempt + 1 >= $maxAttempts) {
-            return;
-        }
-
-        // 6. 差异化等待策略(针对特定错误延长等待)
-        $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
-        // 普通错误:2^1=2s → 2^2=4s;致命错误:3^1=3s → 3^2=9s
-        $waitTime = $isGoneAway ? $baseWait * pow(3, $attempt) : $baseWait * pow(2, $attempt);
-
-        $this->log("进程[{$pid}]将在{$waitTime}秒后重试...");
-        sleep($waitTime); // 等待指定时间
-
-        // 7. 尝试重建连接并二次验证
-        try {
-            // 强制重建连接(第二个参数true表示忽略缓存)
-            Db::connect(config('database'), true);
-            
-            // 执行验证查询
-            $result = Db::query('SELECT 1 FROM DUAL');
-            // 检查结果是否有效
-            if (is_array($result) && !empty($result)) {
-                $lastCheckTime[$pid] = time();
-                $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})");
-                $attempt = $maxAttempts; // 标记成功并退出循环
-            } else {
-                throw new Exception("重建连接后查询结果异常");
-            }
-        } catch (Exception $e2) {
-            $this->log("进程[{$pid}]重连失败(尝试{$attempt}/{$maxAttempts}):{$e2->getMessage()}");
-        }
-    }
-
-    /**
-     * 清除ThinkPHP5的Db类实例缓存(解决框架连接缓存问题)
-     * 核心原理:通过反射突破私有属性访问限制
-     */
-    private function clearDbInstanceCache()
-    {
-        try {
-            $reflection = new \ReflectionClass('\think\Db');
-            $instanceProp = $reflection->getProperty('instance'); // 获取Db类的instance属性
-            $instanceProp->setAccessible(true); // 设为可访问
-            $instanceProp->setValue(null, []); // 清空静态缓存的连接实例
-        } catch (\ReflectionException $e) {
-            $this->log("清除数据库实例缓存失败:{$e->getMessage()}");
-        }
+        return $this->checkDbConnectionTrait();
     }
 }
\ No newline at end of file