操作redis新增方法
This commit is contained in:
@@ -274,6 +274,24 @@ LUA;
|
||||
|
||||
}
|
||||
|
||||
// 获取分块进度
|
||||
public function getChunkProgress($key, $chunkIndex = '')
|
||||
{
|
||||
$redis = $this->connect();
|
||||
|
||||
if(empty($chunkIndex)){
|
||||
|
||||
$aChunkValue = $redis->hGetAll($key);
|
||||
}else{
|
||||
$sChunkKey = "chunk_{$chunkIndex}";
|
||||
// 获取值
|
||||
$aChunkValue = $redis->hGet($key, $sChunkKey);
|
||||
}
|
||||
|
||||
|
||||
return $aChunkValue;
|
||||
}
|
||||
|
||||
public function getJobStatus($jobId)
|
||||
{
|
||||
try {
|
||||
@@ -292,5 +310,26 @@ LUA;
|
||||
}
|
||||
}
|
||||
|
||||
// 强制释放锁(Lua脚本保证原子性)
|
||||
public function forceReleaseLock($key, $value)
|
||||
{
|
||||
$script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
|
||||
return $this->redis->eval($script, [$key, $value], 1) > 0;
|
||||
}
|
||||
|
||||
// 在QueueRedis类中新增原子化方法(替换原有分散调用)
|
||||
public function atomicJobUpdate($key, $status, $expire, $value)
|
||||
{
|
||||
// 使用Lua脚本原子执行"判断锁值+更新状态+设置过期"
|
||||
$script = '
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3])
|
||||
return 1
|
||||
end
|
||||
return 0
|
||||
';
|
||||
return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0;
|
||||
}
|
||||
|
||||
}
|
||||
?>
|
||||
Reference in New Issue
Block a user