Files
tougao/application/common/QueueRedis.php
2025-08-15 14:48:10 +08:00

335 lines
9.8 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
class QueueRedis
{
private $redis;
private $config;
private static $instance;
private function __construct()
{
$this->config = \think\Config::get('queue');
$this->connect();
}
public static function getInstance()
{
if (!self::$instance) {
self::$instance = new self();
}
return self::$instance;
}
private function connect()
{
// 只在首次调用或连接断开时创建连接
if (!$this->redis) {
$this->redis = new \Redis();
// 使用长连接pconnect避免频繁创建连接
$connectMethod = $this->config['persistent'] ? 'pconnect' : 'connect';
$this->redis->$connectMethod(
$this->config['host'] ?? '127.0.0.1',
$this->config['port'] ?? 6379
);
// 始终执行认证(空密码会被 Redis 忽略)
if (!empty($this->config['password'])) {
$this->redis->auth($this->config['password']);
}
$this->redis->select($this->config['select'] ?? 0);
}
return $this->redis;
}
// 使用 SET 命令原子操作设置锁
public function setRedisLock($key, $value, $expire)
{
try {
return $this->connect()->set($key, $value, ['nx', 'ex' => $expire]);
} catch (\Exception $e) {
return false;
}
}
// 设置Redis值
public function setRedisValue($key, $value, $expire = null)
{
try {
$redis = $this->connect();
if ($expire) {
return $redis->setex($key, $expire, $value);
} else {
return $redis->set($key, $value);
}
} catch (\Exception $e) {
return false;
}
}
// 获取Redis值
public function getRedisValue($key)
{
try {
return $this->connect()->get($key);
} catch (\Exception $e) {
return null;
}
}
// 安全释放锁(仅当值匹配时删除)
public function releaseRedisLock($key, $value)
{
// 使用Lua脚本确保原子性
$script = <<<LUA
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
LUA; $redis = $this->connect();
$result = $redis->eval($script, [$key, $value], 1);
return $result;
}
// 获取锁剩余时间
public function getLockTtl($key)
{
try {
return $this->connect()->ttl($key);
} catch (\Exception $e) {
return -1;
}
}
// 任务开始时的批量操作
public function startJob($sRedisKey, $sRedisValue, $expire)
{
try {
$redis = $this->connect();
// 先尝试设置锁,成功后再设置状态
if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) {
$redis->set($sRedisKey . ':status', 'processing', $expire);
return true;
}
return false;
} catch (\Exception $e) {
Log::error("Redis批量操作失败: {$e->getMessage()}");
return false;
}
}
/**
* 标记任务状态并释放锁(带所有权验证)
* @param string $sRedisKey 任务锁的键名
* @param string $status 状态completed/failed
* @param int $expire 状态的过期时间(秒)
* @param string $sRedisValue 锁的值(用于验证所有权)
* @return bool 是否执行成功
*/
public function finishJob($sRedisKey, $status, $expire, $sRedisValue){
try {
$redis = $this->connect();
// Lua 脚本:先验证锁所有权,再设置状态并删除锁
$script = <<<LUA
if redis.call('GET', KEYS[1]) ~= ARGV[1] then
return 0
end
redis.call('SET', KEYS[1] .. ':status', ARGV[2], 'EX', ARGV[3])
redis.call('DEL', KEYS[1])
return 1
LUA;
// 执行脚本:参数依次为 键名列表、锁的值、状态、过期时间
$result = $redis->eval($script, [$sRedisKey, $sRedisValue, $status, $expire], 1);
return $result === 1;
} catch (\Exception $e) {
Log::error("Redis完成任务失败: {$e->getMessage()} | 键: {$sRedisKey}");
return false;
}
}
// 记录处理进度
public function recordProcessingStart($key, $totalQuestions)
{
try {
$redis = $this->connect();
//判断是否执行
$sStatus = $redis->hGet($key, 'status');
if (!empty($sStatus) && $sStatus == 'completed') {
return 2;
}
if (!empty($sStatus) && $sStatus == 'processing') {
return 3;
}
$redis->hMSet($key, [
'status' => 'processing',
'total' => $totalQuestions,
'completed' => 0,
'start_time' => time()
]);
$redis->expire($key, 10800); // 6小时过期
return 1;
} catch (\Exception $e) {
return 4;
}
}
// 多问题按条件拆分成两个队列新增日志记录
public function recordQuestionProcessingStart($key, $totalQuestions)
{
try {
$redis = $this->connect();
//判断是否执行
$sStatus = $redis->hGet($key, 'status');
if (!empty($sStatus) && $sStatus == 'completed') {
return 2;
}
if (!empty($sStatus) && $sStatus == 'processing') {
return 3;
}
$redis->hMSet($key, [
'status' => 'processing',
'total' => $totalQuestions,
'completed' => 0,
'start_time' => time(),
'queue_1_completed' => 0,
'queue_2_completed' => 0
]);
$redis->expire($key, 10800); // 6小时过期
return 1;
} catch (\Exception $e) {
return 4;
}
}
// 多问题按条件拆分成两个队列更新日志记录
public function updateQuestionProcessingProgress($key, $sKeyName)
{
$redis = $this->connect();
// 获取总数
$total = $redis->hGet($key, 'total');
if (!$total) {
return 0;
}
if(!empty($sKeyName)){
$redis->hIncrBy($key, $sKeyName, 1);
}
//获取每个队列完成数量
$queue_1_completed = $redis->hGet($key, 'queue_1_completed');
$queue_2_completed = $redis->hGet($key, 'queue_2_completed');
$completed = $queue_1_completed + $queue_2_completed;
if($completed > $total){
return 100;
}
// 计算进度
$iProgress = round(($completed / $total) * 100, 2);
// 事务更新多个字段
$redis->hSet($key, 'completed', $completed);
$redis->hSet($key, 'progress', $iProgress);
if ($iProgress >= 100) {
$redis->hSet($key, 'status', 'completed');
$redis->hSet($key, 'end_time', time());
}
return $iProgress;
}
// 更新处理进度
public function updateProcessingProgress($key, $completed)
{
$redis = $this->connect();
// 获取总数
$total = $redis->hGet($key, 'total');
if (!$total) {
return 0;
}
// 计算进度
$iProgress = round(($completed / $total) * 100, 2);
// 事务更新多个字段
$redis->hSet($key, 'completed', $completed);
$redis->hSet($key, 'progress', $iProgress);
if ($iProgress >= 100) {
$redis->hSet($key, 'status', 'completed');
$redis->hSet($key, 'end_time', time());
}
return $iProgress;
}
// 保存分块进度
public function saveChunkProgress($key, $chunkIndex, $content)
{
$redis = $this->connect();
$redis->hSet($key, "chunk_{$chunkIndex}", $content);
// 确保设置过期时间(如果已设置则忽略)
$redis->expire($key, 10800);
return true;
}
// 获取分块进度
public function getChunkProgress($key, $chunkIndex = '')
{
$redis = $this->connect();
if(empty($chunkIndex)){
$aChunkValue = $redis->hGetAll($key);
}else{
$sChunkKey = "chunk_{$chunkIndex}";
// 获取值
$aChunkValue = $redis->hGet($key, $sChunkKey);
}
return $aChunkValue;
}
public function getJobStatus($jobId)
{
try {
return $this->getRedisValue($jobId . ':status');
} catch (\Exception $e) {
return null;
}
}
public function getConnectionStatus()
{
try {
return $this->connect()->ping();
} catch (\Exception $e) {
return false;
}
}
// 强制释放锁Lua脚本保证原子性
public function forceReleaseLock($key, $value)
{
$script = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
return $this->redis->eval($script, [$key, $value], 1) > 0;
}
// 在QueueRedis类中新增原子化方法替换原有分散调用
public function atomicJobUpdate($key, $status, $expire, $value)
{
// 使用Lua脚本原子执行"判断锁值+更新状态+设置过期"
$script = '
if redis.call("get", KEYS[1]) == ARGV[1] then
redis.call("set", KEYS[1], ARGV[2], "EX", ARGV[3])
return 1
end
return 0
';
return $this->redis->eval($script, [$key, $value, $status, $expire], 1) > 0;
}
}
?>