diff --git a/application/api/controller/Aiarticle.php b/application/api/controller/Aiarticle.php index ed5d4e6..bdfde04 100644 --- a/application/api/controller/Aiarticle.php +++ b/application/api/controller/Aiarticle.php @@ -85,10 +85,8 @@ class Aiarticle extends Base } //查询AI内容是否生成 - $aAiArticle = json_decode($this->getAiArticle(['article_id' => $iArticleId]),true); - $aAiArticleContent = empty($aAiArticle['data']) ? [] : $aAiArticle['data']; - $aAiArticle = empty($aAiArticleContent['ai_article']) ? [] : $aAiArticleContent['ai_article']; - + $aWhere = ['is_delete' => 2,'article_id' => $iArticleId]; + $aAiArticle = Db::name('ai_article')->where($aWhere)->find(); $iId = empty($aAiArticle['ai_article_id']) ? 0 : $aAiArticle['ai_article_id']; if(empty($aAiArticle)){ //插入t_ai_article数据 @@ -100,6 +98,7 @@ class Aiarticle extends Base } $aAiArticle = array_merge(['ai_article_id' => $iId,'article_id' => $iArticleId,'is_generate' => 2],$aInsert); } + //判断是否生成 if(!empty($aAiArticle['is_generate']) && $aAiArticle['is_generate'] == 1){ return json_encode(['status' => 5,'msg' => 'The data has been generated, please proceed with the next steps']); diff --git a/application/api/controller/Article.php b/application/api/controller/Article.php index 1841f4b..8451ee5 100644 --- a/application/api/controller/Article.php +++ b/application/api/controller/Article.php @@ -2005,6 +2005,21 @@ class Article extends Base return jsonError("The article can only be added in state with editor at least"); } + //获取审稿人最后一次邀请时间 chengxiaoling 20250724 start + $iUserId = empty($data['uid']) ? 0 : $data['uid']; + if(empty($iUserId)){ + return jsonError("Please select the reviewers to invite!"); + } + //判断距离上次邀请审稿是否超过10天 + $aWhere = ['reviewer_id' => $iUserId]; + $iTeenDaysLater = strtotime('-10 days');// 计算10天之前的时间戳 + $aUserInfo = Db::name('user_reviewer_info')->field('last_invite_time')->where($aWhere)->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater) + ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0)->find(); + if(empty($aUserInfo)){ + return jsonError("The time since the last invitation for review by the reviewer has not exceeded 10 days!"); + } + //获取审稿人最后一次邀请时间 chengxiaoling 20250724 end + //增加信息到文章审稿表 $insert_data['reviewer_id'] = $data['uid']; $insert_data['article_id'] = $data['articleId']; @@ -2013,6 +2028,14 @@ class Article extends Base $insert_data['state'] = 5; $res = $this->article_reviewer_obj->insertGetId($insert_data); + //更新审稿人最后一次审稿时间 chengxiaoling 20250724 start + if(!empty($res) && !empty($insert_data['reviewer_id'])){ + $aUpdate = ['last_invite_time'=>time()]; + $aWhere = ['reviewer_id' => $iUserId]; + $updateResult = Db::name('user_reviewer_info')->where($aWhere)->limit(1)->update($aUpdate); + } + //更新审稿人最后一次审稿时间 chengxiaoling 20250724 end + //修改文章状态->审稿中 $this->article_obj->where('article_id', $data['articleId'])->update(['state' => 2]); @@ -3063,7 +3086,7 @@ class Article extends Base 'journal' => 'require', 'title' => 'require', 'type' => 'require', - 'major' => 'require', + // 'major' => 'require', 'abstrart' => 'require' ]); if (!$rule->check($data)) { @@ -3114,6 +3137,12 @@ class Article extends Base $inset_data['ctime'] = time(); $inset_data['state'] = -1; + + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 start + $inset_data['is_use_ai'] = empty($data['is_use_ai']) ? 2 : $data['is_use_ai']; //是否使用AI1是2否 + $inset_data['use_ai_explain'] = isset($data['use_ai_explain']) ? $data['use_ai_explain'] : ''; + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 end + $article_id = $this->article_obj->insertGetId($inset_data); } else { @@ -3141,9 +3170,20 @@ class Article extends Base $up["approval"] = 0; $up['approval_content'] = isset($data["approval_content"]) ? $data["approval_content"] : '';//trim($data['approval_content']); } + + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 start + $up['is_use_ai'] = empty($data['is_use_ai']) ? 2 : $data['is_use_ai']; //是否使用AI1是2否 + $up['use_ai_explain'] = isset($data['use_ai_explain']) ? $data['use_ai_explain'] : '';//使用AI说明 + //新增字段是否使用AI及使用说明 chengxiaoling 20250725 end + $this->article_obj->where('article_id', $article_id)->update($up); } - changeArticleMajor($article_id, $data['major']); + //注释文章筛选领域添加修改为AI推荐领域,在第四步可以查看修改 chengxiaoling 20250722 + // changeArticleMajor($article_id,$data['major']); + if(!empty($article_id)){//AI推荐领域队列执行 + $sQueueId = \think\Queue::push('app\api\job\RecommendArticleField@fire',['article_id' => $article_id], 'RecommendArticleField'); + } + //注释文章筛选领域添加修改为AI推荐领域,在第四步可以查看修改 chengxiaoling 20250722 return jsonSuccess(['article_id' => $article_id]); } @@ -3451,6 +3491,15 @@ class Article extends Base } $this->article_obj->where('article_id', $data['article_id'])->update($update_l); $this->ai_scor($data['article_id']); + + //判断是否有文章领域 进行更新操作 chengxiaoling 20250722 start + $sMajorData = empty($data['article_field']) ? '' : $data['article_field'];//文章领域 + $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];//文章ID + if(!empty($sMajorData) && !empty($iArticleId)){ + $this->updateArticleField(['article_id' => $iArticleId,'article_field' => $sMajorData]); + } + //判断是否有文章领域 进行更新操作 chengxiaoling 20250722 end + return json(['code' => 0]); } @@ -4617,7 +4666,7 @@ class Article extends Base public function getArticleField($aParam = []) { - $aParam = empty($aParam) ? $this->request->post() : $this->request->post(); + $aParam = empty($aParam) ? $this->request->post() : $aParam; if (empty($aParam['article_id'])) { return json_encode(['status' => 2, 'msg' => 'Please select a Article']); @@ -4646,7 +4695,7 @@ class Article extends Base public function updateArticleField($aParam = []) { - $aParam = empty($aParam) ? $this->request->post() : $this->request->post(); + $aParam = empty($aParam) ? $this->request->post() : $aParam; $iArticleId = empty($aParam['article_id']) ? 0 : $aParam['article_id']; if (empty($iArticleId)) { return json_encode(['status' => 2, 'msg' => 'Please select a article']); diff --git a/application/api/controller/Queueinfo.php b/application/api/controller/Queueinfo.php index 6144849..c007f33 100644 --- a/application/api/controller/Queueinfo.php +++ b/application/api/controller/Queueinfo.php @@ -33,7 +33,7 @@ class Queueinfo extends Base $redis->select($config['select']); // 获取不同队列类型的任务数 - $aQueue = empty($aParam['queue_name']) ? ['ArticleAiCreateContent','RecommendReviewer','RelatedArticle','ReviewerScore','RevisionReviewer','SendRelatedArticleEmail','SendReviewEmail','WechatDraft','WechatDraftPublish','WechatMaterial','WechatQueryStatus'] : [$aParam['queue_name']]; + $aQueue = empty($aParam['queue_name']) ? ['ArticleAiCreateContent','RecommendReviewer','RelatedArticle','ReviewerScore','RevisionReviewer','SendRelatedArticleEmail','SendReviewEmail','WechatDraft','WechatDraftPublish','WechatMaterial','WechatQueryStatus','createFieldForQueue','RecommendArticleField'] : [$aParam['queue_name']]; foreach ($aQueue as $key => $value) { $types[$value] = [ 'pending' => 'queues:'.$value, @@ -53,7 +53,8 @@ class Queueinfo extends Base } } - + // 关闭 Redis 连接(关键步骤) + $redis->close(); echo '
';var_dump($counts);
}
public function removeKey() {
@@ -99,7 +100,8 @@ echo '';var_dump($counts);
}
}
-
+ // 关闭 Redis 连接(关键步骤)
+ $redis->close();
echo '';var_dump($counts);
}
// Redis值删除
@@ -142,7 +144,8 @@ echo '';var_dump($counts);
}
echo '';var_dump($sQueueInfo,$sKey,$iNum);exit;
-
+ // 关闭 Redis 连接(关键步骤)
+ $redis->close();
}
@@ -171,36 +174,56 @@ echo '';var_dump($sQueueInfo,$sKey,$iNum);exit;
$sKey = $redis->get($aParam['key_name']);
var_dump($sKey);
+ // 关闭 Redis 连接(关键步骤)
+ $redis->close();
}
public function removeKey1(){
+ // 获取请求参数
$aParam = $this->request->post();
- if(empty($aParam['key_name'])){
+ if (empty($aParam['key_name'])) {
exit('非法操作');
}
+ // 获取队列配置(Redis 配置)
$config = \think\Config::get('queue');
+ // 队列名称默认值处理
$sQueueName = empty($aParam['queue_name']) ? 'ArticleAiCreateContent' : $aParam['queue_name'];
+
+ // 初始化 Redis 连接
$redis = new \Redis();
+ // 连接 Redis 服务器
$redis->connect($config['host'], $config['port']);
-
+
+ // 若有密码则认证
if (!empty($config['password'])) {
$redis->auth($config['password']);
}
-
+
+ // 选择数据库(默认 0)
$redis->select($config['select']);
- $sKey = $redis->hGetAll($aParam['key_name']);
+ // 操作示例:获取哈希表数据
echo '我是HgetAll';
+ $sKey = $redis->hGetAll($aParam['key_name']);
var_dump($sKey);
+
+ // 删除哈希表
$result = $redis->del($aParam['key_name']);
var_dump($result);
+
+ // 尝试获取字符串类型的键(验证是否删除)
echo '我是get';
$sKey = $redis->get($aParam['key_name']);
var_dump($sKey);
+
+ // 再次删除(确保删除)
$result = $redis->del($aParam['key_name']);
var_dump($result);
+
+ // 关闭 Redis 连接(关键步骤)
+ $redis->close();
}
}
diff --git a/application/api/controller/Reviewer.php b/application/api/controller/Reviewer.php
index 8a337bb..7c732fd 100644
--- a/application/api/controller/Reviewer.php
+++ b/application/api/controller/Reviewer.php
@@ -2182,11 +2182,19 @@ class Reviewer extends Base
if (isset($data['major_id'])&&$data['major_id']!=0){
$where['t_user_reviewer_info.major'] = ['in',$this->majorids($data['major_id'])];
}
+
+ // 计算10天之后的时间戳(10天 = 10 * 24 * 60 * 60秒)
+ $iTeenDaysLater = strtotime('-10 days');
+
+ //获取邀请时间超过10天的 chengxiaoling 20250728 start
//获取总条数
$count = $this->reviewer_to_journal_obj
->join("t_user", "t_user.user_id = t_reviewer_to_journal.reviewer_id", "left")
->join("t_user_reviewer_info", "t_user_reviewer_info.reviewer_id = t_reviewer_to_journal.reviewer_id", "left")
- ->where($where)
+ ->where($where)->where(function($query) use ($iTeenDaysLater) {
+ $query->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater)
+ ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0);
+ })
->count();
if(empty($count)){
return jsonSuccess(['reviewers' => [],'count' => 0]);
@@ -2197,10 +2205,14 @@ class Reviewer extends Base
->join("t_user", "t_user.user_id = t_reviewer_to_journal.reviewer_id", "left")
->join("t_user_reviewer_info", "t_user_reviewer_info.reviewer_id = t_reviewer_to_journal.reviewer_id", "left")
->field('t_user.account,t_user.email,t_user.realname,t_user_reviewer_info.company,t_user_reviewer_info.field,t_user.user_id,t_user.rs_num')
- ->where($where)
+ ->where($where)->where(function($query) use ($iTeenDaysLater) {
+ $query->where('t_user_reviewer_info.last_invite_time', '<', $iTeenDaysLater)
+ ->whereOr('t_user_reviewer_info.last_invite_time', '=', 0);
+ })
->order('t_user.rs_num desc')
->limit($limit_start, $data['pageSize'])
->select();
+ //获取邀请时间超过10天的 chengxiaoling 20250728 end
if(!empty($list)){
$aUserId = array_column($list, 'user_id');
$aWhere = ['state' => 0,'reviewer_id' => ['in',$aUserId]];
diff --git a/application/api/job/ArticleAiCreateContent.php b/application/api/job/ArticleAiCreateContent.php
index cccd965..0ad128e 100644
--- a/application/api/job/ArticleAiCreateContent.php
+++ b/application/api/job/ArticleAiCreateContent.php
@@ -7,25 +7,17 @@ use app\common\QueueRedis;
use app\api\controller\Aiarticle;
class ArticleAiCreateContent
{
- private $logPath;
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/ArticleAiCreateContent_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +25,17 @@ class ArticleAiCreateContent
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,54 +70,32 @@ class ArticleAiCreateContent
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
-
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
//生成内容
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->create($data),true);
$sMsg = empty($aResult['msg']) ? '内容生成失败' : $aResult['msg'];
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新完成标识
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/RecommendArticleField.php b/application/api/job/RecommendArticleField.php
index 66285a3..c2c7749 100644
--- a/application/api/job/RecommendArticleField.php
+++ b/application/api/job/RecommendArticleField.php
@@ -7,25 +7,17 @@ use app\common\QueueRedis;
use app\common\Article;
class RecommendArticleField
{
- private $logPath;
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/ArticleFiled_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +25,17 @@ class RecommendArticleField
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -45,7 +43,6 @@ class RecommendArticleField
if (empty($iArticleId)) {
$this->oQueueJob->log("无效的article_id,删除任务");
$job->delete();
- $this->oQueueJob->flushLog();
return;
}
@@ -72,52 +69,31 @@ class RecommendArticleField
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
$oArticle = new Article;
$aResult = json_decode($oArticle->getAiField($data), true);
$sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg'];
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新完成标识
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}|执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}|执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/RecommendReviewer.php b/application/api/job/RecommendReviewer.php
index 4986609..965c384 100644
--- a/application/api/job/RecommendReviewer.php
+++ b/application/api/job/RecommendReviewer.php
@@ -11,21 +11,14 @@ class RecommendReviewer
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/RecommendReviewer_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class RecommendReviewer
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,29 +71,16 @@ class RecommendReviewer
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
- return;
- }
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
return;
}
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
//获取推荐审稿人信息
$aParam = ['article_id' => $iArticleId,'page' => 1,'size' => empty($data['size']) ? 5 : $data['size']];
$oReviewer = new Reviewer;
@@ -130,28 +116,20 @@ class RecommendReviewer
$sMsg .= empty($aResult['msg']) ? 'Reviewer data insertion failed' : $aResult['msg'];
}
}
- //更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新完成标识
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/RelatedArticle.php b/application/api/job/RelatedArticle.php
index 62d0491..f35b877 100644
--- a/application/api/job/RelatedArticle.php
+++ b/application/api/job/RelatedArticle.php
@@ -11,21 +11,14 @@ class RelatedArticle
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/RelatedArticle_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class RelatedArticle
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,29 +71,16 @@ class RelatedArticle
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
//查询文章所关联的文章
$oJournalArticle = new JournalArticle;
$aResult = json_decode(JournalArticle::get($data),true);
@@ -102,27 +88,19 @@ class RelatedArticle
$sMsg = empty($aResult['msg']) ? '获取相关文章信息失败' : $aResult['msg'];
//更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/ReviewerScore.php b/application/api/job/ReviewerScore.php
index 90ed628..5d672d0 100644
--- a/application/api/job/ReviewerScore.php
+++ b/application/api/job/ReviewerScore.php
@@ -11,21 +11,14 @@ class ReviewerScore
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/ReviewerScore_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class ReviewerScore
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -76,57 +75,34 @@ class ReviewerScore
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
- return;
- }
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
return;
}
try {
- // 执行核心任务
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
$aParam = ['article_id' => $iArticleId,'reviewer_id' => $iReviewerId,'art_rev_id' => $iArtRevId];
$oReviewer = new Reviewer;
$aResult = json_decode($oReviewer->score($aParam),true);
$sMsg = empty($aResult['msg']) ? '给审稿人评分处理失败' : $aResult['msg'];
-
- //更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/RevisionReviewer.php b/application/api/job/RevisionReviewer.php
index 5f96bd5..93529d7 100644
--- a/application/api/job/RevisionReviewer.php
+++ b/application/api/job/RevisionReviewer.php
@@ -11,21 +11,14 @@ class RevisionReviewer
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/RevisionReviewer_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class RevisionReviewer
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,29 +71,15 @@ class RevisionReviewer
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
//获取符合条件的文章审稿人信息
$aParam = ['article_id' => $iArticleId];
$oReviewer = new Reviewer;
@@ -102,27 +87,19 @@ class RevisionReviewer
$sMsg = empty($aResult['msg']) ? '审稿人同意审稿但超时未审的数据失败' : $aResult['msg'];
//更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/SendRelatedArticleEmail.php b/application/api/job/SendRelatedArticleEmail.php
index 353178c..90a68e0 100644
--- a/application/api/job/SendRelatedArticleEmail.php
+++ b/application/api/job/SendRelatedArticleEmail.php
@@ -11,21 +11,14 @@ class SendRelatedArticleEmail
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/SendRelatedArticleEmail_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class SendRelatedArticleEmail
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -92,29 +91,15 @@ class SendRelatedArticleEmail
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
//查询是否发送过邮件
$oJournalArticle = new JournalArticle;
$aLog = json_decode($oJournalArticle::getLog(['article_id' => $iArticleId,'article_author_id' => $article_author_id,'related_article_id' => $related_article_id,'is_success' => 1]),true);
@@ -136,27 +121,19 @@ class SendRelatedArticleEmail
}
//更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue, $job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue, $job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/SendReviewEmail.php b/application/api/job/SendReviewEmail.php
index a140f98..11a1bb2 100644
--- a/application/api/job/SendReviewEmail.php
+++ b/application/api/job/SendReviewEmail.php
@@ -11,21 +11,14 @@ class SendReviewEmail
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/SendReviewEmail_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,16 +26,21 @@ class SendReviewEmail
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
// 获取文章ID
- // 获取文章ID
$iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
//作者邮箱
$email = empty($data['email']) ? '' : $data['email'];
@@ -62,8 +60,8 @@ class SendReviewEmail
$reviewer_id = empty($data['reviewer_id']) ? 0 : $data['reviewer_id'];
//邮件类型
$type = empty($data['type']) ? 1 : $data['type'];
- if (empty($iArticleId)) {
- $this->oQueueJob->log("无效的article_id,删除任务");
+ if (empty($iArticleId) || empty($email)) {
+ $this->oQueueJob->log("无效的article_id/email,删除任务");
$job->delete();
return;
}
@@ -92,29 +90,15 @@ class SendReviewEmail
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
//查询是否发送过邮件
$oReviewer = new Reviewer;
if($type != 3){
@@ -137,27 +121,19 @@ class SendReviewEmail
$iId = $oReviewer->addLog($aEmailLog);
}
//更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/WechatDraft.php b/application/api/job/WechatDraft.php
index 08d70ad..031caf0 100644
--- a/application/api/job/WechatDraft.php
+++ b/application/api/job/WechatDraft.php
@@ -11,21 +11,14 @@ class WechatDraft
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraft_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class WechatDraft
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,56 +71,35 @@ class WechatDraft
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
//上传草稿箱
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->syncWechat($data),true);
$sMsg = empty($aResult['msg']) ? '上传草稿箱失败' : $aResult['msg'];
//更新日志
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/WechatDraftPublish.php b/application/api/job/WechatDraftPublish.php
index 60b280e..93ef526 100644
--- a/application/api/job/WechatDraftPublish.php
+++ b/application/api/job/WechatDraftPublish.php
@@ -11,21 +11,14 @@ class WechatDraftPublish
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/WechatDraftPublish_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class WechatDraftPublish
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,56 +71,35 @@ class WechatDraftPublish
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
//发布草稿箱
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->publishDraft($data),true);
$sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg'];
- //更新任务状态
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新日志
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/WechatMaterial.php b/application/api/job/WechatMaterial.php
index caa098d..2a2cfcf 100644
--- a/application/api/job/WechatMaterial.php
+++ b/application/api/job/WechatMaterial.php
@@ -11,21 +11,14 @@ class WechatMaterial
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/WechatMaterial_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class WechatMaterial
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,56 +71,36 @@ class WechatMaterial
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
//上传素材
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->uploadMaterial($data),true);
$sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg'];
- //更新任务状态
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新日志
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/WechatQueryStatus.php b/application/api/job/WechatQueryStatus.php
index aec968f..77d67f9 100644
--- a/application/api/job/WechatQueryStatus.php
+++ b/application/api/job/WechatQueryStatus.php
@@ -11,21 +11,14 @@ class WechatQueryStatus
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/WechatQueryStatus_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -33,11 +26,17 @@ class WechatQueryStatus
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
+
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -72,56 +71,37 @@ class WechatQueryStatus
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
return;
}
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
- return;
- }
-
try {
+
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
+
// 查询状态
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->queryStatus($data),true);
$sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg'];
- //更新任务状态
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新日志
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/api/job/createFieldForQueue.php b/application/api/job/createFieldForQueue.php
index 50fa7a2..1c38e7b 100644
--- a/application/api/job/createFieldForQueue.php
+++ b/application/api/job/createFieldForQueue.php
@@ -8,25 +8,17 @@ use app\common\QueueRedis;
class createFieldForQueue
{
- private $logPath;
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
- $this->logPath = ROOT_PATH . 'public/queue_log/createFieldForQueue_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- // 确保日志目录存在
- $this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
@@ -34,11 +26,16 @@ class createFieldForQueue
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
+ // 检查数据库连接
+ if (!$this->oQueueJob->checkDbConnection(true)) {
+ $this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
+ $job->release(10);
+ return;
+ }
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败,10秒后重试");
$job->release(10);
- $this->oQueueJob->flushLog();
return;
}
@@ -47,7 +44,6 @@ class createFieldForQueue
if (empty($iRedisId)) {
$this->oQueueJob->log("无效的redis_id,删除任务");
$job->delete();
- $this->oQueueJob->flushLog();
return;
}
@@ -74,53 +70,32 @@ class createFieldForQueue
$job->release($delay);
}
}
- $this->oQueueJob->flushLog();
- return;
- }
-
- $aParam = [
- 'job_id' => $sRedisKey,
- 'job_class' => $sClassName,
- 'status' => 0,
- 'create_time' => time(),
- 'params' => json_encode($data, self::JSON_OPTIONS)
- ];
-
- $iLogId = $this->oQueueJob->addLog($aParam);
- if (!$iLogId) {
- $this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
- $this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
- $job->delete();
- $this->oQueueJob->flushLog();
return;
}
try {
+ // 执行核心任务前再次检查连接
+ $result = $this->oQueueJob->checkDbConnection();
+ if (!$result) {
+ throw new \RuntimeException("数据库连接异常,无法执行核心任务");
+ }
$oOpenAi = new OpenAi;
$aResult = json_decode($oOpenAi->createFieldForQueue($data), true);
$sMsg = empty($aResult['msg']) ? '内容生成成功' : $aResult['msg'];
-
- $this->oQueueJob->updateLog([
- 'log_id' => $iLogId,
- 'status' => 1,
- 'update_time' => time(),
- 'error' => $sMsg
- ]);
-
- $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
+ //更新完成标识
+ $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
$job->delete();
- $this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
+ $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
} catch (\RuntimeException $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job);
} catch (\LogicException $e) {
- $this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleNonRetryableException($e, $sRedisKey,$sRedisValue,$job);
} catch (\Exception $e) {
- $this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
+ $this->oQueueJob->handleRetryableException($e, $sRedisKey,$sRedisValue,$job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
- $this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
diff --git a/application/common/OpenAi.php b/application/common/OpenAi.php
index 7d8e2bd..e2219b6 100644
--- a/application/common/OpenAi.php
+++ b/application/common/OpenAi.php
@@ -24,7 +24,7 @@ class OpenAi
protected $sTmrUrl = "http://journalapi.tmrjournals.com/public/index.php";//"http://zmzm.journal.dev.com/";//;
protected $aArticleImportantPrompt = [
"journal_scope" => [
- 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
+ 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{
"journal_scope": {
"assessment": "是/否",
@@ -34,19 +34,19 @@ class OpenAi
"criteria" => "根据文章的标题:{title};摘要:{abstrart}以及期刊范围:{scope}来判断文章是否符合目标期刊{journal_name}"
],
"attribute" => [
- 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
+ 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{
"attribute": {
"assessment": "是/否",
"explanation": "请总结归纳分析"
}
}',
- "criteria" => "请结合以下几点【研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释】评估文章内容{content}是否有科学前沿性和创新性?"
+ "criteria" => "请结合以下几点【研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释】评估文章内容{content}是否有科学前沿性和创新性?"
]
//,
// "contradiction" => [
- // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
+ // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
// 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{
// "contradiction": {
// "assessment": "是/否",
@@ -57,7 +57,7 @@ class OpenAi
// ],
// "unreasonable" => [
- // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
+ // 'system' => '你是一位资深的学术评审专家,负责严谨、客观地评估学术文章。
// 请针对问题提供客观、专业的评估,并给出简要的理由。请返回中文解释!返回格式必须严格遵循以下JSON结构:{
// "unreasonable": {
// "assessment": "是/否",
@@ -70,33 +70,33 @@ class OpenAi
//公微问题模版
protected $aWechatQuestion = [
- 'system_message' => '您是一位医学期刊的医学科普转化专家,严格遵循用户要求的结构、语言和专业约束,不编造数据,不夸大结论,擅长将复杂的医学研究论文转化为适合微信公众号推送的专业科普内容。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:',
+ 'system_message' => '您是一位医学期刊的医学科普转化专家,严格遵循用户要求的结构、语言和专业约束,不编造数据,不夸大结论,擅长将复杂的医学研究论文转化为适合微信公众号推送的专业科普内容。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:',
'public_message' => [
- "covered" => "[列出文章涵盖的学科及研究方法,总字数不超过100字,学科和方法之间用逗号分隔,例如:肿瘤学,分子生物学,基因组测序,生物信息学分析]",
+ "covered" => "[列出文章涵盖的学科及研究方法,总字数不超过100字,学科和方法之间用逗号分隔,例如:肿瘤学,分子生物学,基因组测序,生物信息学分析]",
"title_chinese" => "[将标题翻译成中文:内容需自然流畅、口语化、连贯性、学术性]"
// ,
- // "content" => "将内容翻译成中文,需自然流畅、口语化、连贯性、学术性,保留原文的章节结构和图表编号"
+ // "content" => "将内容翻译成中文,需自然流畅、口语化、连贯性、学术性,保留原文的章节结构和图表编号"
],
'default' => [
- "digest" => "[学术规范翻译并提炼摘要,强调逻辑性、科学术语准确性和表达严谨性,采用段落形式,总字数不超过500字]",
- "research_background" => "[提炼研究背景,采用连贯的段落形式,总字数超过200字]",
- "discussion_results" => "[针对文章简单总结讨论和结果,采用连贯的段落形式,总字数超过450字]",
- "research_method" => "[总结文章的研究方法,采用连贯的段落形式,总字数超过300字]",
- "prospect" => "[针对稿件内容进行展望撰写,采用连贯的段落形式]",
- "highlights" => "[总结归纳亮点,至少3点,每点用分号分隔]"
+ "digest" => "[学术规范翻译并提炼摘要,强调逻辑性、科学术语准确性和表达严谨性,采用段落形式,注意内容不要和文章内容有严重重复,总字数不超过500字]",
+ "research_background" => "[提炼研究背景,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过200字]",
+ "discussion_results" => "[针对文章简单总结讨论和结果,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过450字]",
+ "research_method" => "[总结文章的研究方法,采用连贯的段落形式,注意内容不要和文章内容有严重重复,总字数超过300字]",
+ "prospect" => "[针对稿件内容进行展望撰写,注意内容不要和文章内容有严重重复,采用连贯的段落形式]",
+ "highlights" => "[总结归纳亮点,至少3点,每点用分号分隔]"
],
'review' => [
- "overview" => "按照学术规范翻译并提炼文章概述,整体内容应大于1200字,其中应包含文章背景(不少于400字),其他内容提炼更强调逻辑性、科学术语准确性和表达的严谨性,注意内容不要有严重重复,采用连贯的段落形式",
- "summary" => "针对文章结论生成一个简单总结,内容不要和文章概述重复,字数150以内",
+ "overview" => "按照学术规范翻译并提炼文章概述,整体内容应大于1200字,其中应包含文章背景(不少于400字),其他内容提炼更强调逻辑性、科学术语准确性和表达的严谨性,注意内容不要和文章内容有严重重复,采用连贯的段落形式",
+ "summary" => "针对文章结论生成一个简单总结,内容不要和文章概述重复,字数150以内",
]
];
//AI审稿提示词
protected $aReviewQuestion = [
- 'system_message' => '您是一位资深的医学期刊学术评审专家,请负责严谨、客观地评估学术文章。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:',
+ 'system_message' => '您是一位资深的医学期刊学术评审专家,请负责严谨、客观地评估学术文章。请根据提供的医学论文信息,按照以下严格格式生成结构化[JSON结构]输出[中文]:',
'default' => [
"journal_scope" => "结合标题和摘要以及期刊范围来判断文章是否符合目标期刊?",
- "attribute" => "内容是否有科学前沿性和创新性?参照维度[研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释]",
+ "attribute" => "内容是否有科学前沿性和创新性?参照维度[研究内容的原创性:论文中的研究内容是否与已有的研究重复?是否在同样的领域提出了类似的结论,但在方法或结果上有所创新?如果有,作者是否清楚地解释了如何与之前的研究不同,或者如何在原有基础上进行扩展或改进?如果是综述文章,汇总并综合最新的研究成果,尤其是近几年内的重要发现,展示领域内最新的进展成果。作者可以识别出未被充分讨论的问题或提出新的研究问题,而不是简单文献堆砌。文章中的图表创新能否将信息的清晰呈现,方便读者理解复杂研究问题。论文方法创新性评估要点:是否采用了新的实验模型或创新的实验设计,能有效解决当前研究中的难点或空白?是否有合理的对照组和多组实验设计,确保研究结果的可靠性?是否使用了当前前沿的技术(如高通量测序、CRISPR基因编辑等),提高了实验精度或数据分析能力?是否结合了跨学科的方法(如生物信息学、人工智能等)?是否应用了多种验证手段或统计方法,确保结果的可信度?是否通过细胞实验、动物模型等多重验证,确保实验结果的可靠性?结论与数据的创新性:研究结论是否提出了新观点或新见解?是否提供了新的实验数据或观察结果,能够突破当前的研究局限?例如,发现了新的生物标志物,或对已知生物通路的作用机制提供了全新的解释]",
"contradiction" => "内容是否前后矛盾或存在逻辑不一致的问题?",
"unreasonable" => "内容是否有明显的不合理之处?",
"ethics" => "内容是否存在伦理号缺失或明显伦理问题?",
@@ -188,8 +188,8 @@ class OpenAi
if(empty($aSearch)){
return [];
}
- $sSysMessagePrompt = '你是一位专业的医学翻译专家,请将用户提供的内容准确、流畅地翻译成中文。翻译需自然流畅、口语化、连贯性、学术性,保留原文的专业术语和逻辑结构';
- $sUserPrompt = '"将以下内容翻译为中文,仅返回翻译结果,不要解释:\n {#content#}"';
+ $sSysMessagePrompt = '你是一位专业的医学翻译专家,请将用户提供的内容准确、流畅地翻译成中文。翻译需自然流畅、口语化、连贯性、学术性,保留原文的专业术语和逻辑结构';
+ $sUserPrompt = '"将以下内容翻译为中文,仅返回翻译结果,不要解释:\n {#content#}"';
$sUserPrompt = str_replace(array_keys($aSearch), array_values($aSearch), $sUserPrompt);
$aMessage = [
['role' => 'system', 'content' => $sSysMessagePrompt],
@@ -199,7 +199,7 @@ class OpenAi
$aMessage = [
'model' => empty($aSearch['model']) ? 'gpt-4.1' : $aSearch['model'],
'messages' => $aMessage,
- 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
+ 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
];
$aResult = json_decode($this->curlOpenAIStream($aMessage),true);
$sJsonData = empty($aResult['data']) ? '' : $aResult['data'];
@@ -283,7 +283,7 @@ class OpenAi
$data = [
'model' => $model,
'messages' => $aMessage,
- 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
+ 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
];
$this->curl = curl_init();
@@ -357,8 +357,13 @@ class OpenAi
$data = [
'model' => $model,
'messages' => $aMessage,
- 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
- 'stream' => true // 关键:启用流式传输,避免超时
+ // 'temperature' => 0.2,// 降低随机性(0-1,0为最确定)
+ 'temperature' => 0.6, // 中等随机性
+ // 'max_tokens' => 1000,
+ 'top_p' => 0.8,
+ 'frequency_penalty' => 0.3,
+ 'presence_penalty' => 0.2,
+ 'stream' => true // 关键:启用流式传输,避免超时
];
// Curl通用配置
@@ -380,10 +385,10 @@ class OpenAi
// === 5. 流式响应处理(核心避免超时) ===
$streamContent = ''; // 累积流式返回的内容
- // 回调函数:每收到一块数据就处理并保存,避免整段等待
+ // 回调函数:每收到一块数据就处理并保存,避免整段等待
curl_setopt($this->curl, CURLOPT_WRITEFUNCTION, function ($curl, $data) use (&$streamContent) {
$streamContent .= $data;
- return strlen($data); // 必须返回数据长度,否则CURL会中断
+ return strlen($data); // 必须返回数据长度,否则CURL会中断
});
//执行请求
@@ -397,11 +402,11 @@ class OpenAi
//错误处理
if (!empty($curlErrno)) {
- // 超时但已有部分数据:保存进度,下次从该块重试
+ // 超时但已有部分数据:保存进度,下次从该块重试
if ($curlErrno == CURLE_OPERATION_TIMEDOUT && !empty($streamContent)) {
return json_encode([
'status' => 3,
- 'msg' => "处理超时,已保存进度",
+ 'msg' => "处理超时,已保存进度",
]);
}
// 其他错误(如网络问题)
@@ -448,27 +453,34 @@ class OpenAi
}
//记录处理开始
$iNum = count($aMessage);
- $sRedisKey = 'ai_create_article_'.$iId;
- $this->oQueueRedis->recordQuestionProcessingStart($sRedisKey,$iNum);
- //定义空数组
- $aChunkResult = $aFail = [];
- $batchId = uniqid();
- $iQueueCount1 = $iQueueCount2 = 0;
- foreach ($aMessage as $key => $value) {
- $aParam['messages'] = $value;
- $aParam['chunkIndex'] = $key;
- $aParam['count_num'] = $iNum;
- // if($key%2 == 0){
- $aParam['key_name'] = 'queue_1_completed';
- Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueue');
- // }else{
- // $aParam['url'] = $this->sAiUrl;
- // $aParam['key_name'] = 'queue_2_completed';
- // Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueueBak');
- // }
-
+ $sRedisKey = 'queue_job:ai_create_article:'.$iId;
+ $result = $this->oQueueRedis->recordQuestionProcessingStart($sRedisKey,$iNum);
+ $result = empty($result) ? 0 : $result;
+ if($result == 1){
+ //定义空数组
+ foreach ($aMessage as $key => $value) {
+ $aParam['messages'] = $value;
+ $aParam['chunkIndex'] = $key;
+ $aParam['count_num'] = $iNum;
+ // if($key%2 == 0){
+ $aParam['key_name'] = 'queue_1_completed';
+ Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueue');
+ // }else{
+ // $aParam['url'] = $this->sAiUrl;
+ // $aParam['key_name'] = 'queue_2_completed';
+ // Queue::push('app\api\job\createFieldForQueue@fire', $aParam, 'createFieldForQueueBak');
+ // }
+
+ }
+ return json_encode(['status' => 1, 'msg' => 'Content is being generated, please wait']);
}
- return json_encode(['status' => 1, 'msg' => 'Content is being generated, please wait']);
+ if($result == 2){
+ return json_encode(['status' => 3, 'msg' => 'The data has been generated, please proceed with the next steps']);
+ }
+ if($result == 3){
+ return json_encode(['status' => 4, 'msg' => 'Content is being generated, please wait']);
+ }
+ return json_encode(['status' => 5, 'msg' => 'Redis write failure']);
}
/**
* 微信公众号-生成内容队列形式
@@ -491,11 +503,11 @@ class OpenAi
$aResult = $this->curlOpenAIStream($aParam);
//更新处理进度
$iIndex = empty($aParam['chunkIndex']) ? 0 : $aParam['chunkIndex'];
- $sRedisKey = 'ai_create_article_'.$iId;
+ $sRedisKey = 'queue_job:ai_create_article:'.$iId;
$sKeyName = empty($aParam['key_name']) ? 'queue_1_completed' : $aParam['key_name'];
$iProgress = $this->oQueueRedis->updateQuestionProcessingProgress($sRedisKey,$sKeyName);
//保存内容
- $sRedisKey = 'ai_create_article_progress_'.$iId;
+ $sRedisKey = 'queue_job:ai_create_article_progress:'.$iId;
$this->oQueueRedis->saveChunkProgress($sRedisKey, $iIndex,$aResult);
//更新入库
@@ -572,7 +584,7 @@ class OpenAi
* 从文本中提取被```json```和```包裹的JSON内容并解析
* @param string $text 包含JSON代码块的文本
* @param bool $assoc 是否返回关联数组(默认true)
- * @return array|object 解析后的JSON数据,失败时返回null
+ * @return array|object 解析后的JSON数据,失败时返回null
*/
public function extractAndParse($text, $assoc = true){
diff --git a/application/common/QueueJob.php b/application/common/QueueJob.php
index d4b194f..c83e558 100644
--- a/application/common/QueueJob.php
+++ b/application/common/QueueJob.php
@@ -2,8 +2,9 @@
namespace app\common;
use think\Db;
+use think\Cache;
use app\common\QueueRedis;
-
+use app\common\traits\QueueDbHATrait;
class QueueJob
{
// 必填参数
@@ -11,116 +12,12 @@ class QueueJob
private $logPath;
private $QueueRedis;
private $maxRetries = 2;
- private $logBuffer = [];
- private $lastLogTime = 0;
- private $logMaxSize = 1048576; // 1MB (1*1024*1024)
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
-
+ // 引入高可用数据库管理 trait
+ use QueueDbHATrait;
public function __construct()
{
$this->QueueRedis = QueueRedis::getInstance();
- $this->lastLogTime = time();
- }
-
- /**
- * 记录任务开始
- * @param array $aParam
- * @return int 日志ID,失败返回0
- */
- public function addLog($aParam = [])
- {
-
- $sJobId = empty($aParam['job_id']) ? uniqid() : $aParam['job_id'];
- return $sJobId;
-
- // 数据过滤(只保留必填字段)
- $aInsert = [];
- foreach ($this->aField as $field) {
- if (isset($aParam[$field])) {
- $aInsert[$field] = $aParam[$field];
- }
- }
-
- // 补充默认值
- if (!isset($aInsert['create_time'])) {
- $aInsert['create_time'] = time();
- }
- if (!isset($aInsert['update_time'])) {
- $aInsert['update_time'] = $aInsert['create_time'];
- }
-
- try {
- return Db::name('wechat_queue_logs')->insertGetId($aInsert);
- } catch (\Exception $e) {
- $this->log("添加任务日志失败: " . $e->getMessage() . " | 参数: " . json_encode($aInsert, self::JSON_OPTIONS));
- return 0;
- }
- }
-
- /**
- * 记录任务状态更新
- * @param array $aParam
- * @return bool
- */
- public function updateLog($aParam = [])
- {
- return true;
- $iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id'];
- if (empty($iLogId)) {
- $this->log("更新日志失败: 缺少log_id");
- return false;
- }
-
- // 数据过滤
- $aUpdate = [];
- foreach ($this->aField as $field) {
- if (isset($aParam[$field])) {
- $aUpdate[$field] = $aParam[$field];
- }
- }
-
- // 强制更新时间
- $aUpdate['update_time'] = time();
-
- try {
- return Db::name('wechat_queue_logs')
- ->where('log_id', $iLogId)
- ->limit(1)
- ->update($aUpdate) > 0;
- } catch (\Exception $e) {
- $this->log("更新任务日志失败 [ID:{$iLogId}]: " . $e->getMessage());
- return false;
- }
- }
-
- /**
- * 设置日志路径并确保目录存在
- * @param string $logPath
- * @throws \RuntimeException
- */
- public function ensureLogDirExists($logPath = '')
- {
- if (empty($logPath)) {
- $error = "日志路径不能为空";
- $this->log($error);
- return $error;
- }
-
- $this->logPath = $logPath;
- $logDir = dirname($this->logPath);
-
- // 检查并创建目录(处理权限问题)
- if (!is_dir($logDir)) {
- $oldUmask = umask(0);
- $created = mkdir($logDir, 0755, true);
- umask($oldUmask);
-
- if (!$created || !is_dir($logDir)) {
- $error = "无法创建日志目录: {$logDir} (权限不足)";
- $this->log($error);
- return $error;
- }
- }
}
/**
@@ -129,127 +26,8 @@ class QueueJob
*/
public function log($message)
{
- // 防止缓冲区溢出
- if (count($this->logBuffer) >= 1000) {
- $this->flushLog();
- }
-
- $time = date('H:i:s');
- $this->logBuffer[] = "[$time] $message\n";
-
- // 缓冲区满或超时则刷新
- if (count($this->logBuffer) >= 50 || time() - $this->lastLogTime > 10) {
- $this->flushLog();
- }
- }
-
- /**
- * 刷新日志缓冲区到文件
- */
- public function flushLog()
- {
- if (empty($this->logBuffer)) {
- return;
- }
-
- // 检查日志路径是否设置
- if (empty($this->logPath)) {
- $this->logBuffer = [];
- return;
- }
-
- // 检查文件大小并处理
- $this->checkAndTruncateLog();
-
- $fp = fopen($this->logPath, 'a');
- if ($fp === false) {
- // 紧急写入失败日志(避免递归)
- $errorMsg = "[" . date('H:i:s') . "] 错误: 无法打开日志文件 {$this->logPath}\n";
- error_log($errorMsg); // 写入系统日志
- $this->logBuffer = [];
- return;
- }
-
- try {
- // 尝试获取文件锁
- if (flock($fp, LOCK_EX)) {
- fwrite($fp, implode('', $this->logBuffer));
- flock($fp, LOCK_UN);
- } else {
- // 无锁情况下尝试写入
- fwrite($fp, implode('', $this->logBuffer));
- $this->logBuffer[] = "[" . date('H:i:s') . "] 警告: 日志写入未加锁,可能存在冲突风险\n";
- }
- } catch (\Exception $e) {
- $errorMsg = "[" . date('H:i:s') . "] 错误: 写入日志失败: {$e->getMessage()}\n";
- fwrite($fp, $errorMsg);
- } finally {
- fclose($fp);
- }
-
- $this->logBuffer = [];
- $this->lastLogTime = time();
- }
-
- /**
- * 检查日志文件大小,超过限制则清空
- */
- public function checkAndTruncateLog()
- {
- if (empty($this->logPath) || !file_exists($this->logPath)) {
- return;
- }
-
- // 清除文件状态缓存并获取大小
- clearstatcache(true, $this->logPath);
- $fileSize = @filesize($this->logPath);
-
- if ($fileSize === false) {
- $this->log("错误: 无法获取日志文件大小 {$this->logPath}");
- return;
- }
-
- if ($fileSize >= $this->logMaxSize) {
- $fp = fopen($this->logPath, 'w');
- if ($fp === false) {
- $this->log("错误: 无法清空日志文件 {$this->logPath}");
- return;
- }
-
- try {
- if (flock($fp, LOCK_EX)) {
- // 二次检查文件大小(避免竞态条件)
- clearstatcache(true, $this->logPath);
- if (filesize($this->logPath) >= $this->logMaxSize) {
- ftruncate($fp, 0);
- $this->log("日志文件超过" . $this->formatFileSize($this->logMaxSize) . ",已清空");
- }
- flock($fp, LOCK_UN);
- }
- } catch (\Exception $e) {
- $this->log("错误: 清空日志文件失败: {$e->getMessage()}");
- } finally {
- fclose($fp);
- }
- }
- }
-
- /**
- * 格式化文件大小(字节转人类可读格式)
- * @param int $bytes
- * @return string
- */
- public function formatFileSize($bytes)
- {
- if ($bytes <= 0) {
- return '0 B';
- }
-
- $units = ['B', 'KB', 'MB', 'GB', 'TB'];
- $unitIndex = min(floor(log($bytes, 1024)), count($units) - 1);
- $size = $bytes / pow(1024, $unitIndex);
-
- return number_format($size, 2) . ' ' . $units[$unitIndex];
+ $log = date("Y-m-d H:i:s") . " " . $message . "\n";
+ echo $log;
}
/**
@@ -278,25 +56,17 @@ class QueueJob
/**
* 处理可重试异常
* @param \Exception $e
- * @param int $iLogId
* @param string $sRedisKey
+ * @param string $sRedisValue
* @param \think\queue\Job $job
*/
- public function handleRetryableException($e, $iLogId, $sRedisKey, $job)
+ public function handleRetryableException($e,$sRedisKey,$sRedisValue,$job)
{
$sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}");
- if ($iLogId > 0) {
- $this->updateLog([
- 'log_id' => $iLogId,
- 'status' => 2,
- 'error' => $sMsg . ':' . $sTrace,
- ]);
- }
-
- $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600);
+ $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
@@ -312,34 +82,29 @@ class QueueJob
/**
* 处理不可重试异常
* @param \Exception $e
- * @param int $iLogId
* @param string $sRedisKey
+ * @param string $sRedisValue
* @param \think\queue\Job $job
*/
- public function handleNonRetryableException($e, $iLogId, $sRedisKey, $job)
+ public function handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job)
{
$sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}");
-
- if ($iLogId > 0) {
- $this->updateLog([
- 'log_id' => $iLogId,
- 'status' => 3, // 3:不可重试失败
- 'error' => $sMsg . ':' . $sTrace,
- ]);
- }
-
- $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600);
+ $this->QueueRedis->finishJob($sRedisKey, 'failed', 3600,$sRedisValue);
$this->log("不可重试错误,直接删除任务 | 执行日志:{$sMsg}");
$job->delete();
}
+
/**
- * 析构函数:确保最后日志被写入
+ * 数据库连接检查与重建(高可用版)
+ * 解决 MySQL server has gone away 等连接超时问题
+ * @param bool $force 是否强制检查(忽略缓存时间)
+ * @return bool 连接是否有效
*/
- public function __destruct()
+ public function checkDbConnection($force = false)
{
- $this->flushLog();
+ return $this->checkDbConnectionTrait();
}
}
\ No newline at end of file
diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php
index c28faf7..1aaacbf 100644
--- a/application/common/QueueRedis.php
+++ b/application/common/QueueRedis.php
@@ -124,23 +124,38 @@ LUA; $redis = $this->connect();
}
}
- // 任务结束时的批量操作
-public function finishJob($sRedisKey, $status, $expire)
-{
- try {
- $redis = $this->connect();
- // 使用Lua脚本确保原子性
- $script = <<eval($script, [$sRedisKey, $status, $expire], 1) === 1;
- } catch (\Exception $e) {
- Log::error("Redis完成任务失败: {$e->getMessage()}");
- return false;
- }
-}
+ /**
+ * 标记任务状态并释放锁(带所有权验证)
+ * @param string $sRedisKey 任务锁的键名
+ * @param string $status 状态(completed/failed)
+ * @param int $expire 状态的过期时间(秒)
+ * @param string $sRedisValue 锁的值(用于验证所有权)
+ * @return bool 是否执行成功
+ */
+ public function finishJob($sRedisKey, $status, $expire, $sRedisValue){
+ try {
+ $redis = $this->connect();
+ // Lua 脚本:先验证锁所有权,再设置状态并删除锁
+ $script = <<eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1);
+ return $result === 1;
+ } catch (\Exception $e) {
+ Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}");
+ return false;
+ }
+ }
// 记录处理进度
public function recordProcessingStart($key, $totalQuestions)
{
@@ -163,6 +178,14 @@ LUA;
{
try {
$redis = $this->connect();
+ //判断是否执行
+ $sStatus = $redis->hGet($key, 'status');
+ if (!empty($sStatus) && $sStatus == 'completed') {
+ return 2;
+ }
+ if (!empty($sStatus) && $sStatus == 'processing') {
+ return 3;
+ }
$redis->hMSet($key, [
'status' => 'processing',
'total' => $totalQuestions,
@@ -172,9 +195,9 @@ LUA;
'queue_2_completed' => 0
]);
$redis->expire($key, 10800); // 6小时过期
- return true;
+ return 1;
} catch (\Exception $e) {
- return false;
+ return 4;
}
}
// 多问题按条件拆分成两个队列更新日志记录
diff --git a/application/common/traits/QueueDbHATrait.php b/application/common/traits/QueueDbHATrait.php
new file mode 100644
index 0000000..c7aff68
--- /dev/null
+++ b/application/common/traits/QueueDbHATrait.php
@@ -0,0 +1,266 @@
+ 20, // 缩短检查间隔,提升连接有效性
+ 'max_attempts' => 4, // 增加重试次数,应对连接波动
+ 'base_wait' => 1, // 基础等待时间(秒)
+ 'reconnect_threshold' => 3, // 连续失败告警阈值
+ 'fatal_error_codes' => [2006, 2013, 1053], // 致命错误码(含服务中断)
+ ];
+
+ // 进程内状态记录(避免跨进程干扰)
+ private $consecutiveFailures = [];
+ private $lastCheckTime = [];
+
+ /**
+ * 队列任务执行前的连接检查(核心入口)
+ * @param bool $force 是否强制检查
+ * @return bool 连接是否可用
+ */
+ public function checkDbConnectionTrait($force = false)
+ {
+ $pid = getmypid();
+ $this->initConsecutiveFailures($pid);
+
+ // 非强制检查且未到间隔,直接返回
+ if (!$force && isset($this->lastCheckTime[$pid])
+ && (time() - $this->lastCheckTime[$pid] < $this->dbConfig['check_interval'])) {
+ return true;
+ }
+
+ $attempt = 0;
+ $maxAttempts = $this->dbConfig['max_attempts'];
+ $baseWait = $this->dbConfig['base_wait'];
+
+ while ($attempt < $maxAttempts) {
+ try {
+ // 执行轻量查询验证(使用框架Db方法,确保与业务代码一致)
+ $result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
+ if ($this->isValidResult($result)) {
+ $this->resetConsecutiveFailures($pid);
+ $this->lastCheckTime[$pid] = time();
+ $this->log("进程[{$pid}]数据库连接有效", 'info');
+ return true;
+ }
+ throw new Exception("查询结果无效");
+ } catch (PDOException $e) {
+ // 致命错误加速重试
+ $sCode = empty($e->getCode()) ? 0 : $e->getCode();
+ $sMsg = empty($e->getMessage()) ? '' : $e->getMessage();
+ if (in_array($sCode, $this->dbConfig['fatal_error_codes'])) {
+ $this->log("进程[{$pid}]致命错误({$sCode}):{$sMsg}", 'error');
+ $attempt = $maxAttempts - 1;
+ }
+ $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
+ } catch (Exception $e) {
+ $this->handleConnectionError($e, $pid, $attempt, $maxAttempts, $baseWait);
+ } finally {
+ $attempt++;
+ }
+ }
+
+ // 达到最大重试次数
+ $this->incrementConsecutiveFailures($pid);
+ $this->log("进程[{$pid}]连接异常,已达最大重试次数({$maxAttempts})", 'error');
+ return false;
+ }
+
+ /**
+ * 处理连接错误(确保后续能正常使用Db::insert())
+ */
+ private function handleConnectionError($e, $pid, &$attempt, $maxAttempts, $baseWait)
+ {
+ $errorMsg = empty($e->getMessage()) ? '未知错误' : $e->getMessage();
+ $errorCode = empty($e->getCode()) ? 0 : $e->getCode();
+ $this->log("进程[{$pid}]连接失败(尝试{$attempt}/{$maxAttempts}):{$errorMsg}(码:{$errorCode})", 'warning');
+
+ // 强制清理当前进程的连接缓存(关键:确保重建的连接能被Db::insert()使用)
+ $this->cleanupConnections();
+
+ // 最后一次尝试无需等待
+ if ($attempt + 1 >= $maxAttempts) {
+ return;
+ }
+
+ // 差异化等待策略
+ $waitTime = $this->calculateWaitTime($errorMsg, $attempt, $baseWait);
+ $this->log("进程[{$pid}]将在{$waitTime}秒后重试", 'info');
+ $this->safeSleep($waitTime);
+
+ // 重建连接并验证(使用框架方法,确保与业务代码兼容)
+ try {
+ // 强制重建框架连接,确保Db::insert()能使用新连接
+ Db::connect(config('database'), true);
+ $result = $this->safeQuery('SELECT 1 FROM DUAL', 2);
+ if ($this->isValidResult($result)) {
+ $this->resetConsecutiveFailures($pid);
+ $this->lastCheckTime[$pid] = time();
+ $this->log("进程[{$pid}]连接已重建(尝试{$attempt}/{$maxAttempts})", 'info');
+ $attempt = $maxAttempts; // 退出循环
+ } else {
+ throw new Exception("重建连接后查询无效");
+ }
+ } catch (Exception $e2) {
+ $sMsg2 = empty($e2->getMessage()) ? '' : $e2->getMessage();
+ $this->log("进程[{$pid}]重连失败:{$sMsg2}", 'error');
+ }
+ }
+
+ /**
+ * 安全执行查询(兼容框架Db方法,带超时控制)
+ */
+ private function safeQuery($sql, $timeout)
+ {
+ $start = microtime(true);
+ // 使用框架Db::query(),确保与业务中Db::insert()使用相同的连接机制
+ $result = Db::query($sql);
+
+ // 代码层面控制超时,不依赖database.php配置
+ if (microtime(true) - $start > $timeout) {
+ throw new Exception("查询超时({$timeout}秒)");
+ }
+ return $result;
+ }
+
+ /**
+ * 清理连接资源(仅影响当前进程,不干扰现有系统)
+ */
+ private function cleanupConnections()
+ {
+ // 关闭当前进程的框架连接(不影响其他进程)
+ Db::close();
+ // 清除Db类静态缓存(仅当前进程)
+ $this->clearDbInstanceCache();
+ // 保留系统缓存,避免影响现有业务
+ }
+
+ /**
+ * 清除Db类实例缓存(确保新连接能被正确创建)
+ */
+ private function clearDbInstanceCache()
+ {
+ static $reflection = null;
+ static $instanceProp = null;
+
+ if (!$reflection) {
+ try {
+ $reflection = new \ReflectionClass('\think\Db');
+ $instanceProp = $reflection->getProperty('instance');
+ $instanceProp->setAccessible(true);
+ } catch (\ReflectionException $e) {
+ $this->log("反射初始化失败:{$e->getMessage()}", 'error');
+ return;
+ }
+ }
+
+ try {
+ // 仅清空当前进程的Db实例缓存,不影响其他进程(如Web请求)
+ $instanceProp->setValue(null, []);
+ } catch (\ReflectionException $e) {
+ $this->log("清除Db缓存失败:{$e->getMessage()}", 'error');
+ }
+ }
+
+ /**
+ * 计算等待时间(针对队列优化)
+ */
+ private function calculateWaitTime($errorMsg, $attempt, $baseWait)
+ {
+ $isGoneAway = stripos($errorMsg, 'MySQL server has gone away') !== false;
+ $isTimeout = stripos($errorMsg, 'timeout') !== false;
+
+ if ($isGoneAway) {
+ return $baseWait * pow(2, $attempt); // 致命错误:1→2→4秒
+ } elseif ($isTimeout) {
+ return $baseWait * pow(1.5, $attempt); // 超时错误:1→1.5→2.25秒
+ } else {
+ return $baseWait * pow(1.2, $attempt); // 普通错误:1→1.2→1.44秒
+ }
+ }
+
+ /**
+ * 验证查询结果有效性
+ */
+ private function isValidResult($result)
+ {
+ return is_array($result) && !empty($result)
+ && isset(current($result)['1']) && current($result)['1'] == 1;
+ }
+
+ /**
+ * 连续失败计数管理
+ */
+ private function initConsecutiveFailures($pid)
+ {
+ if (!isset($this->consecutiveFailures[$pid])) {
+ $this->consecutiveFailures[$pid] = 0;
+ }
+ }
+
+ private function incrementConsecutiveFailures($pid)
+ {
+ $this->consecutiveFailures[$pid]++;
+ if ($this->consecutiveFailures[$pid] >= $this->dbConfig['reconnect_threshold']) {
+ $this->alert("进程[{$pid}]连续连接失败{$this->consecutiveFailures[$pid]}次,可能存在数据库隐患");
+ }
+ }
+
+ private function resetConsecutiveFailures($pid)
+ {
+ $this->consecutiveFailures[$pid] = 0;
+ }
+
+ /**
+ * 日志记录(仅输出到队列控制台,不干扰系统日志)
+ */
+ private function log($message, $level = 'info')
+ {
+ $logTime = date('Y-m-d H:i:s');
+ $content = "[{$logTime}] [{$level}] {$message}";
+ // 仅在队列Worker控制台输出,不写入系统日志文件
+ echo $content . "\n";
+ }
+
+ /**
+ * 告警通知(独立日志文件,不干扰现有系统)
+ */
+ private function alert($message)
+ {
+ $alertFile = RUNTIME_PATH . "log/queue_db_alert_" . date('Ymd') . ".log";
+ file_put_contents($alertFile, "[".date('Y-m-d H:i:s')."] ALERT: {$message}\n", FILE_APPEND | LOCK_EX);
+ }
+
+ /**
+ * 安全睡眠(支持Worker正常终止)
+ */
+ private function safeSleep($seconds)
+ {
+ $interval = 1;
+ while ($seconds > 0) {
+ if (connection_aborted() || $this->isWorkerStopped()) {
+ throw new Exception("队列Worker已终止,中断睡眠");
+ }
+ $sleep = min($interval, $seconds);
+ sleep($sleep);
+ $seconds -= $sleep;
+ }
+ }
+
+ /**
+ * 检测Worker是否已停止(兼容TP5.0机制)
+ */
+ private function isWorkerStopped()
+ {
+ $stopFile = RUNTIME_PATH . 'queue/stop_worker';
+ return file_exists($stopFile);
+ }
+}
\ No newline at end of file