From 41007a130211708fe81593e2c27237fdf9604af2 Mon Sep 17 00:00:00 2001 From: chengxl Date: Fri, 15 Aug 2025 14:40:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=AC=E5=85=B1=E6=96=B9=E6=B3=95=E5=92=8C?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=96=B0=E5=A2=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/api/job/ArticleReview.php | 78 ++++ application/api/job/ArticleReviewForQueue.php | 81 ++++ .../api/job/ArticleReviewForQueueChunk.php | 81 ++++ .../api/job/TranslateContentForQueue.php | 102 +++++ application/common/HelperFunction.php | 380 ++++++++++++++++++ 5 files changed, 722 insertions(+) create mode 100644 application/api/job/ArticleReview.php create mode 100644 application/api/job/ArticleReviewForQueue.php create mode 100644 application/api/job/ArticleReviewForQueueChunk.php create mode 100644 application/api/job/TranslateContentForQueue.php create mode 100644 application/common/HelperFunction.php diff --git a/application/api/job/ArticleReview.php b/application/api/job/ArticleReview.php new file mode 100644 index 0000000..64cc707 --- /dev/null +++ b/application/api/job/ArticleReview.php @@ -0,0 +1,78 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); + + // 获取文章ID + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id']; + if (empty($iArticleId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + try { + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + //生成内容 + $oAireview = new Aireview; + $response = $oAireview->review($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? 'success' : $aResult['msg']; + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } finally { + $this->oQueueJob->finnal(); + } + } +} \ No newline at end of file diff --git a/application/api/job/ArticleReviewForQueue.php b/application/api/job/ArticleReviewForQueue.php new file mode 100644 index 0000000..9aea199 --- /dev/null +++ b/application/api/job/ArticleReviewForQueue.php @@ -0,0 +1,81 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); + + try { + + // 验证任务数据完整性 + $iRedisId = empty($data['article_id']) ? 0 : $data['article_id']; + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iRedisId)) { + $this->oQueueJob->log("无效的redis_id,删除任务"); + $job->delete(); + return; + } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 + $oOpenAi = new OpenAi; + $response = $oOpenAi->articleReviewForQueue($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } finally { + $this->oQueueJob->finnal(); + } + } +} \ No newline at end of file diff --git a/application/api/job/ArticleReviewForQueueChunk.php b/application/api/job/ArticleReviewForQueueChunk.php new file mode 100644 index 0000000..9c8cc78 --- /dev/null +++ b/application/api/job/ArticleReviewForQueueChunk.php @@ -0,0 +1,81 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + //任务开始判断 + $this->oQueueJob->init($job); + + // 获取 Redis 任务的原始数据 + $rawBody = empty($job->getRawBody()) ? '' : $job->getRawBody(); + $jobData = empty($rawBody) ? [] : json_decode($rawBody, true); + $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id']; + + $this->oQueueJob->log("-----------队列任务开始-----------"); + $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}"); + + try { + + // 验证任务数据完整性 + $iRedisId = empty($data['article_id']) ? 0 : $data['article_id']; + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iRedisId)) { + $this->oQueueJob->log("无效的redis_id,删除任务"); + $job->delete(); + return; + } + + // 生成Redis键并尝试获取锁 + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) { + return; // 未获取到锁,已处理 + } + + // 执行核心任务 + $oOpenAi = new OpenAi; + $response = $oOpenAi->articleReviewForQueueChunk($data); + // 验证API响应 + if (empty($response)) { + throw new RuntimeException("OpenAI API返回空结果"); + } + // 检查JSON解析错误 + $aResult = json_decode($response, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException("解析OpenAI响应失败: " . json_last_error_msg() . " | 原始响应: {$response}"); + } + $sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg']; + + // 更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job); + } catch (Exception $e) { + $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job); + } finally { + $this->oQueueJob->finnal(); + } + } +} \ No newline at end of file diff --git a/application/api/job/TranslateContentForQueue.php b/application/api/job/TranslateContentForQueue.php new file mode 100644 index 0000000..0f19de2 --- /dev/null +++ b/application/api/job/TranslateContentForQueue.php @@ -0,0 +1,102 @@ +oQueueJob = new QueueJob; + $this->QueueRedis = QueueRedis::getInstance(); + } + + public function fire(Job $job, $data) + { + $startTime = microtime(true); + $this->oQueueJob->log("-----------队列任务开始-----------"); + + // 检查数据库连接 + if (!$this->oQueueJob->checkDbConnection(true)) { + $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试"); + $job->release(10); + return; + } + // 检查Redis连接状态 + if (!$this->QueueRedis->getConnectionStatus()) { + $this->oQueueJob->log("Redis连接失败,10秒后重试"); + $job->release(10); + return; + } + + $iRedisId = empty($data['article_id']) ? 0 : $data['article_id']; + $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex']; + if (empty($iRedisId)) { + $this->oQueueJob->log("无效的article_id,删除任务"); + $job->delete(); + return; + } + + $sClassName = get_class($this); + $sRedisKey = "queue_job:{$sClassName}:{$iRedisId}:{$sChunkIndex}"; + $sRedisValue = uniqid() . '_' . getmypid(); + $lockExpire = $this->lockExpire; + + $isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire); + if (!$isLocked) { + $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey); + if (in_array($jobStatus, ['completed', 'failed'])) { + $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}"); + $job->delete(); + } else { + $attempts = $job->attempts(); + if ($attempts >= $this->maxRetries) { + $this->oQueueJob->log("超过最大重试次数,停止重试"); + $job->delete(); + } else { + $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey); + $delay = $lockTtl > 0 ? $lockTtl + 5 : 30; + $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})"); + $job->release($delay); + } + } + return; + } + + try { + // 执行核心任务前再次检查连接 + $result = $this->oQueueJob->checkDbConnection(); + if (!$result) { + throw new \RuntimeException("数据库连接异常,无法执行核心任务"); + } + $oOpenAi = new OpenAi; + $aResult = json_decode($oOpenAi->translateContentForQueue($data), true); + $sMsg = empty($aResult['msg']) ? '内容翻译成功' : $aResult['msg']; + //更新完成标识 + $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue); + $job->delete(); + $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}"); + + } catch (\RuntimeException $e) { + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (\LogicException $e) { + $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job); + } catch (\Exception $e) { + $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job); + } finally { + $executionTime = microtime(true) - $startTime; + $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒"); + gc_collect_cycles(); + } + } +} \ No newline at end of file diff --git a/application/common/HelperFunction.php b/application/common/HelperFunction.php new file mode 100644 index 0000000..fb276a5 --- /dev/null +++ b/application/common/HelperFunction.php @@ -0,0 +1,380 @@ + $range) { + // 解析区间值(处理字符串和数字两种格式) + $rangeStr = is_string($range['range']) ? $range['range'] : (string)$range['range']; + $rangeParts = explode(',', $rangeStr); + // 单个值:表示 >= 该值 + if (count($rangeParts) == 1) { + $min = (int)$rangeParts[0]; + if ($iSort > $min) { + return $title; // 返回所属标题 + } + } + // 两个值:表示 [min, max] 闭区间(包含两端) + elseif (count($rangeParts) == 2) { + $min = (int)$rangeParts[0]; + $max = (int)$rangeParts[1]; + if ($iSort >= $min && $iSort <= $max) { + return $title; // 返回所属标题 + } + } + } + return ''; // 不在任何区间 + } + + /** + * 增强版流式响应解析 - 解决JSON片段拼接问题 + */ + public function parseMedicalStreamResponse($streamContent){ + $fullContent = ''; + $lines = explode("\n", $streamContent); + $validLines = 0; + $errorLines = 0; + foreach ($lines as $line) { + $line = trim($line); + if(empty($line)){ + continue; + } + // 处理DeepSeek的SSE格式 + if(strpos($line, 'data: ') === 0) { + // 检查结束标记 + if ($line === 'data: [DONE]') { + break; + } + + $jsonStr = substr($line, 6); + $jsonData = json_decode($jsonStr, true); + // 解析错误处理与修复 + if (json_last_error() !== JSON_ERROR_NONE) { + $errorLines++; + // 针对DeepSeek常见的JSON格式问题进行修复 + $jsonStr = $this->fixDeepSeekJson($jsonStr); + $jsonData = json_decode($jsonStr, true); + } + + // 提取内容(兼容DeepSeek的响应结构) + if (isset($jsonData['choices'][0]['delta']['content'])) { + $fullContent .= $jsonData['choices'][0]['delta']['content']; + $validLines++; + } elseif (isset($jsonData['choices'][0]['text'])) { + $fullContent .= $jsonData['choices'][0]['text']; + $validLines++; + } + } + } + // 记录解析统计,便于调试 + error_log("流式解析: 有效行{$validLines}, 错误行{$errorLines}"); + return $fullContent; + } + + /** + * 高性能DeepSeek JSON修复函数(终极版) + * 确保修复后的JSON字符串100%可解析,同时保持最优性能 + */ + private function fixDeepSeekJson($jsonStr) { + // 基础处理:去除首尾空白并处理空字符串(高效操作) + $jsonStr = trim($jsonStr); + if (empty($jsonStr)) { + return '{}'; + } + + // 1. 预处理:清除首尾干扰字符(减少正则使用) + $len = strlen($jsonStr); + $start = 0; + // 跳过开头的逗号和空白 + while ($start < $len && ($jsonStr[$start] === ',' || ctype_space($jsonStr[$start]))) { + $start++; + } + $end = $len - 1; + // 跳过结尾的逗号和空白 + while ($end >= $start && ($jsonStr[$end] === ',' || ctype_space($jsonStr[$end]))) { + $end--; + } + if ($start > 0 || $end < $len - 1) { + $jsonStr = substr($jsonStr, $start, $end - $start + 1); + $len = strlen($jsonStr); + // 处理截取后可能为空的情况 + if ($len === 0) { + return '{}'; + } + } + // 2. 括号平衡修复(核心逻辑保持,减少计算) + $braceDiff = substr_count($jsonStr, '{') - substr_count($jsonStr, '}'); + if ($braceDiff !== 0) { + if ($braceDiff > 0) { + $jsonStr .= str_repeat('}', $braceDiff); + } else { + // 仅在必要时使用正则移除多余括号 + $jsonStr = preg_replace('/}(?=([^"]*"[^"]*")*[^"]*$)/', '', $jsonStr, -$braceDiff); + } + } + $bracketDiff = substr_count($jsonStr, '[') - substr_count($jsonStr, ']'); + if ($bracketDiff !== 0) { + if ($bracketDiff > 0) { + $jsonStr .= str_repeat(']', $bracketDiff); + } else { + $jsonStr = preg_replace('/](?=([^"]*"[^"]*")*[^"]*$)/', '', $jsonStr, -$bracketDiff); + } + } + // 3. 控制字符清理(合并为单次处理) + $jsonStr = preg_replace( + '/([\x00-\x1F\x7F]|[^\x20-\x7E\xA0-\xFF]|\\\\u001f|\\\\u0000)/', + '', + $jsonStr + ); + // 4. 引号处理(仅在有引号时处理,减少操作) + if (strpos($jsonStr, '"') !== false) { + // 修复未转义引号(优化正则) + $jsonStr = preg_replace('/(?handleJsonError($jsonStr, $errorCode); + $attempts++; + } + + // 终极容错:如果所有尝试都失败,返回空JSON对象 + return '{}'; + } + + /** + * 根据JSON解析错误类型进行针对性修复 + */ + private function handleJsonError($jsonStr, $errorCode) { + switch ($errorCode) { + case JSON_ERROR_SYNTAX: + // 语法错误:尝试更激进的清理 + $jsonStr = preg_replace('/[^\w{}[\]":,.\s\\\]/', '', $jsonStr); + $jsonStr = preg_replace('/,\s*([}\]])/', ' $1', $jsonStr); + break; + + case JSON_ERROR_CTRL_CHAR: + // 控制字符错误:进一步清理控制字符 + $jsonStr = preg_replace('/[\x00-\x1F\x7F]/u', '', $jsonStr); + break; + + case JSON_ERROR_UTF8: + // UTF8编码错误:尝试重新编码 + $jsonStr = utf8_encode(utf8_decode($jsonStr)); + break; + + default: + // 其他错误:使用备用修复策略 + $jsonStr = $this->fallbackJsonFix($jsonStr); + } + + return $jsonStr; + } + + /** + * 备用JSON修复策略(更激进的修复方式) + * 当主修复逻辑失败时使用 + */ + private function fallbackJsonFix($jsonStr) { + // 更彻底的清理 + $jsonStr = preg_replace('/[^\w{}[\]":,.\s\\\]/u', '', $jsonStr); + + + if (!preg_match('/^[\[{]/', $jsonStr)) { + $jsonStr = '{' . $jsonStr . '}'; + } + + // 最后尝试平衡括号 + $openBrace = substr_count($jsonStr, '{'); + $closeBrace = substr_count($jsonStr, '}'); + $jsonStr .= str_repeat('}', max(0, $openBrace - $closeBrace)); + + $openBracket = substr_count($jsonStr, '['); + $closeBracket = substr_count($jsonStr, ']'); + $jsonStr .= str_repeat(']', max(0, $openBracket - $closeBracket)); + + // 确保结尾正确 + $lastChar = substr($jsonStr, -1); + if ($lastChar !== '}' && $lastChar !== ']') { + $jsonStr .= preg_match('/^\{/', $jsonStr) ? '}' : ']'; + } + + return $jsonStr; + } + + /** + * 从文本中提取被```json```和```包裹的JSON内容并解析 + * @param string $text 包含JSON代码块的文本 + * @param bool $assoc 是否返回关联数组(默认true) + * @return array|object 解析后的JSON数据,失败时返回null + */ + public function extractAndParse($text, $assoc = true){ + // 尝试提取标准JSON代码块 + preg_match('/```json\s*(\{.*?\})\s*```/s', $text, $matches); + $jsonContent = empty($matches[1]) ? $text : $matches[1]; + + // 若未提取到,尝试宽松匹配(允许没有json标记) + if (empty($jsonContent)) { + preg_match('/```\s*(\{.*?\})\s*```/s', $text, $matches); + $jsonContent = empty($matches[1]) ? $text : $matches[1]; + } + + // 清理JSON内容,去除多余标记和控制字符 + $jsonContent = trim(trim($jsonContent, '```json'), '```'); + $jsonContent = preg_replace('/[\x00-\x1F\x7F]/', '', $jsonContent); // 过滤所有控制字符 + + // 解析JSON + $aData = json_decode($jsonContent, $assoc); + + // 检查解析结果 + if (json_last_error() !== JSON_ERROR_NONE) { + return [ + 'status' => 2, + 'msg' => "API返回无效JSON: " . json_last_error_msg() . '===============' . $jsonContent, + 'data' => null + ]; + } + return ['status' => 1, 'msg' => 'success', 'data' => $aData]; + } + + /** + * 文本分块(按字符估算token) + */ + public function splitContent($content, $maxChunkTokens=12000, $charPerToken = 4, $overlap = 200){ + $chunks = []; + $maxChars = $maxChunkTokens * $charPerToken; + $contentLength = strlen($content); + $start = 0; + + while ($start < $contentLength) { + $end = $start + $maxChars; + if ($end >= $contentLength) { + $chunks[] = substr($content, $start); + break; + } + + // 寻找最佳拆分点(优先段落,再句子) + $delimiters = ["\n\n", ". ", "! ", "? ", "; ", " "];; + $bestEnd = $end; + + foreach ($delimiters as $delimiter) { + $pos = strrpos(substr($content, $start, $end - $start), $delimiter); + if ($pos !== false) { + $bestEnd = $start + $pos + strlen($delimiter); + break; + } + } + + // 截取当前块 + $chunks[] = substr($content, $start, $bestEnd - $start); + + // 下一块起始位置(回退重叠部分) + $start = max($start, $bestEnd - $overlap); + } + + return $chunks; + } + /** + * 处理文本过滤标签 + */ + public function filterAllTags($sContent, $config = []){ + + // 初始化默认配置 + $purifierConfig = HTMLPurifier_Config::createDefault(); + + // 设置默认规则(可根据需求调整) + $purifierConfig->set('Core.Encoding', 'UTF-8'); // 编码 + $purifierConfig->set('HTML.Allowed', ''); // 允许的标签及属性 + $purifierConfig->set('CSS.AllowedProperties', 'color,font-weight'); // 允许的CSS属性 + + // 合并自定义配置(覆盖默认值) + foreach ($config as $key => $value) { + $purifierConfig->set($key, $value); + } + + // 实例化并过滤 + $purifier = new HTMLPurifier($purifierConfig); + return $purifier->purify($sContent); + } + + /** + * 字符串过滤 + * @param $messages 内容 + * @param $model 模型类型 + */ + public function func_safe($data,$ignore_magic_quotes=false){ + if(is_string($data)){ + $data=trim(htmlspecialchars($data));//防止被挂马,跨站攻击 + if(($ignore_magic_quotes==true)){ + $data = addslashes($data);//防止sql注入 + } + return $data; + }else if(is_array($data)){//如果是数组采用递归过滤 + foreach($data as $key=>$value){ + $data[$key]=func_safe($value); + } + return $data; + }else{ + return $data; + } + } + +}