oQueueJob = new QueueJob();
        $this->QueueRedis = QueueRedis::getInstance();
    }

    /**
     * Queue entry point: runs the LLM reference check for a single
     * `article_reference_check_result` row and stores the outcome on that row.
     *
     * Flow, as implemented below:
     *  1. Build a per-class / per-check-id Redis key and try to acquire a lock
     *     through QueueJob::acquireLock(); if that returns a falsy value the
     *     method returns without touching the job itself.
     *  2. Delete the job without doing any work when the check id is not
     *     positive, the row does not exist, or the row already has status 1
     *     (the value this job writes on success).
     *  3. Collect the two texts to compare, call LLMService::checkReference(),
     *     write the result with status 1, and re-sync the owning article.
     *  4. On an exception during step 3: log it, then either mark the row as
     *     failed (from the third attempt on) or release the job for a retry.
     *
     * @param Job   $job  Queue job wrapper supplied by the queue worker.
     * @param mixed $data Job payload; only `check_id` is read from it.
     *
     * @return void
     */
    public function fire(Job $job, $data)
    {
        $this->oQueueJob->init($job);

        // The raw body is only used to extract a job id for the log lines.
        $rawBody = $job->getRawBody();
        $rawBody = empty($rawBody) ? '' : $rawBody;
        $jobData = empty($rawBody) ? [] : json_decode($rawBody, true);
        $jobId = empty($jobData['id']) ? 'unknown' : $jobData['id'];

        // Initialised up front so the outer catch handlers always receive
        // defined values, even if an exception is thrown before the lock
        // key/value are computed.
        $sRedisKey = '';
        $sRedisValue = '';

        $this->oQueueJob->log("-----------队列任务开始-----------");
        $this->oQueueJob->log("当前任务ID: {$jobId}, 尝试次数: {$job->attempts()}");

        try {
            $checkId = intval(isset($data['check_id']) ? $data['check_id'] : 0);

            $sClassName = get_class($this);
            $sRedisKey = "queue_job:{$sClassName}:{$checkId}";
            $sRedisValue = uniqid() . '_' . getmypid();

            // NOTE(review): acquireLock() receives $job, so it presumably
            // deletes/releases the job itself when the lock is not obtained —
            // confirm against QueueJob.
            if (!$this->oQueueJob->acquireLock($sRedisKey, $sRedisValue, $job)) {
                return;
            }

            if ($checkId <= 0) {
                $job->delete();
                return;
            }

            $row = Db::name('article_reference_check_result')->where('id', $checkId)->find();
            if (empty($row)) {
                $job->delete();
                return;
            }

            // Status 1 is what this job writes after a successful check, so
            // such a row needs no further processing.
            if (intval($row['status']) === 1) {
                $job->delete();
                return;
            }

            try {
                // Text A comes from the article's main content. The lookup may
                // return no row, so guard the offset access; an empty result
                // falls through to the row's own `content_a` below.
                $mainInfo = Db::name('article_main')->where('am_id', $row['am_id'])->find();
                $contentA = trim((string)(isset($mainInfo['content']) ? $mainInfo['content'] : ''));
                if ($contentA === '' && !empty($row['content_a'])) {
                    $contentA = trim((string)$row['content_a']);
                }

                // Text B is the stored reference text; when it is empty, build
                // it from the linked production_article_refer record.
                $contentB = trim((string)(isset($row['refer_text']) ? $row['refer_text'] : ''));
                $referId = isset($row['p_refer_id']) ? intval($row['p_refer_id']) : 0;
                if ($contentB === '' && $referId > 0) {
                    // NOTE(review): status 0 presumably means an active
                    // (non-deleted) reference — confirm against the schema.
                    $refer = Db::name('production_article_refer')
                        ->where('p_refer_id', $referId)
                        ->where('status', 0)
                        ->find();
                    if ($refer) {
                        $contentB = (new ReferenceCheckService())->formatReferForLlm($refer);
                    }
                }

                if ($contentA === '' || $contentB === '') {
                    $this->markFailed($checkId, 'Missing content_a or reference text');
                    $job->delete();
                    return;
                }

                $llm = new LLMService();
                $llmResult = $llm->checkReference($contentA, $contentB);
                $isMatch = !empty($llmResult['is_match']);

                // NOTE(review): `confidence` and `reason` are read without a
                // default; this assumes checkReference() always returns both.
                Db::name('article_reference_check_result')->where('id', $checkId)->update([
                    'is_match' => $isMatch ? 1 : 0,
                    'confidence' => $llmResult['confidence'],
                    'reason' => $llmResult['reason'],
                    'status' => 1,
                    'error_msg' => '',
                    'updated_at' => date('Y-m-d H:i:s'),
                ]);

                $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
                if ($amId > 0) {
                    (new ReferenceCheckService())->syncAmRefCheckStatus($amId);
                }

                $this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue);
                $job->delete();
                $this->oQueueJob->log("任务执行成功 | 日志ID: {$sRedisKey}");
            } catch (\Exception $e) {
                // Failures of the check itself are handled here and do not
                // reach the outer handlers: give up from the third attempt on,
                // otherwise hand the job back to the queue with a delay of 30.
                $this->oQueueJob->log('ReferenceCheck error: ' . $e->getMessage());
                if ($job->attempts() >= 3) {
                    $this->markFailed($checkId, $e->getMessage());
                    $job->delete();
                    return;
                }
                $job->release(30);
            }
        } catch (\RuntimeException $e) {
            // The outer handlers only see exceptions raised outside the inner
            // try (lock acquisition, initial row lookup) or from within the
            // inner catch block itself.
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\LogicException $e) {
            $this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } catch (\Exception $e) {
            $this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
        } finally {
            // Runs on every exit path, including the early returns above.
            $this->oQueueJob->finnal();
        }
    }

    /**
     * Marks a check-result row as failed (status 2), stores a truncated error
     * message, and re-syncs the reference-check status of the owning article.
     *
     * @param int    $checkId Primary key of the article_reference_check_result row.
     * @param string $msg     Error message; only the first 500 characters are stored.
     *
     * @return void
     */
    private function markFailed($checkId, $msg)
    {
        // Read the row first: its am_id is needed for the sync after the update.
        $row = Db::name('article_reference_check_result')->where('id', $checkId)->find();

        Db::name('article_reference_check_result')->where('id', $checkId)->update([
            'status' => 2,
            'error_msg' => mb_substr($msg, 0, 500),
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        $amId = empty($row) ? 0 : intval(isset($row['am_id']) ? $row['am_id'] : 0);
        if ($amId > 0) {
            (new ReferenceCheckService())->syncAmRefCheckStatus($amId);
        }
    }
}