Files
tougao/application/api/job/WechatDraft.php
2025-07-22 16:38:40 +08:00

128 lines
4.8 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\api\job;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\api\controller\Aiarticle;
class WechatDraft
{
private $logPath;
private $oQueueJob;
private $QueueRedis;
private $maxRetries = 2;
private $logBuffer = [];
private $lastLogTime = 0;
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
private $lockExpire = 1800;
private $completedExprie = 3600;
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
$this->logPath = ROOT_PATH . 'public/queue_log/WechatDraft_' . date('Ymd') . '.log';
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
$this->lastLogTime = time();
// 确保日志目录存在
$this->oQueueJob->ensureLogDirExists($this->logPath);
}
public function fire(Job $job, $data)
{
$startTime = microtime(true);
$this->oQueueJob->log("-----------队列任务开始-----------");
// 检查Redis连接状态
if (!$this->QueueRedis->getConnectionStatus()) {
$this->oQueueJob->log("Redis连接失败10秒后重试");
$job->release(10);
$this->oQueueJob->flushLog();
return;
}
// 获取文章ID
$iArticleId = empty($data['article_id']) ? 0 : $data['article_id'];
if (empty($iArticleId)) {
$this->oQueueJob->log("无效的article_id删除任务");
$job->delete();
return;
}
$sClassName = get_class($this);
$sRedisKey = "queue_job:{$sClassName}:{$iArticleId}";
$sRedisValue = uniqid() . '_' . getmypid();
$lockExpire = $this->lockExpire;
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $lockExpire);
if (!$isLocked) {
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
$job->delete();
} else {
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->oQueueJob->log("超过最大重试次数,停止重试");
$job->delete();
} else {
$lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
$delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
$this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/{$this->maxRetries})");
$job->release($delay);
}
}
$this->oQueueJob->flushLog();
return;
}
$aParam = [
'job_id' => $sRedisKey,
'job_class' => $sClassName,
'status' => 0,
'create_time' => time(),
'params' => json_encode($data, self::JSON_OPTIONS)
];
$iLogId = $this->oQueueJob->addLog($aParam);
if (!$iLogId) {
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
$job->delete();
$this->oQueueJob->flushLog();
return;
}
try {
//上传草稿箱
$oAiarticle = new Aiarticle;
$aResult = json_decode($oAiarticle->syncWechat($data),true);
$sMsg = empty($aResult['msg']) ? '上传草稿箱失败' : $aResult['msg'];
//更新日志
$this->oQueueJob->updateLog([
'log_id' => $iLogId,
'status' => 1,
'update_time' => time(),
'error' => $sMsg
]);
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
$job->delete();
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId}");
} catch (\RuntimeException $e) {
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
} catch (\LogicException $e) {
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
} catch (\Exception $e) {
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
} finally {
$executionTime = microtime(true) - $startTime;
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "");
$this->oQueueJob->flushLog();
gc_collect_cycles();
}
}
}