Files
tougao/application/common/QueueJob.php
2025-07-22 16:40:27 +08:00

340 lines
9.8 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
use app\common\QueueRedis;
class QueueJob
{
// 必填参数
protected $aField = ['job_id', 'job_class', 'status', 'create_time', 'update_time', 'error', 'params'];
private $logPath;
private $QueueRedis;
private $maxRetries = 2;
private $logBuffer = [];
private $lastLogTime = 0;
private $logMaxSize = 1048576; // 1MB (1*1024*1024)
const JSON_OPTIONS = JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR;
public function __construct()
{
$this->QueueRedis = QueueRedis::getInstance();
$this->lastLogTime = time();
}
/**
* 记录任务开始
* @param array $aParam
* @return int 日志ID失败返回0
*/
public function addLog($aParam = [])
{
// 数据过滤(只保留必填字段)
$aInsert = [];
foreach ($this->aField as $field) {
if (isset($aParam[$field])) {
$aInsert[$field] = $aParam[$field];
}
}
// 补充默认值
if (!isset($aInsert['create_time'])) {
$aInsert['create_time'] = time();
}
if (!isset($aInsert['update_time'])) {
$aInsert['update_time'] = $aInsert['create_time'];
}
try {
return Db::name('wechat_queue_logs')->insertGetId($aInsert);
} catch (\Exception $e) {
$this->log("添加任务日志失败: " . $e->getMessage() . " | 参数: " . json_encode($aInsert, self::JSON_OPTIONS));
return 0;
}
}
/**
* 记录任务状态更新
* @param array $aParam
* @return bool
*/
public function updateLog($aParam = [])
{
$iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id'];
if (empty($iLogId)) {
$this->log("更新日志失败: 缺少log_id");
return false;
}
// 数据过滤
$aUpdate = [];
foreach ($this->aField as $field) {
if (isset($aParam[$field])) {
$aUpdate[$field] = $aParam[$field];
}
}
// 强制更新时间
$aUpdate['update_time'] = time();
try {
return Db::name('wechat_queue_logs')
->where('log_id', $iLogId)
->limit(1)
->update($aUpdate) > 0;
} catch (\Exception $e) {
$this->log("更新任务日志失败 [ID:{$iLogId}]: " . $e->getMessage());
return false;
}
}
/**
* 设置日志路径并确保目录存在
* @param string $logPath
* @throws \RuntimeException
*/
public function ensureLogDirExists($logPath = '')
{
if (empty($logPath)) {
$error = "日志路径不能为空";
$this->log($error);
return $error;
}
$this->logPath = $logPath;
$logDir = dirname($this->logPath);
// 检查并创建目录(处理权限问题)
if (!is_dir($logDir)) {
$oldUmask = umask(0);
$created = mkdir($logDir, 0755, true);
umask($oldUmask);
if (!$created || !is_dir($logDir)) {
$error = "无法创建日志目录: {$logDir} (权限不足)";
$this->log($error);
return $error;
}
}
}
/**
* 写入日志到缓冲区
* @param string $message
*/
public function log($message)
{
// 防止缓冲区溢出
if (count($this->logBuffer) >= 1000) {
$this->flushLog();
}
$time = date('H:i:s');
$this->logBuffer[] = "[$time] $message\n";
// 缓冲区满或超时则刷新
if (count($this->logBuffer) >= 50 || time() - $this->lastLogTime > 10) {
$this->flushLog();
}
}
/**
* 刷新日志缓冲区到文件
*/
public function flushLog()
{
if (empty($this->logBuffer)) {
return;
}
// 检查日志路径是否设置
if (empty($this->logPath)) {
$this->logBuffer = [];
return;
}
// 检查文件大小并处理
$this->checkAndTruncateLog();
$fp = fopen($this->logPath, 'a');
if ($fp === false) {
// 紧急写入失败日志(避免递归)
$errorMsg = "[" . date('H:i:s') . "] 错误: 无法打开日志文件 {$this->logPath}\n";
error_log($errorMsg); // 写入系统日志
$this->logBuffer = [];
return;
}
try {
// 尝试获取文件锁
if (flock($fp, LOCK_EX)) {
fwrite($fp, implode('', $this->logBuffer));
flock($fp, LOCK_UN);
} else {
// 无锁情况下尝试写入
fwrite($fp, implode('', $this->logBuffer));
$this->logBuffer[] = "[" . date('H:i:s') . "] 警告: 日志写入未加锁,可能存在冲突风险\n";
}
} catch (\Exception $e) {
$errorMsg = "[" . date('H:i:s') . "] 错误: 写入日志失败: {$e->getMessage()}\n";
fwrite($fp, $errorMsg);
} finally {
fclose($fp);
}
$this->logBuffer = [];
$this->lastLogTime = time();
}
/**
* 检查日志文件大小,超过限制则清空
*/
public function checkAndTruncateLog()
{
if (empty($this->logPath) || !file_exists($this->logPath)) {
return;
}
// 清除文件状态缓存并获取大小
clearstatcache(true, $this->logPath);
$fileSize = @filesize($this->logPath);
if ($fileSize === false) {
$this->log("错误: 无法获取日志文件大小 {$this->logPath}");
return;
}
if ($fileSize >= $this->logMaxSize) {
$fp = fopen($this->logPath, 'w');
if ($fp === false) {
$this->log("错误: 无法清空日志文件 {$this->logPath}");
return;
}
try {
if (flock($fp, LOCK_EX)) {
// 二次检查文件大小(避免竞态条件)
clearstatcache(true, $this->logPath);
if (filesize($this->logPath) >= $this->logMaxSize) {
ftruncate($fp, 0);
$this->log("日志文件超过" . $this->formatFileSize($this->logMaxSize) . ",已清空");
}
flock($fp, LOCK_UN);
}
} catch (\Exception $e) {
$this->log("错误: 清空日志文件失败: {$e->getMessage()}");
} finally {
fclose($fp);
}
}
}
/**
* 格式化文件大小(字节转人类可读格式)
* @param int $bytes
* @return string
*/
public function formatFileSize($bytes)
{
if ($bytes <= 0) {
return '0 B';
}
$units = ['B', 'KB', 'MB', 'GB', 'TB'];
$unitIndex = min(floor(log($bytes, 1024)), count($units) - 1);
$size = $bytes / pow(1024, $unitIndex);
return number_format($size, 2) . ' ' . $units[$unitIndex];
}
/**
* 获取重试延迟时间
* @param string $errorMsg
* @return int
*/
public function getRetryDelay($errorMsg)
{
$delayMap = [
'MySQL server has gone away' => 60,
'timeout' => 30,
'OpenAI' => 45,
'network' => 60
];
foreach ($delayMap as $keyword => $delay) {
if (stripos($errorMsg, $keyword) !== false) { // 不区分大小写匹配
return $delay;
}
}
return 10;
}
/**
* 处理可重试异常
* @param \Exception $e
* @param int $iLogId
* @param string $sRedisKey
* @param \think\queue\Job $job
*/
public function handleRetryableException($e, $iLogId, $sRedisKey, $job)
{
$sMsg = empty($e->getMessage()) ? '可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("可重试异常: {$sMsg} | 堆栈: {$sTrace}");
if ($iLogId > 0) {
$this->updateLog([
'log_id' => $iLogId,
'status' => 2,
'error' => $sMsg . ':' . $sTrace,
]);
}
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600);
$attempts = $job->attempts();
if ($attempts >= $this->maxRetries) {
$this->log("超过最大重试次数({$this->maxRetries}),停止重试");
$job->delete();
} else {
$delay = $this->getRetryDelay($sMsg);
$this->log("{$delay}秒后重试({$attempts}/{$this->maxRetries})");
$job->release($delay);
}
}
/**
* 处理不可重试异常
* @param \Exception $e
* @param int $iLogId
* @param string $sRedisKey
* @param \think\queue\Job $job
*/
public function handleNonRetryableException($e, $iLogId, $sRedisKey, $job)
{
$sMsg = empty($e->getMessage()) ? '不可重试异常' : $e->getMessage();
$sTrace = empty($e->getTraceAsString()) ? '' : $e->getTraceAsString();
$this->log("不可重试异常: {$sMsg} | 堆栈: {$sTrace}");
if ($iLogId > 0) {
$this->updateLog([
'log_id' => $iLogId,
'status' => 3, // 3:不可重试失败
'error' => $sMsg . ':' . $sTrace,
]);
}
$this->QueueRedis->finishJob($sRedisKey, 'failed', 3600);
$this->log("不可重试错误,直接删除任务");
$job->delete();
}
/**
* 析构函数:确保最后日志被写入
*/
public function __destruct()
{
$this->flushLog();
}
}