Files
tougao/application/common/QueueRedis.php
2025-07-23 22:12:01 +08:00

265 lines
7.4 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
class QueueRedis
{
private $redis;
private $config;
private static $instance;
/**
 * Reads the `queue` configuration entry and opens the Redis connection.
 *
 * Private: instances are only created from getInstance().
 */
private function __construct()
{
    $queueConfig = \think\Config::get('queue');
    $this->config = $queueConfig;
    $this->connect();
}
/**
 * Returns the shared QueueRedis instance, creating it on the first call.
 *
 * @return self
 */
public static function getInstance()
{
    if (self::$instance) {
        return self::$instance;
    }

    self::$instance = new self();

    return self::$instance;
}
/**
 * Returns the Redis client, creating and configuring it on first use.
 *
 * The client is stored on the instance only after connect, auth and select
 * have all succeeded, so a failed attempt is retried on the next call
 * instead of leaving an unusable client cached. This method does not detect
 * a connection that drops after it has been established.
 *
 * Configuration keys read: `persistent`, `host`, `port`, `password`,
 * `select`. All of them are optional.
 *
 * @return \Redis
 * @throws \RedisException when connecting, authenticating or selecting the
 *                         database reports failure
 */
private function connect()
{
    if ($this->redis) {
        return $this->redis;
    }

    $client = new \Redis();

    // Use a persistent connection only when explicitly enabled; a missing
    // key falls back to a plain connection instead of raising a notice.
    $connectMethod = !empty($this->config['persistent']) ? 'pconnect' : 'connect';
    $host = $this->config['host'] ?? '127.0.0.1';
    $port = $this->config['port'] ?? 6379;

    if ($client->$connectMethod($host, $port) === false) {
        throw new \RedisException("Redis connection to {$host}:{$port} failed");
    }

    // Authenticate only when a non-empty password is configured.
    if (!empty($this->config['password'])
        && $client->auth($this->config['password']) === false
    ) {
        throw new \RedisException('Redis authentication failed');
    }

    if ($client->select($this->config['select'] ?? 0) === false) {
        throw new \RedisException('Redis database selection failed');
    }

    $this->redis = $client;

    return $this->redis;
}
/**
 * Tries to take a lock with a single SET command carrying the NX and EX
 * options.
 *
 * @param mixed $key    lock key
 * @param mixed $value  value stored under the key
 * @param mixed $expire value passed as the EX option
 * @return mixed the result of Redis::set(), or false when an exception is raised
 */
public function setRedisLock($key, $value, $expire)
{
    $lockOptions = ['nx', 'ex' => $expire];

    try {
        $client = $this->connect();

        return $client->set($key, $value, $lockOptions);
    } catch (\Exception $e) {
        return false;
    }
}
/**
 * Stores a value, with an expiry when $expire is truthy.
 *
 * @param mixed $key
 * @param mixed $value
 * @param mixed $expire expiry handed to SETEX; a falsy value (null, 0, ...)
 *                      stores the key with a plain SET instead
 * @return mixed the result of Redis::setex() / Redis::set(), or false when
 *               an exception is raised
 */
public function setRedisValue($key, $value, $expire = null)
{
    try {
        $client = $this->connect();

        return $expire
            ? $client->setex($key, $expire, $value)
            : $client->set($key, $value);
    } catch (\Exception $e) {
        return false;
    }
}
/**
 * Reads the value stored under $key.
 *
 * @param mixed $key
 * @return mixed the result of Redis::get(), or null when an exception is raised
 */
public function getRedisValue($key)
{
    try {
        $client = $this->connect();
        $stored = $client->get($key);

        return $stored;
    } catch (\Exception $e) {
        return null;
    }
}
/**
 * Releases a lock only if it still holds the expected value.
 *
 * The compare-and-delete runs as one Lua script so the check and the DEL
 * cannot be interleaved with another client's commands. Exceptions raised
 * by the Redis client are not caught here.
 *
 * @param mixed $key   lock key
 * @param mixed $value value the lock must currently hold to be deleted
 * @return mixed the result of Redis::eval(); the script itself returns the
 *               DEL result when the value matched and 0 otherwise
 */
public function releaseRedisLock($key, $value)
{
    // Plain string literal rather than a heredoc: the closing-marker rules
    // for heredocs differ between PHP versions, a quoted string does not.
    $script = 'if redis.call("GET", KEYS[1]) == ARGV[1] then '
        . 'return redis.call("DEL", KEYS[1]) '
        . 'else '
        . 'return 0 '
        . 'end';

    $redis = $this->connect();

    return $redis->eval($script, [$key, $value], 1);
}
/**
 * Returns the remaining time-to-live of a lock key.
 *
 * NOTE(review): -1 is also what Redis TTL reports for a key without an
 * expiry, so callers cannot tell that apart from an exception — confirm
 * this is acceptable.
 *
 * @param mixed $key
 * @return mixed the result of Redis::ttl(), or -1 when an exception is raised
 */
public function getLockTtl($key)
{
    try {
        $client = $this->connect();
        $remaining = $client->ttl($key);

        return $remaining;
    } catch (\Exception $e) {
        return -1;
    }
}
/**
 * Marks a job as started: takes the job lock and, if that succeeds, writes
 * `<key>:status = processing` with the same expiry.
 *
 * Both writes happen inside one Lua script, so the lock can never be left
 * behind without its status key (previously a failure between the two
 * commands returned false while the lock stayed held until it expired).
 *
 * @param mixed $sRedisKey   lock key; the status key is this key plus ":status"
 * @param mixed $sRedisValue value stored in the lock
 * @param mixed $expire      expiry applied to both keys (EX option)
 * @return bool true when the lock was acquired, false when it is already
 *              held or when an exception is raised
 */
public function startJob($sRedisKey, $sRedisValue, $expire)
{
    // SET ... NX yields a false value in Lua when the key already exists,
    // which skips the status write and falls through to "return 0".
    $script = "if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then\n"
        . "redis.call('SET', KEYS[2], 'processing', 'EX', ARGV[2])\n"
        . "return 1\n"
        . "end\n"
        . "return 0";

    try {
        $redis = $this->connect();
        $acquired = $redis->eval(
            $script,
            [$sRedisKey, $sRedisKey . ':status', $sRedisValue, $expire],
            2
        );

        return $acquired === 1;
    } catch (\Exception $e) {
        // NOTE(review): no `use` statement for Log is visible in this file,
        // so this resolves to app\common\Log — confirm that class exists.
        Log::error("Redis批量操作失败: {$e->getMessage()}");
        return false;
    }
}
/**
 * Marks a job as finished: writes `<key>:status` with an expiry and deletes
 * the job lock, both inside one Lua script.
 *
 * @param mixed $sRedisKey lock key; the status key is this key plus ":status"
 * @param mixed $status    value written to the status key
 * @param mixed $expire    expiry of the status key (EX option)
 * @return bool true only when the script's DEL reports exactly 1; false
 *              otherwise or when an exception is raised
 */
public function finishJob($sRedisKey, $status, $expire)
{
    $lua = "redis.call('SET', KEYS[1] .. ':status', ARGV[1], 'EX', ARGV[2])\n"
        . "return redis.call('DEL', KEYS[1])";

    try {
        $client = $this->connect();
        $deleted = $client->eval($lua, [$sRedisKey, $status, $expire], 1);
    } catch (\Exception $e) {
        Log::error("Redis完成任务失败: {$e->getMessage()}");
        return false;
    }

    return $deleted === 1;
}
/**
 * Initialises the progress hash for a job.
 *
 * Writes `status`, `total`, `completed` and `start_time`, then sets the
 * key's expiry to 10800 seconds.
 *
 * @param mixed $key            hash key holding the job progress
 * @param mixed $totalQuestions value stored in the `total` field
 * @return bool true on success, false when an exception is raised
 */
public function recordProcessingStart($key, $totalQuestions)
{
    try {
        $client = $this->connect();
        $initialState = [
            'status' => 'processing',
            'total' => $totalQuestions,
            'completed' => 0,
            'start_time' => time(),
        ];
        $client->hMSet($key, $initialState);
        // 10800 s = 3 hours.
        // NOTE(review): the previous comment said 6 hours — confirm the intended TTL.
        $client->expire($key, 10800);
    } catch (\Exception $e) {
        return false;
    }

    return true;
}
/**
 * Initialises the progress hash for a job whose questions are split across
 * two queues.
 *
 * Writes `status`, `total`, `completed`, `start_time` and the per-queue
 * counters `queue_1_completed` / `queue_2_completed`, then sets the key's
 * expiry to 10800 seconds.
 *
 * @param mixed $key            hash key holding the job progress
 * @param mixed $totalQuestions value stored in the `total` field
 * @return bool true on success, false when an exception is raised
 */
public function recordQuestionProcessingStart($key, $totalQuestions)
{
    try {
        $client = $this->connect();
        $initialState = [
            'status' => 'processing',
            'total' => $totalQuestions,
            'completed' => 0,
            'start_time' => time(),
            'queue_1_completed' => 0,
            'queue_2_completed' => 0,
        ];
        $client->hMSet($key, $initialState);
        // 10800 s = 3 hours.
        // NOTE(review): the previous comment said 6 hours — confirm the intended TTL.
        $client->expire($key, 10800);
    } catch (\Exception $e) {
        return false;
    }

    return true;
}
/**
 * Advances the progress of a job that was split across two queues.
 *
 * Optionally increments the hash field named by $sKeyName, recomputes the
 * overall progress from `queue_1_completed` + `queue_2_completed`, and
 * writes `completed` / `progress` (plus `status` and `end_time` once the
 * job is done) back in a single HMSET so readers do not observe a
 * half-updated record. Exceptions raised by the Redis client are not
 * caught here.
 *
 * @param mixed $key      hash key holding the job progress
 * @param mixed $sKeyName hash field to increment by 1; skipped when empty
 * @return int|float 0 when `total` is missing or not a positive number,
 *                   100 when the counters exceed `total` (nothing is
 *                   written in that case), otherwise the percentage
 *                   rounded to two decimals
 */
public function updateQuestionProcessingProgress($key, $sKeyName)
{
    $redis = $this->connect();

    // A missing, non-numeric or non-positive total cannot be divided by.
    $total = $redis->hGet($key, 'total');
    if (!is_numeric($total) || $total <= 0) {
        return 0;
    }

    if (!empty($sKeyName)) {
        $redis->hIncrBy($key, $sKeyName, 1);
    }

    // Sum the per-queue counters; a missing field counts as 0.
    $completed = (int) $redis->hGet($key, 'queue_1_completed')
        + (int) $redis->hGet($key, 'queue_2_completed');
    if ($completed > $total) {
        return 100;
    }

    $isDone = $completed >= $total;
    $iProgress = round(($completed / $total) * 100, 2);
    // Rounding must not report 100% (and thereby completion) while work is
    // still outstanding, e.g. 19999 of 20000 rounds up to 100.0.
    if (!$isDone && $iProgress >= 100) {
        $iProgress = 99.99;
    }

    $fields = [
        'completed' => $completed,
        'progress' => $iProgress,
    ];
    if ($isDone) {
        $fields['status'] = 'completed';
        $fields['end_time'] = time();
    }
    $redis->hMSet($key, $fields);

    return $iProgress;
}
/**
 * Records how many items of a job have been completed and recomputes its
 * progress percentage.
 *
 * Writes `completed` / `progress` (plus `status` and `end_time` once
 * $completed reaches `total`) in a single HMSET so readers do not observe
 * a half-updated record. Exceptions raised by the Redis client are not
 * caught here.
 *
 * @param mixed $key       hash key holding the job progress
 * @param mixed $completed number of completed items, as supplied by the caller
 * @return int|float 0 when `total` is missing or not a positive number,
 *                   otherwise the percentage rounded to two decimals
 */
public function updateProcessingProgress($key, $completed)
{
    $redis = $this->connect();

    // A missing, non-numeric or non-positive total cannot be divided by.
    $total = $redis->hGet($key, 'total');
    if (!is_numeric($total) || $total <= 0) {
        return 0;
    }

    $isDone = $completed >= $total;
    $iProgress = round(($completed / $total) * 100, 2);
    // Rounding must not report 100% (and thereby completion) while work is
    // still outstanding, e.g. 19999 of 20000 rounds up to 100.0.
    if (!$isDone && $iProgress >= 100) {
        $iProgress = 99.99;
    }

    $fields = [
        'completed' => $completed,
        'progress' => $iProgress,
    ];
    if ($isDone) {
        $fields['status'] = 'completed';
        $fields['end_time'] = time();
    }
    $redis->hMSet($key, $fields);

    return $iProgress;
}
/**
 * Stores the content of one chunk in the hash field `chunk_<index>`.
 *
 * Every call sets the key's expiry to 10800 seconds again. Exceptions
 * raised by the Redis client are not caught here.
 *
 * @param mixed $key        hash key
 * @param mixed $chunkIndex index appended to the `chunk_` field prefix
 * @param mixed $content    value stored in the field
 * @return bool always true when no exception is raised
 */
public function saveChunkProgress($key, $chunkIndex, $content)
{
    $client = $this->connect();
    $field = 'chunk_' . $chunkIndex;

    $client->hSet($key, $field, $content);
    $client->expire($key, 10800);

    return true;
}
/**
 * Reads the status stored under `<jobId>:status` through getRedisValue().
 *
 * @param mixed $jobId
 * @return mixed the stored value as returned by getRedisValue(), or null
 *               when an exception is raised
 */
public function getJobStatus($jobId)
{
    try {
        $statusKey = $jobId . ':status';

        return $this->getRedisValue($statusKey);
    } catch (\Exception $e) {
        return null;
    }
}
/**
 * Checks the connection by sending PING.
 *
 * @return mixed the result of Redis::ping(), or false when an exception is raised
 */
public function getConnectionStatus()
{
    try {
        $client = $this->connect();
        $reply = $client->ping();

        return $reply;
    } catch (\Exception $e) {
        return false;
    }
}
}
?>