From 9f447f6f03d1cdb5b4c7ad6580075f4723542b35 Mon Sep 17 00:00:00 2001 From: chengxl Date: Thu, 21 Aug 2025 10:38:15 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/common/QueueRedis.php | 124 +++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 26 deletions(-) diff --git a/application/common/QueueRedis.php b/application/common/QueueRedis.php index ba1582f..b96e2d3 100644 --- a/application/common/QueueRedis.php +++ b/application/common/QueueRedis.php @@ -108,22 +108,54 @@ LUA; $redis = $this->connect(); } // 任务开始时的批量操作 - public function startJob($sRedisKey, $sRedisValue, $expire) - { + // public function startJob($sRedisKey, $sRedisValue, $expire) + // { + // try { + // $redis = $this->connect(); + // // 先尝试设置锁,成功后再设置状态 + // if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) { + // $redis->set($sRedisKey . ':status', 'processing', $expire); + // return true; + // } + // return false; + // } catch (\Exception $e) { + // Log::error("Redis批量操作失败: {$e->getMessage()}"); + // return false; + // } + // } + public function startJob($sRedisKey, $sRedisValue, $expire){ try { $redis = $this->connect(); - // 先尝试设置锁,成功后再设置状态 - if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) { - $redis->set($sRedisKey . ':status', 'processing', $expire); - return true; - } - return false; + + // Lua脚本:原子化执行"检查-加锁-设状态" + $luaScript = <<eval( + $luaScript, + [ + $sRedisKey, // KEYS[1]:主锁键 + $sRedisKey . ':status', // KEYS[2]:状态键 + $sRedisValue, // ARGV[1]:锁值(含进程标识) + $expire // ARGV[2]:过期时间(秒) + ], + 2 // KEYS数量 + ); + + return $result === 1; } catch (\Exception $e) { - Log::error("Redis批量操作失败: {$e->getMessage()}"); return false; } } - /** * 标记任务状态并释放锁(带所有权验证) * @param string $sRedisKey 任务锁的键名 @@ -132,29 +164,69 @@ LUA; $redis = $this->connect(); * @param string $sRedisValue 锁的值(用于验证所有权) * @return bool 是否执行成功 */ - public function finishJob($sRedisKey, $status, $expire, $sRedisValue){ +// public function finishJob($sRedisKey, $status, $expire, $sRedisValue){ +// try { +// $redis = $this->connect(); +// // Lua 脚本:先验证锁所有权,再设置状态并删除锁 +// $script = <<eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1); +// return $result === 1; +// } catch (\Exception $e) { +// Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}"); +// return false; +// } +// } + public function finishJob($sRedisKey, $status, $expire, $sRedisValue) + { try { - $redis = $this->connect(); - // Lua 脚本:先验证锁所有权,再设置状态并删除锁 - $script = <<connect(); + + // Lua脚本:原子化"验证-设状态-删锁" + $script = <<eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1); - return $result === 1; - } catch (\Exception $e) { - Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}"); - return false; - } + // 执行脚本:参数为锁键、状态键、锁值、状态、过期时间 + $result = $redis->eval( + $script, + [ + $sRedisKey, + $sRedisKey . ':status', + $sRedisValue, + $status, + $expire // 建议设为86400(1天),覆盖队列最大重试周期 + ], + 2 // KEYS数量为2(锁键+状态键) + ); + + return $result === 1; + } catch (\Exception $e) { + return false; + } } // 记录处理进度 public function recordProcessingStart($key, $totalQuestions)