diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php index f8a77d1..ba1582f 100644 --- a/application/common/QueueRedis.php +++ b/application/common/QueueRedis.php @@ -274,6 +274,24 @@ LUA; } + // 获取分块进度 + public function getChunkProgress($key, $chunkIndex = '') + { + $redis = $this->connect(); + + if(empty($chunkIndex)){ + + $aChunkValue = $redis->hGetAll($key); + }else{ + $sChunkKey = "chunk_{$chunkIndex}"; + // 获取值 + $aChunkValue = $redis->hGet($key, $sChunkKey); + } + + + return $aChunkValue; + } + public function getJobStatus($jobId) { try { @@ -292,5 +310,26 @@ LUA; } } + // 强制释放锁(Lua脚本保证原子性) + public function forceReleaseLock($key, $value) + { + $script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; + return $this->redis->eval($script, [$key, $value], 1) > 0; + } + + // 在QueueRedis类中新增原子化方法(替换原有分散调用) + public function atomicJobUpdate($key, $status, $expire, $value) + { + // 使用Lua脚本原子执行"判断锁值+更新状态+设置过期" + $script = ' + if redis.call("get", KEYS[1]) == ARGV[1] then + redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3]) + return 1 + end + return 0 + '; + return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0; + } + } ?> \ No newline at end of file