job修改
This commit is contained in:
@@ -11,21 +11,14 @@ class RelatedArticle
|
||||
private $oQueueJob;
|
||||
private $QueueRedis;
|
||||
private $maxRetries = 2;
|
||||
private $logBuffer = [];
|
||||
private $lastLogTime = 0;
|
||||
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
|
||||
private $lockExpire = 1800;
|
||||
private $completedExprie = 3600;
|
||||
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this->logPath = ROOT_PATH . 'public/queue_log/RelatedArticle_' . date('Ymd') . '.log';
|
||||
$this->oQueueJob = new QueueJob;
|
||||
$this->QueueRedis = QueueRedis::getInstance();
|
||||
$this->lastLogTime = time();
|
||||
// 确保日志目录存在
|
||||
$this->oQueueJob->ensureLogDirExists($this->logPath);
|
||||
}
|
||||
|
||||
public function fire(Job $job, $data)
|
||||
@@ -33,11 +26,17 @@ class RelatedArticle
|
||||
$startTime = microtime(true);
|
||||
$this->oQueueJob->log("-----------队列任务开始-----------");
|
||||
|
||||
// 检查数据库连接
|
||||
if (!$this->oQueueJob->checkDbConnection(true)) {
|
||||
$this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
|
||||
$job->release(10);
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查Redis连接状态
|
||||
if (!$this->QueueRedis->getConnectionStatus()) {
|
||||
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
||||
$job->release(10);
|
||||
$this->oQueueJob->flushLog();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -72,29 +71,16 @@ class RelatedArticle
|
||||
$job->release($delay);
|
||||
}
|
||||
}
|
||||
$this->oQueueJob->flushLog();
|
||||
return;
|
||||
}
|
||||
|
||||
$aParam = [
|
||||
'job_id' => $sRedisKey,
|
||||
'job_class' => $sClassName,
|
||||
'status' => 0,
|
||||
'create_time' => time(),
|
||||
'params' => json_encode($data, self::JSON_OPTIONS)
|
||||
];
|
||||
|
||||
$iLogId = $this->oQueueJob->addLog($aParam);
|
||||
if (!$iLogId) {
|
||||
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
|
||||
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
|
||||
$job->delete();
|
||||
$this->oQueueJob->flushLog();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
// 执行核心任务前再次检查连接
|
||||
$result = $this->oQueueJob->checkDbConnection();
|
||||
if (!$result) {
|
||||
throw new \RuntimeException("数据库连接异常,无法执行核心任务");
|
||||
}
|
||||
|
||||
//查询文章所关联的文章
|
||||
$oJournalArticle = new JournalArticle;
|
||||
$aResult = json_decode(JournalArticle::get($data),true);
|
||||
@@ -102,27 +88,19 @@ class RelatedArticle
|
||||
$sMsg = empty($aResult['msg']) ? '获取相关文章信息失败' : $aResult['msg'];
|
||||
|
||||
//更新日志
|
||||
$this->oQueueJob->updateLog([
|
||||
'log_id' => $iLogId,
|
||||
'status' => 1,
|
||||
'update_time' => time(),
|
||||
'error' => $sMsg
|
||||
]);
|
||||
|
||||
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
|
||||
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
|
||||
$job->delete();
|
||||
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
|
||||
$this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
|
||||
|
||||
} catch (\RuntimeException $e) {
|
||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
||||
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
|
||||
} catch (\LogicException $e) {
|
||||
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
|
||||
$this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisValue,$job);
|
||||
} catch (\Exception $e) {
|
||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
||||
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisValue,$job);
|
||||
} finally {
|
||||
$executionTime = microtime(true) - $startTime;
|
||||
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
||||
$this->oQueueJob->flushLog();
|
||||
gc_collect_cycles();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user