svc = new ReferenceCheckService(); }

    /**
     * Handle one "article start" queue message.
     *
     * Flow, as implemented below:
     *  1. Validate `p_article_id` / `batch_id` from the payload.
     *  2. If another batch is in BATCH_RUNNING state, re-publish this message,
     *     sleep briefly and return (deferral).
     *  3. Try to claim the batch; give up only when the batch row is missing or
     *     already BATCH_DONE.
     *  4. Process pending result rows of the article one at a time.
     *  5. Write the batch summary and publish the next waiting batch.
     *
     * @param array $payload Message body; reads keys `p_article_id`, `batch_id`
     *                       and optional `trigger` (defaults to 'enqueue').
     * @return void
     */
    public function handleMessage(array $payload)
    {
        $pArticleId = intval(isset($payload['p_article_id']) ? $payload['p_article_id'] : 0);
        $batchId = intval(isset($payload['batch_id']) ? $payload['batch_id'] : 0);
        if ($pArticleId <= 0 || $batchId <= 0) {
            $this->svc->log('ReferenceCheckArticleWorker invalid payload');
            return;
        }

        if (!$this->canStartArticleWork($batchId)) {
            $this->svc->log('ReferenceCheckArticleWorker defer batch_id=' . $batchId . ' other article running');
            (new ReferenceCheckMqPublisher())->publishArticleStart(
                $pArticleId,
                $batchId,
                isset($payload['trigger']) ? $payload['trigger'] : 'enqueue'
            );
            // Pause after re-publishing so the deferred message is not
            // immediately consumed again in a tight loop by this worker.
            sleep(3);
            return;
        }

        if (!$this->claimBatch($batchId)) {
            // A failed claim is only fatal when the batch is gone or already
            // finished; in every other state processing continues.
            $batch = $this->getBatch($batchId);
            if (empty($batch) || intval($batch['batch_status']) === self::BATCH_DONE) {
                return;
            }
        }

        $this->svc->log('ReferenceCheckArticleWorker start p_article_id=' . $pArticleId . ' batch_id=' . $batchId);

        $done = 0;
        $failed = 0;
        // Ids of rows that were fetched but could not be processed in this run.
        // They are excluded from later fetches: such a row may still match the
        // pending filter, and re-fetching it would spin this loop forever.
        $skippedIds = [];

        while (true) {
            $row = $this->fetchNextPendingRow($pArticleId, $skippedIds);
            if (empty($row)) {
                break;
            }

            $checkId = $this->svc->resolveCheckRowId($row);
            $result = $checkId > 0 ? $this->processOneRow($checkId, $row) : 'skip';

            if ($result === 'ok') {
                $done++;
                continue;
            }
            if ($result === 'failed') {
                $failed++;
                continue;
            }

            // Unresolvable id or claim lost: make sure this row is not fetched again.
            $rowId = intval(isset($row['id']) ? $row['id'] : 0);
            if ($rowId <= 0) {
                // Without an id the row cannot be excluded; stop instead of looping.
                $this->svc->log(
                    'ReferenceCheckArticleWorker abort loop: pending row without usable id p_article_id=' . $pArticleId
                );
                break;
            }
            if ($checkId <= 0) {
                $this->svc->log(
                    'ReferenceCheckArticleWorker skip unresolvable row id=' . $rowId . ' p_article_id=' . $pArticleId
                );
            }
            $skippedIds[] = $rowId;
        }

        $this->finalizeBatch($batchId, $done, $failed);
        $this->svc->log(
            'ReferenceCheckArticleWorker done p_article_id=' . $pArticleId
            . ' batch_id=' . $batchId
            . ' done=' . $done
            . ' failed=' . $failed
        );
        $this->publishNextWaitingBatch();
    }

    /**
     * Tell whether no batch other than $batchId is currently in BATCH_RUNNING state.
     *
     * @param int $batchId Batch that wants to start; excluded from the count.
     * @return bool True when no other running batch exists.
     */
    private function canStartArticleWork($batchId)
    {
        $running = Db::name('article_reference_check_batch')
            ->where('batch_status', self::BATCH_RUNNING)
            ->where('id', '<>', intval($batchId))
            ->count();

        return intval($running) === 0;
    }

    /**
     * Move a waiting (or already running) batch to BATCH_RUNNING and touch `updated_at`.
     *
     * @param int $batchId
     * @return bool True when the update reported at least one affected row.
     *              NOTE(review): an already-running batch updated within the same
     *              second may report zero affected rows depending on the driver;
     *              the caller tolerates a false result for non-finished batches.
     */
    private function claimBatch($batchId)
    {
        $now = date('Y-m-d H:i:s');
        $affected = Db::name('article_reference_check_batch')
            ->where('id', intval($batchId))
            ->whereIn('batch_status', [self::BATCH_WAITING, self::BATCH_RUNNING])
            ->update([
                'batch_status' => self::BATCH_RUNNING,
                'updated_at' => $now,
            ]);

        return intval($affected) > 0;
    }

    /**
     * Load a batch row by primary key.
     *
     * @param int $batchId
     * @return mixed Whatever `find()` yields for the row; callers treat an empty value as "not found".
     */
    private function getBatch($batchId)
    {
        return Db::name('article_reference_check_batch')->where('id', intval($batchId))->find();
    }

    /**
     * Fetch the next result row of an article that is pending both in the queue
     * (`queue_status`) and as a record (`status`), in reference order.
     *
     * @param int   $pArticleId
     * @param array $excludeIds Row ids to leave out of the search (rows already
     *                          skipped by the current run). Empty by default.
     * @return mixed The row as returned by `find()`, or an empty value when nothing is pending.
     */
    private function fetchNextPendingRow($pArticleId, array $excludeIds = [])
    {
        $query = Db::name('article_reference_check_result')
            ->where('p_article_id', intval($pArticleId))
            ->where('queue_status', ReferenceCheckService::QUEUE_PENDING)
            ->where('status', ReferenceCheckService::RECORD_PENDING);

        if (!empty($excludeIds)) {
            $query = $query->where('id', 'not in', array_map('intval', $excludeIds));
        }

        return $query
            ->order('reference_no asc,am_id asc,text_start asc,id asc')
            ->find();
    }

    /**
     * Claim and run the reference check for a single result row.
     *
     * The row is claimed by flipping `queue_status` from pending to running; when
     * that update affects no row the call returns 'skip'. On an exception the row
     * is put back to pending with an incremented retry count and retried
     * immediately (recursively) while the count is below QUEUE_MAX_RETRY; after
     * that the record and queue state are marked failed.
     *
     * @param int   $checkId Id of the row in `article_reference_check_result`.
     * @param array $row     Row data; reads optional keys `retry_count` and `am_id`.
     * @return string ok|failed|skip
     */
    private function processOneRow($checkId, array $row)
    {
        $claimed = Db::name('article_reference_check_result')
            ->where('id', intval($checkId))
            ->where('queue_status', ReferenceCheckService::QUEUE_PENDING)
            ->update(['queue_status' => ReferenceCheckService::QUEUE_RUNNING]);
        if (intval($claimed) <= 0) {
            return 'skip';
        }

        $retryCount = intval(isset($row['retry_count']) ? $row['retry_count'] : 0);

        try {
            $this->svc->runReferenceCheckOnce($checkId);
            $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
            if ($amId > 0) {
                $this->svc->syncAmRefCheckStatus($amId);
            }
            $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_COMPLETED, $retryCount);

            return 'ok';
        } catch (\Exception $e) {
            $this->svc->log('ReferenceCheckArticleWorker check_id=' . $checkId . ' err=' . $e->getMessage());

            if ($retryCount < ReferenceCheckService::QUEUE_MAX_RETRY) {
                // Release the claim first so the recursive call can re-claim the row.
                $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_PENDING, $retryCount + 1);

                return $this->processOneRow($checkId, array_merge($row, ['retry_count' => $retryCount + 1]));
            }

            try {
                $this->svc->updateCheckResult($checkId, [
                    'status' => ReferenceCheckService::RECORD_FAILED,
                    'error_msg' => $e->getMessage(),
                ]);
                $this->svc->markQueueRuntime($checkId, ReferenceCheckService::QUEUE_FAILED, $retryCount);
            } catch (\Exception $e2) {
                // Best effort: a failure while recording the failure is only logged.
                \think\Log::error('ReferenceCheckArticleWorker markFailed: ' . $e2->getMessage());
            }

            $amId = intval(isset($row['am_id']) ? $row['am_id'] : 0);
            if ($amId > 0) {
                $this->svc->syncAmRefCheckStatus($amId);
            }

            return 'failed';
        }
    }

    /**
     * Write the final status and counters of a batch.
     *
     * The status is BATCH_PARTIAL_FAILED when at least one row failed, otherwise
     * BATCH_DONE. When fewer rows were handled than the batch's `total_count`,
     * this is only logged; the status is not changed.
     *
     * @param int $batchId
     * @param int $done   Rows completed successfully by this run.
     * @param int $failed Rows that ended in failure during this run.
     * @return void
     */
    private function finalizeBatch($batchId, $done, $failed)
    {
        $batch = $this->getBatch($batchId);
        if (empty($batch)) {
            return;
        }

        $total = intval($batch['total_count']);
        $status = self::BATCH_DONE;
        if ($failed > 0) {
            $status = self::BATCH_PARTIAL_FAILED;
        }

        Db::name('article_reference_check_batch')->where('id', intval($batchId))->update([
            'batch_status' => $status,
            'done_count' => intval($done),
            'failed_count' => intval($failed),
            'updated_at' => date('Y-m-d H:i:s'),
        ]);

        if ($total > 0 && ($done + $failed) < $total) {
            $this->svc->log('ReferenceCheckArticleWorker batch_id=' . $batchId . ' incomplete total=' . $total);
        }
    }

    /**
     * Publish a start message for the oldest (lowest id) batch in BATCH_WAITING state, if any.
     *
     * Publishing errors are logged and swallowed so they do not fail the
     * message that has just been processed.
     *
     * @return void
     */
    private function publishNextWaitingBatch()
    {
        $next = Db::name('article_reference_check_batch')
            ->where('batch_status', self::BATCH_WAITING)
            ->order('id asc')
            ->find();
        if (empty($next)) {
            return;
        }

        try {
            (new ReferenceCheckMqPublisher())->publishArticleStart(
                intval($next['p_article_id']),
                intval($next['id']),
                isset($next['trigger']) ? $next['trigger'] : 'enqueue'
            );
        } catch (\Exception $e) {
            $this->svc->log('publishNextWaitingBatch failed: ' . $e->getMessage());
            \think\Log::error('publishNextWaitingBatch: ' . $e->getMessage());
        }
    }
}