Files
tougao/application/api/job/uploadMaterialStep.php
2025-08-06 17:27:26 +08:00

105 lines
4.1 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\api\job;
use think\queue\Job;
use app\common\Material;
use app\common\QueueJob;
use app\common\QueueRedis;
/**
 * Queue consumer that runs Material::uploadMaterialStep() for one queued payload.
 *
 * Each run is guarded by a lock obtained through QueueRedis::startJob(), keyed by
 * this class name, the article/journal id and the chunk index taken from the payload.
 */
class uploadMaterialStep
{
    /** @var QueueJob Helper used here for logging, DB connection checks and exception handling. */
    private $oQueueJob;

    /** @var QueueRedis Instance used for the per-job lock and job status. */
    private $QueueRedis;

    /** @var int Attempt count at which a job that cannot obtain the lock is deleted instead of released. */
    private $maxRetries = 2;

    /** @var int Expiry handed to QueueRedis::startJob() (presumably seconds; confirm in QueueRedis). */
    private $lockExpire = 1800;

    /** @var int Expiry handed to QueueRedis::finishJob() for the 'completed' marker (presumably seconds). */
    private $completedExpire = 3600;

    // Not referenced inside this class; kept because it is publicly reachable.
    const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;

    public function __construct()
    {
        $this->oQueueJob = new QueueJob;
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Entry point invoked for each queued job.
     *
     * @param Job   $job  Queue job handle (released, deleted or passed to the exception handlers).
     * @param mixed $data Job payload; the keys read here are 'article_id', 'journal_id' and 'chunkIndex'.
     *                    The whole payload is forwarded to Material::uploadMaterialStep().
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $startTime = microtime(true);
        $this->oQueueJob->log("-----------队列任务开始-----------");

        // Check the database connection; release the job for 10 seconds when it is down.
        if (!$this->oQueueJob->checkDbConnection(true)) {
            $this->oQueueJob->log("数据库连接失败无法执行任务10秒后重试");
            $job->release(10);
            return;
        }

        // Check the Redis connection; same 10-second release when it is down.
        if (!$this->QueueRedis->getConnectionStatus()) {
            $this->oQueueJob->log("Redis连接失败10秒后重试");
            $job->release(10);
            return;
        }

        // article_id takes precedence; journal_id is only used when article_id is empty.
        // NOTE(review): both ids end up in the same lock-key namespace, so an article and a
        // journal with the same numeric id would share a lock. Confirm this is intended.
        $iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
        if (empty($iArticleId)) {
            $iArticleId = empty($data['journal_id']) ? 0 : $data['journal_id'];
        }
        $sChunkIndex = empty($data['chunkIndex']) ? 0 : $data['chunkIndex'];

        if (empty($iArticleId)) {
            $this->oQueueJob->log("无效的article_id/journal_id删除任务");
            $job->delete();
            return;
        }

        $sClassName = get_class($this);
        $sRedisKey = "queue_job:{$sClassName}:{$iArticleId}:{$sChunkIndex}";
        // Value stored with the lock; later passed back to finishJob() and the exception handlers.
        $sRedisValue = uniqid() . '_' . getmypid();

        if (!$this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire)) {
            $this->handleLockContention($job, $sRedisKey);
            return;
        }

        try {
            // Re-check the connection right before the core work.
            if (!$this->oQueueJob->checkDbConnection()) {
                throw new \RuntimeException("数据库连接异常,无法执行核心任务");
            }

            $oMaterial = new Material;
            $aResult = json_decode($oMaterial->uploadMaterialStep($data), true);
            // NOTE(review): only 'msg' is read from the decoded result; the job is marked
            // completed regardless of any status it may carry. Verify against Material.
            $sMsg = empty($aResult['msg']) ? '素材上传失败' : $aResult['msg'];

            // Record the completion marker, then remove the job from the queue.
            $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExpire, $sRedisValue);
            $job->delete();
            $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            // Every other exception, including \RuntimeException, goes to the retryable handler.
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Throwable $e) {
            // \Error subclasses (TypeError, etc.) are not \Exception instances and would otherwise
            // skip both handlers above. Wrap them so the handler still receives an \Exception.
            $wrapped = new \RuntimeException($e->getMessage(), (int) $e->getCode(), $e);
            $this->oQueueJob->handleRetryableException($wrapped, $sRedisKey, $sRedisValue, $job);
        } finally {
            $executionTime = microtime(true) - $startTime;
            $this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4));
            gc_collect_cycles();
        }
    }

    /**
     * Decide what to do with a job whose lock could not be obtained.
     *
     * Deletes the job when its recorded status is 'completed' or 'failed', or when the
     * attempt count has reached $maxRetries; otherwise releases it with a delay derived
     * from the lock TTL.
     *
     * @param Job    $job       Queue job handle.
     * @param string $sRedisKey Lock/status key built in fire().
     * @return void
     */
    private function handleLockContention(Job $job, $sRedisKey)
    {
        $jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);

        // Strict comparison: a non-string status (e.g. true) must not match a status name.
        if (in_array($jobStatus, ['completed', 'failed'], true)) {
            $this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
            $job->delete();
            return;
        }

        $attempts = $job->attempts();
        if ($attempts >= $this->maxRetries) {
            $this->oQueueJob->log("超过最大重试次数,停止重试");
            $job->delete();
            return;
        }

        // Retry 5 after the reported lock TTL; fall back to 30 when the TTL is not positive.
        $lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
        $delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
        $this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})");
        $job->release($delay);
    }
}