队列提取公共方法

This commit is contained in:
chengxl
2025-07-22 11:19:38 +08:00
parent 73cbf94a53
commit 58c4fc27aa
3 changed files with 633 additions and 0 deletions

View File

@@ -0,0 +1,212 @@
<?php
namespace app\common;
use think\Db;
class QueueRedis
{
private $redis;
private $config;
private static $instance;
private function __construct()
{
$this->config = \think\Config::get('queue');
$this->connect();
}
public static function getInstance()
{
if (!self::$instance) {
self::$instance = new self();
}
return self::$instance;
}
private function connect()
{
// 只在首次调用或连接断开时创建连接
if (!$this->redis) {
$this->redis = new \Redis();
// 使用长连接pconnect避免频繁创建连接
$connectMethod = $this->config['persistent'] ? 'pconnect' : 'connect';
$this->redis->$connectMethod(
$this->config['host'] ?? '127.0.0.1',
$this->config['port'] ?? 6379
);
// 始终执行认证(空密码会被 Redis 忽略)
$this->redis->auth($this->config['password'] ?? '');
$this->redis->select($this->config['select'] ?? 0);
}
return $this->redis;
}
// 使用 SET 命令原子操作设置锁
public function setRedisLock($key, $value, $expire)
{
try {
return $this->connect()->set($key, $value, ['nx', 'ex' => $expire]);
} catch (\Exception $e) {
return false;
}
}
// 设置Redis值
public function setRedisValue($key, $value, $expire = null)
{
try {
$redis = $this->connect();
if ($expire) {
return $redis->setex($key, $expire, $value);
} else {
return $redis->set($key, $value);
}
} catch (\Exception $e) {
return false;
}
}
// 获取Redis值
public function getRedisValue($key)
{
try {
return $this->connect()->get($key);
} catch (\Exception $e) {
return null;
}
}
// 安全释放锁(仅当值匹配时删除)
public function releaseRedisLock($key, $value)
{
// 使用Lua脚本确保原子性
$script = <<<LUA
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
LUA; $redis = $this->connect();
$result = $redis->eval($script, [$key, $value], 1);
return $result;
}
// 获取锁剩余时间
public function getLockTtl($key)
{
try {
return $this->connect()->ttl($key);
} catch (\Exception $e) {
return -1;
}
}
// 任务开始时的批量操作
public function startJob($sRedisKey, $sRedisValue, $expire)
{
try {
$redis = $this->connect();
// 先尝试设置锁,成功后再设置状态
if ($redis->set($sRedisKey, $sRedisValue, ['nx', 'ex' => $expire])) {
$redis->set($sRedisKey . ':status', 'processing', $expire);
return true;
}
return false;
} catch (\Exception $e) {
Log::error("Redis批量操作失败: {$e->getMessage()}");
return false;
}
}
// 任务结束时的批量操作
public function finishJob($sRedisKey, $status, $expire)
{
try {
$redis = $this->connect();
// 使用Lua脚本确保原子性
$script = <<<LUA
redis.call('SET', KEYS[1] .. ':status', ARGV[1], 'EX', ARGV[2])
return redis.call('DEL', KEYS[1])
LUA;
return $redis->eval($script, [$sRedisKey, $status, $expire], 1) === 1;
} catch (\Exception $e) {
Log::error("Redis完成任务失败: {$e->getMessage()}");
return false;
}
}
// 记录处理进度
public function recordProcessingStart($key, $totalQuestions)
{
try {
$redis = $this->connect();
$redis->hMSet($key, [
'status' => 'processing',
'total' => $totalQuestions,
'completed' => 0,
'start_time' => time()
]);
$redis->expire($key, 21600); // 6小时过期
return true;
} catch (\Exception $e) {
return false;
}
}
// 更新处理进度
public function updateProcessingProgress($key, $completed)
{
$redis = $this->connect();
// 获取总数
$total = $redis->hGet($key, 'total');
if (!$total) {
return false;
}
// 计算进度
$iProgress = round(($completed / $total) * 100, 2);
// 事务更新多个字段
$redis->hSet($key, 'completed', $completed);
$redis->hSet($key, 'progress', $iProgress);
if ($iProgress >= 100) {
$redis->hSet($key, 'status', 'completed');
$redis->hSet($key, 'end_time', time());
}
return $iProgress;
}
// 保存分块进度
public function saveChunkProgress($key, $chunkIndex, $content)
{
$redis = $this->connect();
$redis->hSet($key, "chunk_{$chunkIndex}", $content);
// 确保设置过期时间(如果已设置则忽略)
$redis->expire($key, 86400);
return true;
}
public function getJobStatus($jobId)
{
try {
return $this->getRedisValue($jobId . ':status');
} catch (\Exception $e) {
return null;
}
}
public function getConnectionStatus()
{
try {
return $this->connect()->ping() === '+PONG';
} catch (\Exception $e) {
return false;
}
}
}
?>