diff --git a/application/common/Aireview.php b/application/common/Aireview.php index b4dc0c2..2aedceb 100644 --- a/application/common/Aireview.php +++ b/application/common/Aireview.php @@ -52,7 +52,7 @@ class Aireview //查询文章审核内容-判断新增或修改 $aWhere = ['article_id' => $iArticleId,'journal_id' => $iJournalId]; - $aAiReview = Db::table('t_article_ai_review')->field('id')->where($aWhere)->find(); + $aAiReview = Db::name('article_ai_review')->field('id')->where($aWhere)->find(); $iLogId = empty($aAiReview['id']) ? 0 : $aAiReview['id']; //新增 if(empty($iLogId)){ @@ -91,7 +91,8 @@ class Aireview } //查询文章审核内容 $aWhere = ['article_id' => $aParam['article_id']]; - $aAiReview = Db::table('t_article_ai_review')->where($aWhere)->find(); + $aAiReview = Db::name('article_ai_review')->where($aWhere)->find(); + return ['status' => 1,'msg' => 'Successfully obtained article review content','data' => $aAiReview]; } } diff --git a/application/common/Article.php b/application/common/Article.php index 9067ce0..75f29db 100644 --- a/application/common/Article.php +++ b/application/common/Article.php @@ -9,7 +9,8 @@ class Article protected $sJavaUrl = "http://ts.tmrjournals.com/"; //官网文件地址 protected $sFileUrl = "https://submission.tmrjournals.com/public/"; - + //Ai地址 + protected $sAiUrl = "http://125.39.141.154:10002"; /** * 获取文章文件内容 */ @@ -58,21 +59,28 @@ class Article public function updateAiArticle($aParam = []){ //文章ID $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; + //主键ID + $iAiArticleId = empty($aParam['ai_article_id']) ? 0 : $aParam['ai_article_id']; + //查询内容是否存在 $aWhere = ['is_delete' => 2]; - if(empty($iArticleId)){ + if(empty($iArticleId) && empty($iArticleId)){ return json_encode(['status' => 2,'msg' => 'Please select the article to be modified']); } - + if(!empty($iArticleId)){ + $aWhere['article_id'] = $iArticleId; + } + if(!empty($iAiArticleId)){ + $aWhere['ai_article_id'] = $iAiArticleId; + } //查询文章是否生成AI内容 - $aWhere= ['is_delete' => 2,'article_id' => $iArticleId]; $aAiArticle = Db::name('ai_article')->field('ai_article_id')->where($aWhere)->find(); if(empty($aAiArticle)){ return json_encode(['status' => 3,'msg' => 'The article content of WeChat official account has not been generated']); } $iAiArticleId = $aAiArticle['ai_article_id']; //必填参数验证 - $aFields = ['article_id','title_english','title_chinese','journal_issn','covered','digest','research_result','content','highlights','discussion','prospect','research_background','discussion_results','research_method','overview','summary','is_generate']; + $aFields = ['article_id','article_type','media_type','journal_id','journal_issn','title_english','title_chinese','content','covered','discussion_results','research_method','digest','research_background','overview','summary','highlights','discussion','prospect','is_generate']; $sFiled = ''; $aUpdateParam = []; foreach($aFields as $val){ @@ -147,9 +155,9 @@ class Article //获取文章领域 $aArticleField = $this->getArticleField($aWhere); - if(!empty($aArticleField['data'])){ - return json_encode(array('status' => 4,'msg' =>'The article has been added to the field' )); - } + // if(!empty($aArticleField['data'])){ + // return json_encode(array('status' => 4,'msg' =>'The article has been added to the field' )); + // } //文章标题 $title = empty($aArticle['title']) ? '' : $aArticle['title']; if(empty($title)){ @@ -187,7 +195,7 @@ class Article //请求OPENAI $oOpenAi = new OpenAi; - $aParam = ['messages' => $aMessage,'model' => empty($aParam['api_model']) ? 'gpt-4.1' : $aParam['api_model']]; + $aParam = ['messages' => $aMessage,'model' => empty($aParam['api_model']) ? 'gpt-4.1' : $aParam['api_model'],'url' => $this->sAiUrl]; $aResult = json_decode($oOpenAi->curlOpenAIStream($aParam),true); //处理返回信息 $aData = empty($aResult['data']) ? [] : $aResult['data']; diff --git a/application/common/OpenAi.php b/application/common/OpenAi.php index 85c27a4..3f93982 100644 --- a/application/common/OpenAi.php +++ b/application/common/OpenAi.php @@ -3,6 +3,8 @@ namespace app\common; use think\Cache; use think\Db; use think\Queue; +use app\common\Article; +use app\common\QueueRedis; class OpenAi { protected $sApiKey = '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'; @@ -65,20 +67,21 @@ class OpenAi ]; //定义redis连接 private $redis; + private $oQueueRedis; public function __construct() { + // 初始化 Redis 连接 $config = \think\Config::get('queue'); $this->redis = new \Redis(); $this->redis->connect($config['host'], $config['port']); - if (!empty($config['password'])) { $this->redis->auth($config['password']); } - $this->redis->select($config['select']); - // 初始化 Redis 连接 - // $this->redis = Cache::store('redis')->handler(); + $this->redis->select($config['select']); + + $this->oQueueRedis = QueueRedis::getInstance(); } /** @@ -341,205 +344,6 @@ class OpenAi curl_close($this->curl); return json_encode(['status' => 1,'msg' => 'success','data' => $aContent]); } - /** - * 对接OPENAI接口-并行CURL请求【重要维度单独询问】 - */ - public function curlMultiOpenAIImportant($aSearch = [],$timeout = 120, $iChunkSize = 2) { - // 入参校验 - if (empty($aSearch)) { - return json_encode(['status' => 2, 'msg' => 'Parameter is empty']); - } - //提问问题类型 - $sKey = empty($aSearch['question']) ? '' : $aSearch['question']; - if (empty($sKey)) { - return json_encode(['status' => 2, 'msg' => 'Please select the type of question']); - } - //获取问题 - $aQuestion = $this->$sKey; - if (empty($aQuestion)) { - return json_encode(['status' => 2, 'msg' => 'question is empty']); - } - - //分批处理(核心优化:控制并发量) - $aChunk = array_chunk($aQuestion, $iChunkSize); // 按批次拆分,每批最多5个请求 - //定义空数组用于接收数据 - $aEmptyData = $aLog = $aReturnData = []; - //分批次处理开始 - foreach ($aChunk as $iChunkKey => $item) { - // 初始化多curl句柄 - $oCurlMulti = curl_multi_init(); - $aCurl = []; - // 批量初始化请求 - foreach ($item as $key => $value) { - // 跳过无效参数 - if (empty($value)) { - $aLog[] = [ - 'content' => $iChunkKey.'-'.$key.':Invalid parameter' - ]; - continue; - } - //问题处理-变量替换 - $aQuestionInfo = $this->buildReviewPromptImportant($aSearch,$value); - if(empty($aQuestionInfo)){ - $aLog[] = [ - 'content' => $iChunkKey.'-'.$key.':The problem is empty:'.json_encode($value) - ]; - continue; - } - // 核心配置优化 - $oCurl = curl_init(); - curl_setopt_array($oCurl, [ - CURLOPT_URL => $this->sUrl, - CURLOPT_HTTPHEADER => [ - 'Content-Type: application/json', - 'Authorization: Bearer ' . $this->sApiKey, - 'Expect:', - ], - CURLOPT_PROXY => $this->proxy, - // SSL验证优化:若代理证书不可信,临时关闭(生产环境需配置信任证书) - CURLOPT_SSL_VERIFYPEER => true, // 调试时设为false,生产环境设为true - CURLOPT_SSL_VERIFYHOST => 2, // 调试时设为0,生产环境设为2 - CURLOPT_POST => true, - CURLOPT_POSTFIELDS => json_encode($aQuestionInfo), - CURLOPT_RETURNTRANSFER => true, - // 超时优化:延长响应超时,新增连接超时 - CURLOPT_TIMEOUT => $timeout, // 总超时(秒),建议60-120 - CURLOPT_CONNECTTIMEOUT => 20, // 连接超时(秒),避免无限等待 - CURLOPT_LOW_SPEED_LIMIT => 1024, // 最低速度(字节/秒),低于此值触发超时 - CURLOPT_LOW_SPEED_TIME => 30, // 持续低速时间(秒),超过则终止 - ]); - curl_multi_add_handle($oCurlMulti, $oCurl); - $aCurl[$key] = $oCurl; - } - // 空请求处理 - if (empty($aCurl)) { - curl_multi_close($oCurlMulti); - continue; - } - - // 核心优化:修复curl_multi循环逻辑,确保所有请求完成 - $active = null; - $mrc = CURLM_OK; - // 第一阶段:处理瞬时可完成的请求 - do { - $mrc = curl_multi_exec($oCurlMulti, $active); - } while ($mrc == CURLM_CALL_MULTI_PERFORM); - - // 第二阶段:等待所有活跃请求完成(关键优化) - while ($active > 0 && $mrc == CURLM_OK) { - // 等待事件(超时1秒,避免CPU空转) - if (curl_multi_select($oCurlMulti, 1.0) != -1) { - // 处理就绪的请求 - do { - $mrc = curl_multi_exec($oCurlMulti, $active); - } while ($mrc == CURLM_CALL_MULTI_PERFORM); - } else { - // 无事件时,检查是否超时(防止无限阻塞) - $timedOut = false; - foreach ($aCurl as $oCurl) { - $startTime = curl_getinfo($oCurl, CURLINFO_STARTTRANSFER_TIME); - if ($startTime > 0 && (microtime(true) - $startTime) > $timeout) { - $timedOut = true; - break; - } - } - if ($timedOut) break; // 超时则强制退出 - } - } - - // 处理当前批次结果 - foreach ($aCurl as $key => $oCurl) { - // 1. 捕获curl错误(连接失败、超时等) - $sError = curl_error($oCurl); - if (!empty($sError)) { - $aLog[] = [ - 'content' => "Curl error: {$sError}" - ]; - $aEmptyData[] = $key; - curl_multi_remove_handle($oCurlMulti, $oCurl); - curl_close($oCurl); - continue; - } - - // 2. 获取HTTP状态码(关键优化:处理OpenAI的API错误) - $httpCode = curl_getinfo($oCurl, CURLINFO_HTTP_CODE); - $sContent = curl_multi_getcontent($oCurl); - // 3. 处理非200状态码(如限流、服务不可用) - if ($httpCode != 200) { - $errorMsg = "HTTP {$httpCode}: " . (empty($sContent) ? 'No response' : $sContent); - // 记录关键错误日志(便于调试) - $aLog[] = [ - 'http_code' => $httpCode, - 'content' => $errorMsg, - ]; - $aEmptyData[] = $key; - curl_multi_remove_handle($oCurlMulti, $oCurl); - curl_close($oCurl); - continue; - } - - // 4. 解析响应内容(原逻辑优化) - $aResult = json_decode($sContent, true); - if (json_last_error() != JSON_ERROR_NONE) { - $aLog[] = [ - 'content' => "Invalid JSON: {$sContent}", - ]; - $aEmptyData[] = $key; - curl_multi_remove_handle($oCurlMulti, $oCurl); - curl_close($oCurl); - continue; - } - - // 5. 提取OpenAI的content(简化判断逻辑) - $aOpenAiContent = empty($aResult['choices'][0]['message']['content']) ? '' : $aResult['choices'][0]['message']['content']; - if (empty($aOpenAiContent)) { - $aLog[] = [ - 'content' => "OPENAI returns empty content", - ]; - $aEmptyData[] = $key; - curl_multi_remove_handle($oCurlMulti, $oCurl); - curl_close($oCurl); - continue; - } - - // 6. 处理业务解析(原extractAndParse逻辑) - $aData = $this->extractAndParse($aOpenAiContent); - $aContent = empty($aData['data']) ? [] : $aData['data']; - $sMsg = empty($aData['msg']) ? 'Success' : $aData['msg']; - if (empty($aContent)) { - $aEmptyData[] = $key; - } - $aLog[] = [ - 'content' => $sMsg, - ]; - $aReturnData += $aContent; - // 释放资源 - curl_multi_remove_handle($oCurlMulti, $oCurl); - curl_close($oCurl); - } - - // 关闭当前批次的multi句柄 - curl_multi_close($oCurlMulti); - // 批次间隔(核心优化:避免触发OpenAI限流) - if ($iChunkKey < count($aChunk) - 1) { - usleep(1000000); // 批次间间隔1秒(根据OpenAI配额调整) - } - } - $aParam = [ - 'status' => 1, - 'msg' => 'success', - 'data' => empty($aReturnData) ? [] : $aReturnData, - 'empty_data' => empty($aEmptyData) ? [] : $aEmptyData, - 'log_data' => empty($aLog) ? [] : $aLog, - 'open_ai_id' => empty($aSearch['open_ai_id']) ? 0 : $aSearch['open_ai_id'] - ]; - - //日志记录 - $this->addLog($aParam); - - return json_encode($aParam); - } - /** * CURL 发送请求到 OpenAI【流式】 * @param $messages 内容 @@ -557,7 +361,7 @@ class OpenAi //超时设置 $timeout = empty($aParam['timeout']) ? 300 : $aParam['timeout']; //接口地址 - $sUrl = $this->sUrl; + $sUrl = empty($aParam['url']) ? $this->sUrl : $aParam['url']; //组装数据 $data = [ @@ -624,8 +428,7 @@ class OpenAi /** * 解析流式响应 */ - private function parseMedicalStreamResponse($streamContent) - { + private function parseMedicalStreamResponse($streamContent){ $fullContent = ''; $lines = explode("\n", $streamContent); foreach ($lines as $line) { @@ -639,57 +442,10 @@ class OpenAi return $fullContent; } - /** - * 记录处理进度【Redis】 - */ - private function recordProcessingStart($key,$totalQuestions) - { - $this->redis->hMSet($key, [ - 'status' => 'processing', - 'total' => $totalQuestions, - 'completed' => 0, - 'start_time' => time() - ]); - $this->redis->expire($key, 86400); // 24小时过期 - } - /** - * 更新处理进度【Redis】 - */ - private function updateProcessingProgress($key,$iId,$completed) - { - - $this->redis->hSet($key, 'completed', $completed); - //完成进度 - $iProgress = round(($completed / $this->redis->hGet($key, 'total')) * 100, 2); - if($iProgress == 100){ - $this->recordProcessingComplete($key,$iId); - } - $this->redis->hSet($key, 'progress', $iProgress); - } - /** - * 记录处理完成【Redis】 - */ - private function recordProcessingComplete($key,$iId) - { - $this->redis->hSet($key, 'status', 'completed'); - $this->redis->hSet($key, 'end_time', time()); - $this->wechatGegnerate(['article_id' => $iId]); - } - - /** - * 保存分块进度【Redis】 - */ - private function saveChunkProgress($key, $chunkIndex, $content) - { - $this->redis->hset($key, "chunk_{$chunkIndex}", $content); - $this->redis->expire($key, 86400); // 进度保存24小时 - } - /** * 微信公众号-生成公微内容(CURL) */ - public function createWechatContent($aParam = []) - { + public function createWechatContent($aParam = []){ //主键ID $iId = empty($aParam['redis_id']) ? 0 : $aParam['redis_id']; if(empty($iId)){ @@ -703,7 +459,7 @@ class OpenAi //记录处理开始 $iNum = count($aMessage); $sRedisKey = 'ai_create_article_'.$iId; - $this->recordProcessingStart($sRedisKey,$iNum); + $this->oQueueRedis->recordProcessingStart($sRedisKey,$iNum); //定义空数组 $aChunkResult = $aFail = []; foreach ($aMessage as $key => $value) { @@ -733,72 +489,64 @@ class OpenAi $iMaxNum = empty($aParam['count_num']) ? 0 : $aParam['count_num']; //请求OPENAI $aResult = $this->curlOpenAIStream($aParam); - //更新处理进度 $iIndex = empty($aParam['chunkIndex']) ? 0 : $aParam['chunkIndex']; $sRedisKey = 'ai_create_article_'.$iId; - $this->updateProcessingProgress($sRedisKey,$iId,$iIndex + 1); - + $iProgress = $this->oQueueRedis->updateProcessingProgress($sRedisKey,$iIndex + 1); //保存内容 $sRedisKey = 'ai_create_article_progress_'.$iId; - $this->saveChunkProgress($sRedisKey, $iIndex,$aResult); + $this->oQueueRedis->saveChunkProgress($sRedisKey, $iIndex,$aResult); //更新入库 $aReturnData = json_decode($aResult,true); $aDataInfo =empty($aReturnData['data']) ? [] : $aReturnData['data']; $aData = empty($aDataInfo) ? [] : $this->extractAndParse($aDataInfo); $aData = empty($aData['data']) ? [] : $aData['data']; - if(!empty($aData)){ + if(!empty($aData)){//更新AI审稿记录表 + if($iProgress >= 100){ + $aData['is_generate'] = 1; + } $aData['article_id'] = $iId; - $this->updateAiArticle($aData); + $this->updateAiContent($aData); } return $aResult; } - /** - * 获取期刊内容 + * 微信公众号-更新AI生成内容 */ - public function getJournalPaperArt($aParam = []){ + private function updateAiContent($aParam = []){ - //判断文章ID - $sIssn = empty($aParam['issn']) ? [] : $aParam['issn']; - if(empty($sIssn)){ - return json_encode(['status' => 2,'msg' => 'Please select an article']); - } - //接口获取期刊内容 - $sUrl = $this->sTmrUrl."/api/Supplementary/getJournalPaperArt"; - $aParam = ['issn' => $sIssn]; - $aResult = object_to_array(json_decode(myPost($sUrl,$aParam),true)); - return json_encode($aResult); - } - - /** - * 获取文章文件内容 - */ - - public function getFileContent($aParam = []){ - - //判断文章ID - $iArticleId = empty($aParam['article_id']) ? [] : $aParam['article_id']; + //文章ID + $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; if(empty($iArticleId)){ - return json_encode(['status' => 2,'msg' => 'Please select an article']); + return json_encode(['status' => 2,'msg' => 'Please select the article to be modified']); } + //更新生成状态 + $oArticle = new Article; + $aResult = json_decode($oArticle->updateAiArticle($aParam),true); + $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; + $sMsg = empty($aResult['msg']) ? '更新状态失败' : $aResult['msg']; + //是否生成 + $is_generate = empty($aParam['is_generate']) ? 2 : $aParam['is_generate']; - //获取文件内容 - $aWhere = ['article_id' => $iArticleId,'type_name' => 'manuscirpt']; - $aFile = Db::name('article_file')->field('file_url')->where($aWhere)->order('ctime desc')->limit(1)->find(); - if(empty($aFile['file_url'])){ - return json_encode(['status' => 2,'msg' => 'No Manuscript']); + //内容生成完成推送上传素材队列 + if($is_generate == 1){ + if($iStatus == 1){ + //四小时后推送上传素材并推送草稿箱 + $iDelaySeconds = 4 * 3600; // 4小时的秒数 + Queue::later($iDelaySeconds,'app\api\job\WechatMaterial@fire', ['article_id' => $iArticleId], 'WechatMaterial'); + $sMsg = '文章AI内容生成成功'; + }else{ + $iStatus = 2; + } + //插入日志记录 + $oMaterial = new Material; + $aLogInfo = ['article_id' => $iArticleId,'type' => 5,'msg' =>$sMsg,'status' => $iStatus,'create_time' => time()]; + $result = $oMaterial->addWechatLog($aLogInfo); } - - //接口获取上传文件 - $sUrl = $this->sJavaUrl."api/typeset/readDocx"; - $aParam['fileRoute'] = $this->sFileUrl.$aFile['file_url']; - $aResult = object_to_array(json_decode(myPost($sUrl,$aParam))); return json_encode($aResult); } - /** * 添加接口访问日志 */ @@ -819,87 +567,13 @@ class OpenAi return DB::name('openapi_log')->insertGetId($aInsert); } - /** - * 更新AI生成内容入库 - * @param $messages 内容 - * @param $model 模型类型 - */ - private function updateAiArticle($aParam = []){ - //文章ID - $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; - - //查询内容是否存在 - $aWhere = ['is_delete' => 2]; - if(empty($iArticleId)){ - return json_encode(['status' => 2,'msg' => 'Please select the article to be modified']); - } - $aWhere['article_id'] = $iArticleId; - $aAiArticle = Db::name('ai_article')->field('ai_article_id')->where($aWhere)->find(); - if(empty($aAiArticle)){ - return json_encode(['status' => 3,'msg' => 'he article content of WeChat official account has not been generated']); - } - $iAiArticleId = $aAiArticle['ai_article_id']; - - //必填参数验证 - $aFields = ['article_id','title_english','title_chinese','journal_issn','covered','digest','research_result','content','highlights','discussion','prospect','research_background','discussion_results','research_method','overview','summary','is_generate']; - $sFiled = ''; - $aUpdateParam = []; - foreach($aFields as $val){ - if(!isset($aParam[$val])){ - continue; - } - if(is_array($aParam[$val])){ - $aParam[$val] = implode(";",$aParam[$val]); - } - $aUpdateParam[$val] = empty($aParam[$val]) ? '' : addslashes($aParam[$val]); - } - if(empty($aUpdateParam)){ - return json_encode(['status' => 1,'msg' => 'No data currently being processed']); - } - //执行入库 - $aUpdateParam['update_time'] = time(); - $result = Db::name('ai_article')->where('ai_article_id',$iAiArticleId)->limit(1)->update($aUpdateParam); - if($result === false){ - return json_encode(['status' => 4,'msg' => 'UPDATEING AI article failed']); - } - return json_encode(['status' => 1,'msg' => 'No data currently being processed']); - } - - private function wechatGegnerate($aParam = []){ - - //文章ID - $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; - if(empty($iArticleId)){ - return json_encode(['status' => 2,'msg' => 'Please select the article to be modified']); - } - //更新生成状态 - $aParam['is_generate'] = 1; - $aResult = json_decode($this->updateAiArticle($aParam),true); - $iStatus = empty($aResult['status']) ? 0 : $aResult['status']; - $sMsg = empty($aResult['msg']) ? '更新状态失败' : $aResult['msg']; - if($iStatus == 1){ - //四小时后推送上传素材并推送草稿箱 - $iDelaySeconds = 4 * 3600; // 4小时的秒数 - Queue::later($iDelaySeconds,'app\api\job\WechatMaterial@fire', ['article_id' => $iArticleId], 'WechatMaterial'); - $sMsg = '文章AI内容生成成功'; - }else{ - $iStatus = 2; - } - - //插入日志记录 - $oMaterial = new Material; - $aLogInfo = ['article_id' => $iArticleId,'type' => 5,'msg' =>$sMsg,'status' => $iStatus,'create_time' => time()]; - $result = json_decode($oMaterial->addWechatLog($aLogInfo),true); - return json_encode($aResult); - } - /** * 从文本中提取被```json```和```包裹的JSON内容并解析 * @param string $text 包含JSON代码块的文本 * @param bool $assoc 是否返回关联数组(默认true) * @return array|object 解析后的JSON数据,失败时返回null */ - private function extractAndParse($text, $assoc = true){ + public function extractAndParse($text, $assoc = true){ // 使用正则表达式提取JSON代码块 preg_match('/```json\s*(\{.*?\})\s*```/s', $text, $matches); diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php index 895917c..93fb51e 100644 --- a/application/common/QueueJob.php +++ b/application/common/QueueJob.php @@ -1,129 +1,340 @@ redis = new \Redis(); - $this->redis->connect($config['host'], $config['port']); - - if (!empty($config['password'])) { - $this->redis->auth($config['password']); - } - - $this->redis->select($config['select']); - // 初始化 Redis 连接 - // $this->redis = Cache::store('redis')->handler(); + $this->QueueRedis = QueueRedis::getInstance(); + $this->lastLogTime = time(); } - // 记录任务开始 - public function addLog($aParam = []) { - - //数据处理 - $aField = $this->aField; + /** + * 记录任务开始 + * @param array $aParam + * @return int 日志ID,失败返回0 + */ + public function addLog($aParam = []) + { + // 数据过滤(只保留必填字段) $aInsert = []; - foreach ($aField as $key => $value) { - if(isset($aParam[$value])){ - $aInsert[$value] = $aParam[$value]; - } + foreach ($this->aField as $field) { + if (isset($aParam[$field])) { + $aInsert[$field] = $aParam[$field]; + } } - $result = 0; - if(!empty($aInsert)){ - $result = DB::name('wechat_queue_logs')->insertGetId($aParam); + + // 补充默认值 + if (!isset($aInsert['create_time'])) { + $aInsert['create_time'] = time(); + } + if (!isset($aInsert['update_time'])) { + $aInsert['update_time'] = $aInsert['create_time']; + } + + try { + return Db::name('wechat_queue_logs')->insertGetId($aInsert); + } catch (\Exception $e) { + $this->log("添加任务日志失败: " . $e->getMessage() . " | 参数: " . json_encode($aInsert, self::JSON_OPTIONS)); + return 0; } - return $result; } - // 记录任务成功 - public function updateLog($aParam = []) { - + /** + * 记录任务状态更新 + * @param array $aParam + * @return bool + */ + public function updateLog($aParam = []) + { $iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id']; - if(empty($iLogId)){ + if (empty($iLogId)) { + $this->log("更新日志失败: 缺少log_id"); return false; } - //数据处理 - $aField = $this->aField; + + // 数据过滤 $aUpdate = []; - foreach ($aField as $key => $value) { - if(isset($aParam[$value])){ - $aUpdate[$value] = $aParam[$value]; - } + foreach ($this->aField as $field) { + if (isset($aParam[$field])) { + $aUpdate[$field] = $aParam[$field]; + } + } + + // 强制更新时间 + $aUpdate['update_time'] = time(); + + try { + return Db::name('wechat_queue_logs') + ->where('log_id', $iLogId) + ->limit(1) + ->update($aUpdate) > 0; + } catch (\Exception $e) { + $this->log("更新任务日志失败 [ID:{$iLogId}]: " . $e->getMessage()); + return false; } - unset($aParam['log_id']); - $result = DB::name('wechat_queue_logs')->where('log_id',$iLogId)->limit(1)->update($aUpdate); - return $result; } /** - * 写入Reids 防止1小时内重复操作 + * 设置日志路径并确保目录存在 + * @param string $logPath + * @throws \RuntimeException */ + public function ensureLogDirExists($logPath = '') + { + if (empty($logPath)) { + $error = "日志路径不能为空"; + $this->log($error); + return $error; + } - public function setRedisLabel($aParam = []){ + $this->logPath = $logPath; + $logDir = dirname($this->logPath); - //判断数据是否为空 - if(empty($aParam['redis_key'])){ - return 3; + // 检查并创建目录(处理权限问题) + if (!is_dir($logDir)) { + $oldUmask = umask(0); + $created = mkdir($logDir, 0755, true); + umask($oldUmask); + + if (!$created || !is_dir($logDir)) { + $error = "无法创建日志目录: {$logDir} (权限不足)"; + $this->log($error); + return $error; + } } - //获取值 - $sValue = $this->getRedisLabel($aParam['redis_key']); - if($sValue == $aParam['redis_key']){ - return 4; - } - $result = Cache::set($aParam['redis_key'], $aParam['redis_key'], 3600); - if($result == true){ - return 1; - } - //写入 - return 2; } /** - * 获取Reids值 + * 写入日志到缓冲区 + * @param string $message */ - public function getRedisLabel($sRedisKey = ''){ - if(empty($sRedisKey)){ - return ''; + public function log($message) + { + // 防止缓冲区溢出 + if (count($this->logBuffer) >= 1000) { + $this->flushLog(); + } + + $time = date('H:i:s'); + $this->logBuffer[] = "[$time] $message\n"; + + // 缓冲区满或超时则刷新 + if (count($this->logBuffer) >= 50 || time() - $this->lastLogTime > 10) { + $this->flushLog(); } - return Cache::get($sRedisKey); } + /** + * 刷新日志缓冲区到文件 + */ + public function flushLog() + { + if (empty($this->logBuffer)) { + return; + } - // 使用SETNX原子操作设置锁 - public function setRedisLock($key, $value, $expire) - { - return $this->redis->set($key, $value, ['nx', 'ex' => $expire]); + // 检查日志路径是否设置 + if (empty($this->logPath)) { + $this->logBuffer = []; + return; + } + + // 检查文件大小并处理 + $this->checkAndTruncateLog(); + + $fp = fopen($this->logPath, 'a'); + if ($fp === false) { + // 紧急写入失败日志(避免递归) + $errorMsg = "[" . date('H:i:s') . "] 错误: 无法打开日志文件 {$this->logPath}\n"; + error_log($errorMsg); // 写入系统日志 + $this->logBuffer = []; + return; + } + + try { + // 尝试获取文件锁 + if (flock($fp, LOCK_EX)) { + fwrite($fp, implode('', $this->logBuffer)); + flock($fp, LOCK_UN); + } else { + // 无锁情况下尝试写入 + fwrite($fp, implode('', $this->logBuffer)); + $this->logBuffer[] = "[" . date('H:i:s') . "] 警告: 日志写入未加锁,可能存在冲突风险\n"; + } + } catch (\Exception $e) { + $errorMsg = "[" . date('H:i:s') . "] 错误: 写入日志失败: {$e->getMessage()}\n"; + fwrite($fp, $errorMsg); + } finally { + fclose($fp); + } + + $this->logBuffer = []; + $this->lastLogTime = time(); } - - // 获取Redis值 - public function getRedisValue($key) + + /** + * 检查日志文件大小,超过限制则清空 + */ + public function checkAndTruncateLog() { - return $this->redis->get($key); + if (empty($this->logPath) || !file_exists($this->logPath)) { + return; + } + + // 清除文件状态缓存并获取大小 + clearstatcache(true, $this->logPath); + $fileSize = @filesize($this->logPath); + + if ($fileSize === false) { + $this->log("错误: 无法获取日志文件大小 {$this->logPath}"); + return; + } + + if ($fileSize >= $this->logMaxSize) { + $fp = fopen($this->logPath, 'w'); + if ($fp === false) { + $this->log("错误: 无法清空日志文件 {$this->logPath}"); + return; + } + + try { + if (flock($fp, LOCK_EX)) { + // 二次检查文件大小(避免竞态条件) + clearstatcache(true, $this->logPath); + if (filesize($this->logPath) >= $this->logMaxSize) { + ftruncate($fp, 0); + $this->log("日志文件超过" . $this->formatFileSize($this->logMaxSize) . ",已清空"); + } + flock($fp, LOCK_UN); + } + } catch (\Exception $e) { + $this->log("错误: 清空日志文件失败: {$e->getMessage()}"); + } finally { + fclose($fp); + } + } } - - // 安全释放锁(仅当值匹配时删除) - public function releaseRedisLock($key, $value) + + /** + * 格式化文件大小(字节转人类可读格式) + * @param int $bytes + * @return string + */ + public function formatFileSize($bytes) { - - // 使用Lua脚本确保原子性 - $script = <<redis->eval($script, [$key, $value], 1); + if ($bytes <= 0) { + return '0 B'; + } + + $units = ['B', 'KB', 'MB', 'GB', 'TB']; + $unitIndex = min(floor(log($bytes, 1024)), count($units) - 1); + $size = $bytes / pow(1024, $unitIndex); + + return number_format($size, 2) . ' ' . $units[$unitIndex]; } -} -?> \ No newline at end of file + + /** + * 获取重试延迟时间 + * @param string $errorMsg + * @return int + */ + public function getRetryDelay($errorMsg) + { + $delayMap = [ + 'MySQL server has gone away' => 60, + 'timeout' => 30, + 'OpenAI' => 45, + 'network' => 60 + ]; + + foreach ($delayMap as $keyword => $delay) { + if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配 + return $delay; + } + } + + return 10; + } + + /** + * 处理可重试异常 + * @param \Exception $e + * @param int $iLogId + * @param string $sRedisKey + * @param \think\queue\Job $job + */ + public function handleRetryableException($e, $iLogId, $sRedisKey, $job) + { + $sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage(); + $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); + $this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}"); + + if ($iLogId > 0) { + $this->updateLog([ + 'log_id' => $iLogId, + 'status' => 2, + 'error' => $sMsg . ':' . $sTrace, + ]); + } + + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->log("超过最大重试次数({$this->maxRetries}),停止重试"); + $job->delete(); + } else { + $delay = $this->getRetryDelay($sMsg); + $this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + + /** + * 处理不可重试异常 + * @param \Exception $e + * @param int $iLogId + * @param string $sRedisKey + * @param \think\queue\Job $job + */ + public function handleNonRetryableException($e, $iLogId, $sRedisKey, $job) + { + $sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage(); + $sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString(); + $this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}"); + + if ($iLogId > 0) { + $this->updateLog([ + 'log_id' => $iLogId, + 'status' => 3, // 3:不可重试失败 + 'error' => $sMsg . ':' . $sTrace, + ]); + } + + $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600); + $this->log("不可重试错误,直接删除任务"); + $job->delete(); + } + + /** + * 析构函数:确保最后日志被写入 + */ + public function __destruct() + { + $this->flushLog(); + } +} \ No newline at end of file diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php index eccbe69..c6b05e3 100644 --- a/application/common/QueueRedis.php +++ b/application/common/QueueRedis.php @@ -34,9 +34,12 @@ class QueueRedis $this->config['host'] ?? '127.0.0.1', $this->config['port'] ?? 6379 ); - // 始终执行认证(空密码会被 Redis 忽略) - $this->redis->auth($this->config['password'] ?? ''); + if (!empty($this->config['password'])) { + $this->redis->auth($this->config['password']); + } + + $this->redis->select($this->config['select'] ?? 0); } @@ -149,7 +152,7 @@ LUA; 'completed' => 0, 'start_time' => time() ]); - $redis->expire($key, 21600); // 6小时过期 + $redis->expire($key, 10800); // 6小时过期 return true; } catch (\Exception $e) { return false; @@ -185,7 +188,7 @@ LUA; $redis = $this->connect(); $redis->hSet($key, "chunk_{$chunkIndex}", $content); // 确保设置过期时间(如果已设置则忽略) - $redis->expire($key, 21600); + $redis->expire($key, 10800); return true; } @@ -202,7 +205,7 @@ LUA; public function getConnectionStatus() { try { - return $this->connect()->ping() === '+PONG'; + return $this->connect()->ping(); } catch (\Exception $e) { return false; }