From 603deadbb4b6c526bd9934d012ba6a98b6c14d1f Mon Sep 17 00:00:00 2001 From: chengxl Date: Fri, 15 Aug 2025 14:48:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=93=8D=E4=BD=9Credis=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/QueueRedis.php | 39 +++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php index f8a77d1..ba1582f 100644 --- a/application/common/QueueRedis.php +++ b/application/common/QueueRedis.php @@ -274,6 +274,24 @@ LUA; } + // 获取分块进度 + public function getChunkProgress($key, $chunkIndex = '') + { + $redis = $this->connect(); + + if(empty($chunkIndex)){ + + $aChunkValue = $redis->hGetAll($key); + }else{ + $sChunkKey = "chunk_{$chunkIndex}"; + // 获取值 + $aChunkValue = $redis->hGet($key, $sChunkKey); + } + + + return $aChunkValue; + } + public function getJobStatus($jobId) { try { @@ -292,5 +310,26 @@ LUA; } } + // 强制释放锁(Lua脚本保证原子性) + public function forceReleaseLock($key, $value) + { + $script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; + return $this->redis->eval($script, [$key, $value], 1) > 0; + } + + // 在QueueRedis类中新增原子化方法(替换原有分散调用) + public function atomicJobUpdate($key, $status, $expire, $value) + { + // 使用Lua脚本原子执行"判断锁值+更新状态+设置过期" + $script = ' + if redis.call("get", KEYS[1]) == ARGV[1] then + redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3]) + return 1 + end + return 0 + '; + return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0; + } + } ?> \ No newline at end of file