407 lines
12 KiB
PHP
407 lines
12 KiB
PHP
<?php
|
||
namespace app\common;
|
||
use think\Db;
|
||
class QueueRedis
|
||
{
|
||
|
||
private $redis;
|
||
private $config;
|
||
private static $instance;
|
||
|
||
private function __construct()
|
||
{
|
||
$this->config = \think\Config::get('queue');
|
||
$this->connect();
|
||
}
|
||
|
||
public static function getInstance()
|
||
{
|
||
if (!self::$instance) {
|
||
self::$instance = new self();
|
||
}
|
||
return self::$instance;
|
||
}
|
||
|
||
private function connect()
|
||
{
|
||
// 只在首次调用或连接断开时创建连接
|
||
if (!$this->redis) {
|
||
$this->redis = new \Redis();
|
||
|
||
// 使用长连接(pconnect)避免频繁创建连接
|
||
$connectMethod = $this->config['persistent'] ? 'pconnect' : 'connect';
|
||
$this->redis->$connectMethod(
|
||
$this->config['host'] ?? '127.0.0.1',
|
||
$this->config['port'] ?? 6379
|
||
);
|
||
// 始终执行认证(空密码会被 Redis 忽略)
|
||
if (!empty($this->config['password'])) {
|
||
$this->redis->auth($this->config['password']);
|
||
}
|
||
|
||
|
||
|
||
$this->redis->select($this->config['select'] ?? 0);
|
||
}
|
||
return $this->redis;
|
||
}
|
||
|
||
// 使用 SET 命令原子操作设置锁
|
||
public function setRedisLock($key, $value, $expire)
|
||
{
|
||
try {
|
||
return $this->connect()->set($key, $value, ['nx', 'ex' => $expire]);
|
||
} catch (\Exception $e) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// 设置Redis值
|
||
public function setRedisValue($key, $value, $expire = null)
|
||
{
|
||
try {
|
||
$redis = $this->connect();
|
||
if ($expire) {
|
||
return $redis->setex($key, $expire, $value);
|
||
} else {
|
||
return $redis->set($key, $value);
|
||
}
|
||
} catch (\Exception $e) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// 获取Redis值
|
||
public function getRedisValue($key)
|
||
{
|
||
try {
|
||
return $this->connect()->get($key);
|
||
} catch (\Exception $e) {
|
||
return null;
|
||
}
|
||
}
|
||
|
||
// 安全释放锁(仅当值匹配时删除)
|
||
public function releaseRedisLock($key, $value)
|
||
{
|
||
|
||
// 使用Lua脚本确保原子性
|
||
$script = <<<LUA
|
||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||
return redis.call("DEL", KEYS[1])
|
||
else
|
||
return 0
|
||
end
|
||
LUA; $redis = $this->connect();
|
||
$result = $redis->eval($script, [$key, $value], 1);
|
||
return $result;
|
||
}
|
||
|
||
// 获取锁剩余时间
|
||
public function getLockTtl($key)
|
||
{
|
||
try {
|
||
return $this->connect()->ttl($key);
|
||
} catch (\Exception $e) {
|
||
return -1;
|
||
}
|
||
}
|
||
|
||
// 任务开始时的批量操作
|
||
// public function startJob($sRedisKey, $sRedisValue, $expire)
|
||
// {
|
||
// try {
|
||
// $redis = $this->connect();
|
||
// // 先尝试设置锁,成功后再设置状态
|
||
// if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) {
|
||
// $redis->set($sRedisKey . ':status', 'processing', $expire);
|
||
// return true;
|
||
// }
|
||
// return false;
|
||
// } catch (\Exception $e) {
|
||
// Log::error("Redis批量操作失败: {$e->getMessage()}");
|
||
// return false;
|
||
// }
|
||
// }
|
||
public function startJob($sRedisKey, $sRedisValue, $expire){
|
||
try {
|
||
$redis = $this->connect();
|
||
|
||
// Lua脚本:原子化执行"检查-加锁-设状态"
|
||
$luaScript = <<<LUA
|
||
-- 检查锁是否已存在
|
||
if redis.call('exists', KEYS[1]) == 1 then
|
||
return 0 -- 锁已存在,返回失败
|
||
end
|
||
-- 原子化设置锁和状态
|
||
redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2])
|
||
redis.call('set', KEYS[2], 'processing', 'EX', ARGV[2])
|
||
return 1 -- 操作成功
|
||
LUA;
|
||
|
||
// 执行脚本:KEYS为锁键和状态键,ARGV为锁值和过期时间
|
||
$result = $redis->eval(
|
||
$luaScript,
|
||
[
|
||
$sRedisKey, // KEYS[1]:主锁键
|
||
$sRedisKey . ':status', // KEYS[2]:状态键
|
||
$sRedisValue, // ARGV[1]:锁值(含进程标识)
|
||
$expire // ARGV[2]:过期时间(秒)
|
||
],
|
||
2 // KEYS数量
|
||
);
|
||
|
||
return $result === 1;
|
||
} catch (\Exception $e) {
|
||
return false;
|
||
}
|
||
}
|
||
/**
|
||
* 标记任务状态并释放锁(带所有权验证)
|
||
* @param string $sRedisKey 任务锁的键名
|
||
* @param string $status 状态(completed/failed)
|
||
* @param int $expire 状态的过期时间(秒)
|
||
* @param string $sRedisValue 锁的值(用于验证所有权)
|
||
* @return bool 是否执行成功
|
||
*/
|
||
// public function finishJob($sRedisKey, $status, $expire, $sRedisValue){
|
||
// try {
|
||
// $redis = $this->connect();
|
||
// // Lua 脚本:先验证锁所有权,再设置状态并删除锁
|
||
// $script = <<<LUA
|
||
// if redis.call('GET', KEYS[1]) ~= ARGV[1] then
|
||
// return 0
|
||
// end
|
||
|
||
// redis.call('SET', KEYS[1] .. ':status', ARGV[2], 'EX', ARGV[3])
|
||
|
||
// redis.call('DEL', KEYS[1])
|
||
|
||
// return 1
|
||
// LUA;
|
||
|
||
// // 执行脚本:参数依次为 键名列表、锁的值、状态、过期时间
|
||
// $result = $redis->eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1);
|
||
// return $result === 1;
|
||
// } catch (\Exception $e) {
|
||
// Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}");
|
||
// return false;
|
||
// }
|
||
// }
|
||
public function finishJob($sRedisKey, $status, $expire, $sRedisValue)
|
||
{
|
||
try {
|
||
$redis = $this->connect();
|
||
|
||
// Lua脚本:原子化"验证-设状态-删锁"
|
||
$script = <<<LUA
|
||
-- 1. 验证锁所有权
|
||
local currentLock = redis.call('get', KEYS[1])
|
||
if currentLock ~= ARGV[1] then
|
||
return 0 -- 非持有者,拒绝操作
|
||
end
|
||
|
||
-- 2. 设置最终状态(延长过期时间,确保被后续重试进程读取)
|
||
redis.call('set', KEYS[2], ARGV[2], 'EX', ARGV[3])
|
||
|
||
-- 3. 原子化删除主锁
|
||
redis.call('del', KEYS[1])
|
||
|
||
return 1 -- 操作成功
|
||
LUA;
|
||
|
||
// 执行脚本:参数为锁键、状态键、锁值、状态、过期时间
|
||
$result = $redis->eval(
|
||
$script,
|
||
[
|
||
$sRedisKey,
|
||
$sRedisKey . ':status',
|
||
$sRedisValue,
|
||
$status,
|
||
$expire // 建议设为86400(1天),覆盖队列最大重试周期
|
||
],
|
||
2 // KEYS数量为2(锁键+状态键)
|
||
);
|
||
|
||
return $result === 1;
|
||
} catch (\Exception $e) {
|
||
return false;
|
||
}
|
||
}
|
||
// 记录处理进度
|
||
public function recordProcessingStart($key, $totalQuestions)
|
||
{
|
||
try {
|
||
$redis = $this->connect();
|
||
//判断是否执行
|
||
$sStatus = $redis->hGet($key, 'status');
|
||
if (!empty($sStatus) && $sStatus == 'completed') {
|
||
return 2;
|
||
}
|
||
if (!empty($sStatus) && $sStatus == 'processing') {
|
||
return 3;
|
||
}
|
||
$redis->hMSet($key, [
|
||
'status' => 'processing',
|
||
'total' => $totalQuestions,
|
||
'completed' => 0,
|
||
'start_time' => time()
|
||
]);
|
||
$redis->expire($key, 10800); // 6小时过期
|
||
return 1;
|
||
} catch (\Exception $e) {
|
||
return 4;
|
||
}
|
||
}
|
||
// 多问题按条件拆分成两个队列新增日志记录
|
||
public function recordQuestionProcessingStart($key, $totalQuestions)
|
||
{
|
||
try {
|
||
$redis = $this->connect();
|
||
//判断是否执行
|
||
$sStatus = $redis->hGet($key, 'status');
|
||
if (!empty($sStatus) && $sStatus == 'completed') {
|
||
return 2;
|
||
}
|
||
if (!empty($sStatus) && $sStatus == 'processing') {
|
||
return 3;
|
||
}
|
||
$redis->hMSet($key, [
|
||
'status' => 'processing',
|
||
'total' => $totalQuestions,
|
||
'completed' => 0,
|
||
'start_time' => time(),
|
||
'queue_1_completed' => 0,
|
||
'queue_2_completed' => 0
|
||
]);
|
||
$redis->expire($key, 10800); // 6小时过期
|
||
return 1;
|
||
} catch (\Exception $e) {
|
||
return 4;
|
||
}
|
||
}
|
||
// 多问题按条件拆分成两个队列更新日志记录
|
||
public function updateQuestionProcessingProgress($key, $sKeyName)
|
||
{
|
||
$redis = $this->connect();
|
||
// 获取总数
|
||
$total = $redis->hGet($key, 'total');
|
||
if (!$total) {
|
||
return 0;
|
||
}
|
||
if(!empty($sKeyName)){
|
||
$redis->hIncrBy($key, $sKeyName, 1);
|
||
}
|
||
//获取每个队列完成数量
|
||
$queue_1_completed = $redis->hGet($key, 'queue_1_completed');
|
||
$queue_2_completed = $redis->hGet($key, 'queue_2_completed');
|
||
$completed = $queue_1_completed + $queue_2_completed;
|
||
if($completed > $total){
|
||
return 100;
|
||
}
|
||
// 计算进度
|
||
$iProgress = round(($completed / $total) * 100, 2);
|
||
// 事务更新多个字段
|
||
$redis->hSet($key, 'completed', $completed);
|
||
$redis->hSet($key, 'progress', $iProgress);
|
||
if ($iProgress >= 100) {
|
||
$redis->hSet($key, 'status', 'completed');
|
||
$redis->hSet($key, 'end_time', time());
|
||
}
|
||
return $iProgress;
|
||
|
||
}
|
||
|
||
// 更新处理进度
|
||
public function updateProcessingProgress($key, $completed)
|
||
{
|
||
$redis = $this->connect();
|
||
// 获取总数
|
||
$total = $redis->hGet($key, 'total');
|
||
if (!$total) {
|
||
return 0;
|
||
}
|
||
|
||
// 计算进度
|
||
$iProgress = round(($completed / $total) * 100, 2);
|
||
// 事务更新多个字段
|
||
$redis->hSet($key, 'completed', $completed);
|
||
$redis->hSet($key, 'progress', $iProgress);
|
||
if ($iProgress >= 100) {
|
||
$redis->hSet($key, 'status', 'completed');
|
||
$redis->hSet($key, 'end_time', time());
|
||
}
|
||
return $iProgress;
|
||
|
||
}
|
||
|
||
// 保存分块进度
|
||
public function saveChunkProgress($key, $chunkIndex, $content)
|
||
{
|
||
$redis = $this->connect();
|
||
$redis->hSet($key, "chunk_{$chunkIndex}", $content);
|
||
// 确保设置过期时间(如果已设置则忽略)
|
||
$redis->expire($key, 10800);
|
||
return true;
|
||
|
||
}
|
||
|
||
// 获取分块进度
|
||
public function getChunkProgress($key, $chunkIndex = '-1')
|
||
{
|
||
$redis = $this->connect();
|
||
|
||
if($chunkIndex == '-1'){
|
||
|
||
$aChunkValue = $redis->hGetAll($key);
|
||
}else{
|
||
$sChunkKey = "chunk_{$chunkIndex}";
|
||
// 获取值
|
||
$aChunkValue = $redis->hGet($key, $sChunkKey);
|
||
}
|
||
|
||
|
||
return $aChunkValue;
|
||
}
|
||
|
||
public function getJobStatus($jobId)
|
||
{
|
||
try {
|
||
return $this->getRedisValue($jobId . ':status');
|
||
} catch (\Exception $e) {
|
||
return null;
|
||
}
|
||
}
|
||
|
||
public function getConnectionStatus()
|
||
{
|
||
try {
|
||
return $this->connect()->ping();
|
||
} catch (\Exception $e) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// 强制释放锁(Lua脚本保证原子性)
|
||
public function forceReleaseLock($key, $value)
|
||
{
|
||
$script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
|
||
return $this->redis->eval($script, [$key, $value], 1) > 0;
|
||
}
|
||
|
||
// 在QueueRedis类中新增原子化方法(替换原有分散调用)
|
||
public function atomicJobUpdate($key, $status, $expire, $value)
|
||
{
|
||
// 使用Lua脚本原子执行"判断锁值+更新状态+设置过期"
|
||
$script = '
|
||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||
redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3])
|
||
return 1
|
||
end
|
||
return 0
|
||
';
|
||
return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0;
|
||
}
|
||
|
||
}
|
||
?>
|