job修改
This commit is contained in:
@@ -11,21 +11,14 @@ class WechatDraftPublish
|
|||||||
private $oQueueJob;
|
private $oQueueJob;
|
||||||
private $QueueRedis;
|
private $QueueRedis;
|
||||||
private $maxRetries = 2;
|
private $maxRetries = 2;
|
||||||
private $logBuffer = [];
|
|
||||||
private $lastLogTime = 0;
|
|
||||||
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
|
|
||||||
private $lockExpire = 1800;
|
private $lockExpire = 1800;
|
||||||
private $completedExprie = 3600;
|
private $completedExprie = 3600;
|
||||||
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
$this->logPath = ROOT_PATH . 'public/queue_log/WechatDraftPublish_' . date('Ymd') . '.log';
|
|
||||||
$this->oQueueJob = new QueueJob;
|
$this->oQueueJob = new QueueJob;
|
||||||
$this->QueueRedis = QueueRedis::getInstance();
|
$this->QueueRedis = QueueRedis::getInstance();
|
||||||
$this->lastLogTime = time();
|
|
||||||
// 确保日志目录存在
|
|
||||||
$this->oQueueJob->ensureLogDirExists($this->logPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function fire(Job $job, $data)
|
public function fire(Job $job, $data)
|
||||||
@@ -33,11 +26,17 @@ class WechatDraftPublish
|
|||||||
$startTime = microtime(true);
|
$startTime = microtime(true);
|
||||||
$this->oQueueJob->log("-----------队列任务开始-----------");
|
$this->oQueueJob->log("-----------队列任务开始-----------");
|
||||||
|
|
||||||
|
// 检查数据库连接
|
||||||
|
if (!$this->oQueueJob->checkDbConnection(true)) {
|
||||||
|
$this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
|
||||||
|
$job->release(10);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 检查Redis连接状态
|
// 检查Redis连接状态
|
||||||
if (!$this->QueueRedis->getConnectionStatus()) {
|
if (!$this->QueueRedis->getConnectionStatus()) {
|
||||||
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
||||||
$job->release(10);
|
$job->release(10);
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,56 +71,35 @@ class WechatDraftPublish
|
|||||||
$job->release($delay);
|
$job->release($delay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$aParam = [
|
|
||||||
'job_id' => $sRedisKey,
|
|
||||||
'job_class' => $sClassName,
|
|
||||||
'status' => 0,
|
|
||||||
'create_time' => time(),
|
|
||||||
'params' => json_encode($data, self::JSON_OPTIONS)
|
|
||||||
];
|
|
||||||
|
|
||||||
$iLogId = $this->oQueueJob->addLog($aParam);
|
|
||||||
if (!$iLogId) {
|
|
||||||
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
|
|
||||||
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
|
|
||||||
$job->delete();
|
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
// 执行核心任务前再次检查连接
|
||||||
|
$result = $this->oQueueJob->checkDbConnection();
|
||||||
|
if (!$result) {
|
||||||
|
throw new \RuntimeException("数据库连接异常,无法执行核心任务");
|
||||||
|
}
|
||||||
|
|
||||||
//发布草稿箱
|
//发布草稿箱
|
||||||
$oAiarticle = new Aiarticle;
|
$oAiarticle = new Aiarticle;
|
||||||
$aResult = json_decode($oAiarticle->publishDraft($data),true);
|
$aResult = json_decode($oAiarticle->publishDraft($data),true);
|
||||||
$sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg'];
|
$sMsg = empty($aResult['msg']) ? '草稿箱发布失败' : $aResult['msg'];
|
||||||
|
|
||||||
//更新任务状态
|
//更新日志
|
||||||
$this->oQueueJob->updateLog([
|
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
|
||||||
'log_id' => $iLogId,
|
|
||||||
'status' => 1,
|
|
||||||
'update_time' => time(),
|
|
||||||
'error' => $sMsg
|
|
||||||
]);
|
|
||||||
|
|
||||||
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
|
|
||||||
$job->delete();
|
$job->delete();
|
||||||
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
|
$this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
|
||||||
|
|
||||||
} catch (\RuntimeException $e) {
|
} catch (\RuntimeException $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\LogicException $e) {
|
} catch (\LogicException $e) {
|
||||||
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} finally {
|
} finally {
|
||||||
$executionTime = microtime(true) - $startTime;
|
$executionTime = microtime(true) - $startTime;
|
||||||
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
gc_collect_cycles();
|
gc_collect_cycles();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,21 +11,14 @@ class WechatMaterial
|
|||||||
private $oQueueJob;
|
private $oQueueJob;
|
||||||
private $QueueRedis;
|
private $QueueRedis;
|
||||||
private $maxRetries = 2;
|
private $maxRetries = 2;
|
||||||
private $logBuffer = [];
|
|
||||||
private $lastLogTime = 0;
|
|
||||||
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
|
|
||||||
private $lockExpire = 1800;
|
private $lockExpire = 1800;
|
||||||
private $completedExprie = 3600;
|
private $completedExprie = 3600;
|
||||||
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
$this->logPath = ROOT_PATH . 'public/queue_log/WechatMaterial_' . date('Ymd') . '.log';
|
|
||||||
$this->oQueueJob = new QueueJob;
|
$this->oQueueJob = new QueueJob;
|
||||||
$this->QueueRedis = QueueRedis::getInstance();
|
$this->QueueRedis = QueueRedis::getInstance();
|
||||||
$this->lastLogTime = time();
|
|
||||||
// 确保日志目录存在
|
|
||||||
$this->oQueueJob->ensureLogDirExists($this->logPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function fire(Job $job, $data)
|
public function fire(Job $job, $data)
|
||||||
@@ -33,11 +26,17 @@ class WechatMaterial
|
|||||||
$startTime = microtime(true);
|
$startTime = microtime(true);
|
||||||
$this->oQueueJob->log("-----------队列任务开始-----------");
|
$this->oQueueJob->log("-----------队列任务开始-----------");
|
||||||
|
|
||||||
|
// 检查数据库连接
|
||||||
|
if (!$this->oQueueJob->checkDbConnection(true)) {
|
||||||
|
$this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
|
||||||
|
$job->release(10);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 检查Redis连接状态
|
// 检查Redis连接状态
|
||||||
if (!$this->QueueRedis->getConnectionStatus()) {
|
if (!$this->QueueRedis->getConnectionStatus()) {
|
||||||
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
||||||
$job->release(10);
|
$job->release(10);
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,56 +71,36 @@ class WechatMaterial
|
|||||||
$job->release($delay);
|
$job->release($delay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$aParam = [
|
|
||||||
'job_id' => $sRedisKey,
|
|
||||||
'job_class' => $sClassName,
|
|
||||||
'status' => 0,
|
|
||||||
'create_time' => time(),
|
|
||||||
'params' => json_encode($data, self::JSON_OPTIONS)
|
|
||||||
];
|
|
||||||
|
|
||||||
$iLogId = $this->oQueueJob->addLog($aParam);
|
|
||||||
if (!$iLogId) {
|
|
||||||
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
|
|
||||||
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
|
|
||||||
$job->delete();
|
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
|
||||||
|
// 执行核心任务前再次检查连接
|
||||||
|
$result = $this->oQueueJob->checkDbConnection();
|
||||||
|
if (!$result) {
|
||||||
|
throw new \RuntimeException("数据库连接异常,无法执行核心任务");
|
||||||
|
}
|
||||||
|
|
||||||
//上传素材
|
//上传素材
|
||||||
$oAiarticle = new Aiarticle;
|
$oAiarticle = new Aiarticle;
|
||||||
$aResult = json_decode($oAiarticle->uploadMaterial($data),true);
|
$aResult = json_decode($oAiarticle->uploadMaterial($data),true);
|
||||||
$sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg'];
|
$sMsg = empty($aResult['msg']) ? '上传素材失败' : $aResult['msg'];
|
||||||
|
|
||||||
//更新任务状态
|
//更新日志
|
||||||
$this->oQueueJob->updateLog([
|
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
|
||||||
'log_id' => $iLogId,
|
|
||||||
'status' => 1,
|
|
||||||
'update_time' => time(),
|
|
||||||
'error' => $sMsg
|
|
||||||
]);
|
|
||||||
|
|
||||||
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
|
|
||||||
$job->delete();
|
$job->delete();
|
||||||
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
|
$this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
|
||||||
|
|
||||||
} catch (\RuntimeException $e) {
|
} catch (\RuntimeException $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\LogicException $e) {
|
} catch (\LogicException $e) {
|
||||||
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} finally {
|
} finally {
|
||||||
$executionTime = microtime(true) - $startTime;
|
$executionTime = microtime(true) - $startTime;
|
||||||
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
gc_collect_cycles();
|
gc_collect_cycles();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,21 +11,14 @@ class WechatQueryStatus
|
|||||||
private $oQueueJob;
|
private $oQueueJob;
|
||||||
private $QueueRedis;
|
private $QueueRedis;
|
||||||
private $maxRetries = 2;
|
private $maxRetries = 2;
|
||||||
private $logBuffer = [];
|
|
||||||
private $lastLogTime = 0;
|
|
||||||
private $logMaxSize = 1048576; // 1MB (1*1024*1024) - 修正注释与值匹配
|
|
||||||
private $lockExpire = 1800;
|
private $lockExpire = 1800;
|
||||||
private $completedExprie = 3600;
|
private $completedExprie = 3600;
|
||||||
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
$this->logPath = ROOT_PATH . 'public/queue_log/WechatQueryStatus_' . date('Ymd') . '.log';
|
|
||||||
$this->oQueueJob = new QueueJob;
|
$this->oQueueJob = new QueueJob;
|
||||||
$this->QueueRedis = QueueRedis::getInstance();
|
$this->QueueRedis = QueueRedis::getInstance();
|
||||||
$this->lastLogTime = time();
|
|
||||||
// 确保日志目录存在
|
|
||||||
$this->oQueueJob->ensureLogDirExists($this->logPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function fire(Job $job, $data)
|
public function fire(Job $job, $data)
|
||||||
@@ -33,11 +26,17 @@ class WechatQueryStatus
|
|||||||
$startTime = microtime(true);
|
$startTime = microtime(true);
|
||||||
$this->oQueueJob->log("-----------队列任务开始-----------");
|
$this->oQueueJob->log("-----------队列任务开始-----------");
|
||||||
|
|
||||||
|
// 检查数据库连接
|
||||||
|
if (!$this->oQueueJob->checkDbConnection(true)) {
|
||||||
|
$this->oQueueJob->log("数据库连接失败,无法执行任务,10秒后重试");
|
||||||
|
$job->release(10);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 检查Redis连接状态
|
// 检查Redis连接状态
|
||||||
if (!$this->QueueRedis->getConnectionStatus()) {
|
if (!$this->QueueRedis->getConnectionStatus()) {
|
||||||
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
$this->oQueueJob->log("Redis连接失败,10秒后重试");
|
||||||
$job->release(10);
|
$job->release(10);
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,56 +71,37 @@ class WechatQueryStatus
|
|||||||
$job->release($delay);
|
$job->release($delay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$aParam = [
|
|
||||||
'job_id' => $sRedisKey,
|
|
||||||
'job_class' => $sClassName,
|
|
||||||
'status' => 0,
|
|
||||||
'create_time' => time(),
|
|
||||||
'params' => json_encode($data, self::JSON_OPTIONS)
|
|
||||||
];
|
|
||||||
|
|
||||||
$iLogId = $this->oQueueJob->addLog($aParam);
|
|
||||||
if (!$iLogId) {
|
|
||||||
$this->oQueueJob->log("日志创建失败,释放锁并删除任务:".json_encode($data, self::JSON_OPTIONS));
|
|
||||||
$this->QueueRedis->releaseRedisLock($sRedisKey, $sRedisValue);
|
|
||||||
$job->delete();
|
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
|
||||||
|
// 执行核心任务前再次检查连接
|
||||||
|
$result = $this->oQueueJob->checkDbConnection();
|
||||||
|
if (!$result) {
|
||||||
|
throw new \RuntimeException("数据库连接异常,无法执行核心任务");
|
||||||
|
}
|
||||||
|
|
||||||
// 查询状态
|
// 查询状态
|
||||||
$oAiarticle = new Aiarticle;
|
$oAiarticle = new Aiarticle;
|
||||||
$aResult = json_decode($oAiarticle->queryStatus($data),true);
|
$aResult = json_decode($oAiarticle->queryStatus($data),true);
|
||||||
$sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg'];
|
$sMsg = empty($aResult['msg']) ? '查询草稿箱文章是否发布失败' : $aResult['msg'];
|
||||||
|
|
||||||
//更新任务状态
|
|
||||||
$this->oQueueJob->updateLog([
|
|
||||||
'log_id' => $iLogId,
|
|
||||||
'status' => 1,
|
|
||||||
'update_time' => time(),
|
|
||||||
'error' => $sMsg
|
|
||||||
]);
|
|
||||||
|
|
||||||
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie);
|
//更新日志
|
||||||
|
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie,$sRedisValue);
|
||||||
$job->delete();
|
$job->delete();
|
||||||
$this->oQueueJob->log("任务执行成功 | 日志ID: {$iLogId} | 执行日志:{$sMsg}");
|
$this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey} | 执行日志:{$sMsg}");
|
||||||
|
|
||||||
} catch (\RuntimeException $e) {
|
} catch (\RuntimeException $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\LogicException $e) {
|
} catch (\LogicException $e) {
|
||||||
$this->oQueueJob->handleNonRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleNonRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$this->oQueueJob->handleRetryableException($e, $iLogId, $sRedisKey, $job);
|
$this->oQueueJob->handleRetryableException($e,$sRedisKey,$sRedisKey,$job);
|
||||||
} finally {
|
} finally {
|
||||||
$executionTime = microtime(true) - $startTime;
|
$executionTime = microtime(true) - $startTime;
|
||||||
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
$this->oQueueJob->log("任务执行完成,耗时: " . number_format($executionTime, 4) . "秒");
|
||||||
$this->oQueueJob->flushLog();
|
|
||||||
gc_collect_cycles();
|
gc_collect_cycles();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user