调整
This commit is contained in:
@@ -108,22 +108,54 @@ LUA; $redis = $this->connect();
|
||||
}
|
||||
|
||||
// 任务开始时的批量操作
|
||||
public function startJob($sRedisKey, $sRedisValue, $expire)
|
||||
{
|
||||
// public function startJob($sRedisKey, $sRedisValue, $expire)
|
||||
// {
|
||||
// try {
|
||||
// $redis = $this->connect();
|
||||
// // 先尝试设置锁,成功后再设置状态
|
||||
// if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) {
|
||||
// $redis->set($sRedisKey . ':status', 'processing', $expire);
|
||||
// return true;
|
||||
// }
|
||||
// return false;
|
||||
// } catch (\Exception $e) {
|
||||
// Log::error("Redis批量操作失败: {$e->getMessage()}");
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
public function startJob($sRedisKey, $sRedisValue, $expire){
|
||||
try {
|
||||
$redis = $this->connect();
|
||||
// 先尝试设置锁,成功后再设置状态
|
||||
if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) {
|
||||
$redis->set($sRedisKey . ':status', 'processing', $expire);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
// Lua脚本:原子化执行"检查-加锁-设状态"
|
||||
$luaScript = <<<LUA
|
||||
-- 检查锁是否已存在
|
||||
if redis.call('exists', KEYS[1]) == 1 then
|
||||
return 0 -- 锁已存在,返回失败
|
||||
end
|
||||
-- 原子化设置锁和状态
|
||||
redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2])
|
||||
redis.call('set', KEYS[2], 'processing', 'EX', ARGV[2])
|
||||
return 1 -- 操作成功
|
||||
LUA;
|
||||
|
||||
// 执行脚本:KEYS为锁键和状态键,ARGV为锁值和过期时间
|
||||
$result = $redis->eval(
|
||||
$luaScript,
|
||||
[
|
||||
$sRedisKey, // KEYS[1]:主锁键
|
||||
$sRedisKey . ':status', // KEYS[2]:状态键
|
||||
$sRedisValue, // ARGV[1]:锁值(含进程标识)
|
||||
$expire // ARGV[2]:过期时间(秒)
|
||||
],
|
||||
2 // KEYS数量
|
||||
);
|
||||
|
||||
return $result === 1;
|
||||
} catch (\Exception $e) {
|
||||
Log::error("Redis批量操作失败: {$e->getMessage()}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记任务状态并释放锁(带所有权验证)
|
||||
* @param string $sRedisKey 任务锁的键名
|
||||
@@ -132,29 +164,69 @@ LUA; $redis = $this->connect();
|
||||
* @param string $sRedisValue 锁的值(用于验证所有权)
|
||||
* @return bool 是否执行成功
|
||||
*/
|
||||
public function finishJob($sRedisKey, $status, $expire, $sRedisValue){
|
||||
// public function finishJob($sRedisKey, $status, $expire, $sRedisValue){
|
||||
// try {
|
||||
// $redis = $this->connect();
|
||||
// // Lua 脚本:先验证锁所有权,再设置状态并删除锁
|
||||
// $script = <<<LUA
|
||||
// if redis.call('GET', KEYS[1]) ~= ARGV[1] then
|
||||
// return 0
|
||||
// end
|
||||
|
||||
// redis.call('SET', KEYS[1] .. ':status', ARGV[2], 'EX', ARGV[3])
|
||||
|
||||
// redis.call('DEL', KEYS[1])
|
||||
|
||||
// return 1
|
||||
// LUA;
|
||||
|
||||
// // 执行脚本:参数依次为 键名列表、锁的值、状态、过期时间
|
||||
// $result = $redis->eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1);
|
||||
// return $result === 1;
|
||||
// } catch (\Exception $e) {
|
||||
// Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}");
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
public function finishJob($sRedisKey, $status, $expire, $sRedisValue)
|
||||
{
|
||||
try {
|
||||
$redis = $this->connect();
|
||||
// Lua 脚本:先验证锁所有权,再设置状态并删除锁
|
||||
$script = <<<LUA
|
||||
if redis.call('GET', KEYS[1]) ~= ARGV[1] then
|
||||
return 0
|
||||
$redis = $this->connect();
|
||||
|
||||
// Lua脚本:原子化"验证-设状态-删锁"
|
||||
$script = <<<LUA
|
||||
-- 1. 验证锁所有权
|
||||
local currentLock = redis.call('get', KEYS[1])
|
||||
if currentLock ~= ARGV[1] then
|
||||
return 0 -- 非持有者,拒绝操作
|
||||
end
|
||||
|
||||
redis.call('SET', KEYS[1] .. ':status', ARGV[2], 'EX', ARGV[3])
|
||||
-- 2. 设置最终状态(延长过期时间,确保被后续重试进程读取)
|
||||
redis.call('set', KEYS[2], ARGV[2], 'EX', ARGV[3])
|
||||
|
||||
redis.call('DEL', KEYS[1])
|
||||
-- 3. 原子化删除主锁
|
||||
redis.call('del', KEYS[1])
|
||||
|
||||
return 1
|
||||
return 1 -- 操作成功
|
||||
LUA;
|
||||
|
||||
// 执行脚本:参数依次为 键名列表、锁的值、状态、过期时间
|
||||
$result = $redis->eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1);
|
||||
return $result === 1;
|
||||
} catch (\Exception $e) {
|
||||
Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}");
|
||||
return false;
|
||||
}
|
||||
// 执行脚本:参数为锁键、状态键、锁值、状态、过期时间
|
||||
$result = $redis->eval(
|
||||
$script,
|
||||
[
|
||||
$sRedisKey,
|
||||
$sRedisKey . ':status',
|
||||
$sRedisValue,
|
||||
$status,
|
||||
$expire // 建议设为86400(1天),覆盖队列最大重试周期
|
||||
],
|
||||
2 // KEYS数量为2(锁键+状态键)
|
||||
);
|
||||
|
||||
return $result === 1;
|
||||
} catch (\Exception $e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// 记录处理进度
|
||||
public function recordProcessingStart($key, $totalQuestions)
|
||||
|
||||
Reference in New Issue
Block a user