config = \think\Config::get('queue'); $this->connect(); } public static function getInstance() { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } private function connect() { // 只在首次调用或连接断开时创建连接 if (!$this->redis) { $this->redis = new \Redis(); // 使用长连接(pconnect)避免频繁创建连接 $connectMethod = $this->config['persistent'] ? 'pconnect' : 'connect'; $this->redis->$connectMethod( $this->config['host'] ?? '127.0.0.1', $this->config['port'] ?? 6379 ); // 始终执行认证(空密码会被 Redis 忽略) if (!empty($this->config['password'])) { $this->redis->auth($this->config['password']); } $this->redis->select($this->config['select'] ?? 0); } return $this->redis; } // 使用 SET 命令原子操作设置锁 public function setRedisLock($key, $value, $expire) { try { return $this->connect()->set($key, $value, ['nx', 'ex' => $expire]); } catch (\Exception $e) { return false; } } // 设置Redis值 public function setRedisValue($key, $value, $expire = null) { try { $redis = $this->connect(); if ($expire) { return $redis->setex($key, $expire, $value); } else { return $redis->set($key, $value); } } catch (\Exception $e) { return false; } } // 获取Redis值 public function getRedisValue($key) { try { return $this->connect()->get($key); } catch (\Exception $e) { return null; } } // 安全释放锁(仅当值匹配时删除) public function releaseRedisLock($key, $value) { // 使用Lua脚本确保原子性 $script = <<connect(); $result = $redis->eval($script, [$key, $value], 1); return $result; } // 获取锁剩余时间 public function getLockTtl($key) { try { return $this->connect()->ttl($key); } catch (\Exception $e) { return -1; } } // 任务开始时的批量操作 // public function startJob($sRedisKey, $sRedisValue, $expire) // { // try { // $redis = $this->connect(); // // 先尝试设置锁,成功后再设置状态 // if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) { // $redis->set($sRedisKey . ':status', 'processing', $expire); // return true; // } // return false; // } catch (\Exception $e) { // Log::error("Redis批量操作失败: {$e->getMessage()}"); // return false; // } // } public function startJob($sRedisKey, $sRedisValue, $expire){ try { $redis = $this->connect(); // Lua脚本:原子化执行"检查-加锁-设状态" $luaScript = <<eval( $luaScript, [ $sRedisKey, // KEYS[1]:主锁键 $sRedisKey . ':status', // KEYS[2]:状态键 $sRedisValue, // ARGV[1]:锁值(含进程标识) $expire // ARGV[2]:过期时间(秒) ], 2 // KEYS数量 ); return $result === 1; } catch (\Exception $e) { return false; } } /** * 标记任务状态并释放锁(带所有权验证) * @param string $sRedisKey 任务锁的键名 * @param string $status 状态(completed/failed) * @param int $expire 状态的过期时间(秒) * @param string $sRedisValue 锁的值(用于验证所有权) * @return bool 是否执行成功 */ // public function finishJob($sRedisKey, $status, $expire, $sRedisValue){ // try { // $redis = $this->connect(); // // Lua 脚本:先验证锁所有权,再设置状态并删除锁 // $script = <<eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1); // return $result === 1; // } catch (\Exception $e) { // Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}"); // return false; // } // } public function finishJob($sRedisKey, $status, $expire, $sRedisValue) { try { $redis = $this->connect(); // Lua脚本:原子化"验证-设状态-删锁" $script = <<eval( $script, [ $sRedisKey, $sRedisKey . ':status', $sRedisValue, $status, $expire // 建议设为86400(1天),覆盖队列最大重试周期 ], 2 // KEYS数量为2(锁键+状态键) ); return $result === 1; } catch (\Exception $e) { return false; } } // 记录处理进度 public function recordProcessingStart($key, $totalQuestions) { try { $redis = $this->connect(); //判断是否执行 $sStatus = $redis->hGet($key, 'status'); if (!empty($sStatus) && $sStatus == 'completed') { return 2; } if (!empty($sStatus) && $sStatus == 'processing') { return 3; } $redis->hMSet($key, [ 'status' => 'processing', 'total' => $totalQuestions, 'completed' => 0, 'start_time' => time() ]); $redis->expire($key, 10800); // 6小时过期 return 1; } catch (\Exception $e) { return 4; } } // 多问题按条件拆分成两个队列新增日志记录 public function recordQuestionProcessingStart($key, $totalQuestions) { try { $redis = $this->connect(); //判断是否执行 $sStatus = $redis->hGet($key, 'status'); if (!empty($sStatus) && $sStatus == 'completed') { return 2; } if (!empty($sStatus) && $sStatus == 'processing') { return 3; } $redis->hMSet($key, [ 'status' => 'processing', 'total' => $totalQuestions, 'completed' => 0, 'start_time' => time(), 'queue_1_completed' => 0, 'queue_2_completed' => 0 ]); $redis->expire($key, 10800); // 6小时过期 return 1; } catch (\Exception $e) { return 4; } } // 多问题按条件拆分成两个队列更新日志记录 public function updateQuestionProcessingProgress($key, $sKeyName) { $redis = $this->connect(); // 获取总数 $total = $redis->hGet($key, 'total'); if (!$total) { return 0; } if(!empty($sKeyName)){ $redis->hIncrBy($key, $sKeyName, 1); } //获取每个队列完成数量 $queue_1_completed = $redis->hGet($key, 'queue_1_completed'); $queue_2_completed = $redis->hGet($key, 'queue_2_completed'); $completed = $queue_1_completed + $queue_2_completed; if($completed > $total){ return 100; } // 计算进度 $iProgress = round(($completed / $total) * 100, 2); // 事务更新多个字段 $redis->hSet($key, 'completed', $completed); $redis->hSet($key, 'progress', $iProgress); if ($iProgress >= 100) { $redis->hSet($key, 'status', 'completed'); $redis->hSet($key, 'end_time', time()); } return $iProgress; } // 更新处理进度 public function updateProcessingProgress($key, $completed) { $redis = $this->connect(); // 获取总数 $total = $redis->hGet($key, 'total'); if (!$total) { return 0; } // 计算进度 $iProgress = round(($completed / $total) * 100, 2); // 事务更新多个字段 $redis->hSet($key, 'completed', $completed); $redis->hSet($key, 'progress', $iProgress); if ($iProgress >= 100) { $redis->hSet($key, 'status', 'completed'); $redis->hSet($key, 'end_time', time()); } return $iProgress; } // 保存分块进度 public function saveChunkProgress($key, $chunkIndex, $content) { $redis = $this->connect(); $redis->hSet($key, "chunk_{$chunkIndex}", $content); // 确保设置过期时间(如果已设置则忽略) $redis->expire($key, 10800); return true; } // 获取分块进度 public function getChunkProgress($key, $chunkIndex = '-1') { $redis = $this->connect(); if($chunkIndex == '-1'){ $aChunkValue = $redis->hGetAll($key); }else{ $sChunkKey = "chunk_{$chunkIndex}"; // 获取值 $aChunkValue = $redis->hGet($key, $sChunkKey); } return $aChunkValue; } public function getJobStatus($jobId) { try { return $this->getRedisValue($jobId . ':status'); } catch (\Exception $e) { return null; } } public function getConnectionStatus() { try { return $this->connect()->ping(); } catch (\Exception $e) { return false; } } // 强制释放锁(Lua脚本保证原子性) public function forceReleaseLock($key, $value) { $script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; return $this->redis->eval($script, [$key, $value], 1) > 0; } // 在QueueRedis类中新增原子化方法(替换原有分散调用) public function atomicJobUpdate($key, $status, $expire, $value) { // 使用Lua脚本原子执行"判断锁值+更新状态+设置过期" $script = ' if redis.call("get", KEYS[1]) == ARGV[1] then redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3]) return 1 end return 0 '; return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0; } } ?>