redis = new \Redis();
        // NOTE(review): everything before this statement (class header, property
        // declarations, start of the constructor) is not visible in this view;
        // the constructor tail below is kept exactly as found.
        $this->redis->connect($config['host'], $config['port']);
        if (!empty($config['password'])) {
            $this->redis->auth($config['password']);
        }
        $this->redis->select($config['select']);
        // Alternative initialisation through the framework cache handler (disabled):
        // $this->redis = Cache::store('redis')->handler();
    }

    /**
     * Record the start of a task by inserting a row into wechat_queue_logs.
     *
     * Only keys listed in $this->aField (and whose value is set / non-null)
     * are written; every other key in $aParam is ignored.
     *
     * @param array $aParam Candidate column => value pairs.
     * @return mixed 0 when no whitelisted field is present, otherwise whatever
     *               insertGetId() returns (the new row's ID).
     */
    public function addLog($aParam = [])
    {
        $aInsert = $this->pickLogFields($aParam);
        if (empty($aInsert)) {
            return 0;
        }

        // Insert the filtered data. The previous code inserted the raw $aParam,
        // which bypassed the field whitelist entirely.
        return DB::name('wechat_queue_logs')->insertGetId($aInsert);
    }

    /**
     * Update an existing log row (e.g. to record task success).
     *
     * @param array $aParam Must contain a non-empty 'log_id'; the remaining
     *                      whitelisted keys are the columns to update.
     * @return mixed false when 'log_id' is missing/empty, 0 when there is
     *               nothing to update, otherwise the result of update().
     */
    public function updateLog($aParam = [])
    {
        $iLogId = empty($aParam['log_id']) ? 0 : $aParam['log_id'];
        if (empty($iLogId)) {
            return false;
        }

        $aUpdate = $this->pickLogFields($aParam);
        // The row is addressed through the WHERE clause, so the key itself must
        // not be part of the SET clause. (The original unset() was applied to
        // $aParam after $aUpdate had been built and therefore had no effect.)
        unset($aUpdate['log_id']);

        if (empty($aUpdate)) {
            // Nothing to write; skip the round trip to the database.
            return 0;
        }

        return DB::name('wechat_queue_logs')
            ->where('log_id', $iLogId)
            ->limit(1)
            ->update($aUpdate);
    }

    /**
     * Write a marker to the cache to prevent the same operation from being
     * repeated within one hour.
     *
     * NOTE(review): the existence check and the write are two separate cache
     * calls, so two concurrent callers can both pass the check. Use
     * setRedisLock() where strict mutual exclusion is required.
     *
     * @param array $aParam Must contain a non-empty 'redis_key'.
     * @return int 1 = marker written, 2 = write failed,
     *             3 = 'redis_key' missing/empty, 4 = marker already present.
     */
    public function setRedisLabel($aParam = [])
    {
        if (empty($aParam['redis_key'])) {
            return 3;
        }
        $sRedisKey = $aParam['redis_key'];

        // The marker's value is the key itself; compare as strings so that
        // PHP's loose-comparison rules cannot produce a false "already present".
        $sValue = $this->getRedisLabel($sRedisKey);
        if (is_scalar($sValue) && (string) $sValue === (string) $sRedisKey) {
            return 4;
        }

        // Keep the marker for 3600 seconds (one hour).
        $result = Cache::set($sRedisKey, $sRedisKey, 3600);

        return $result ? 1 : 2;
    }

    /**
     * Read a value from the cache.
     *
     * @param string $sRedisKey Cache key.
     * @return mixed '' when the key is empty, otherwise the result of Cache::get().
     */
    public function getRedisLabel($sRedisKey = '')
    {
        if (empty($sRedisKey)) {
            return '';
        }

        return Cache::get($sRedisKey);
    }

    /**
     * Acquire a lock atomically: SET with the NX and EX options in one command.
     *
     * @param string $key    Lock key.
     * @param string $value  Owner token; pass the same value to releaseRedisLock().
     * @param int    $expire Expiry passed as the EX option (seconds).
     * @return mixed Result of Redis::set() with the NX option.
     */
    public function setRedisLock($key, $value, $expire)
    {
        return $this->redis->set($key, $value, ['nx', 'ex' => $expire]);
    }

    /**
     * Read a value directly from the Redis connection.
     *
     * @param string $key Redis key.
     * @return mixed Result of Redis::get().
     */
    public function getRedisValue($key)
    {
        return $this->redis->get($key);
    }

    /**
     * Release a lock safely: delete the key only if it still holds $value.
     *
     * The comparison and the delete run inside one Lua script so that they are
     * atomic on the Redis server.
     *
     * NOTE(review): the original script text was lost from this file; the
     * script below is the standard compare-and-delete form implied by the
     * surviving comments and by the eval() arguments (one key, one argument).
     * Confirm against version control.
     *
     * @param string $key   Lock key (KEYS[1]).
     * @param string $value Owner token that was used to acquire the lock (ARGV[1]).
     * @return mixed Result of Redis::eval().
     */
    public function releaseRedisLock($key, $value)
    {
        $script = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
LUA;

        return $this->redis->eval($script, [$key, $value], 1);
    }

    /**
     * Keep only the entries of $aParam whose key is listed in $this->aField
     * and whose value is set (non-null).
     *
     * @param array $aParam Raw input.
     * @return array Whitelisted subset of $aParam.
     */
    private function pickLogFields($aParam)
    {
        $aPicked = [];
        foreach ($this->aField as $sField) {
            if (isset($aParam[$sField])) {
                $aPicked[$sField] = $aParam[$sField];
            }
        }

        return $aPicked;
    }
}
?>