logFile = ROOT_PATH . 'runtime' . DS . 'plagiarism_task.log'; } // ---------- 顶层入口 ---------- /** * 提交查重(入队,立即返回 check_id) * * @param int $articleId 投稿 ID * @param string $filePath 本地可读的 PDF/DOCX 绝对路径 * @param int $triggeredBy 触发人 user_id(手工触发时编辑后台的 user_id) * @param string $source 'manual' / 'auto_xxx' * @return int check_id */ public function submit($articleId, $filePath, $triggeredBy = 0, $source = 'manual') { if (!is_file($filePath) || !is_readable($filePath)) { throw new Exception("File not readable: {$filePath}"); } $journalId = (int) Db::name('article') ->where('article_id', $articleId) ->value('journal_id'); $this->log("plagiarism submit is running"); $now = time(); $checkId = Db::name('plagiarism_check')->insertGetId([ 'article_id' => $articleId, 'journal_id' => $journalId, 'triggered_by' => $triggeredBy, 'trigger_source' => $source, 'state' => 1, // 上传中 'source_file_name' => basename($filePath), 'source_file_size' => filesize($filePath) ?: 0, 'ctime' => $now, 'utime' => $now, ]); Queue::push( 'app\\api\\job\\PlagiarismRun', ['check_id' => $checkId, 'file_path' => $filePath], self::QUEUE_CHAIN ); return (int)$checkId; } /** * Job 调用:仅创建 submission + 上传文件,随后由 PlagiarismWaitIngest 链式轮询 ingest,再 PlagiarismTriggerSimilarity。 */ public function runUploadOnly($checkId, $filePath) { $check = $this->mustGetCheck($checkId); $this->log('runUploadOnly start check_id=' . $checkId); $tii = new TurnitinService(); $articleTitle = (string) Db::name('article') ->where('article_id', $check['article_id']) ->value('title'); if ($articleTitle === '') { $articleTitle = 'Article #' . $check['article_id']; } $createResp = $tii->createSubmission([ 'title' => mb_substr($articleTitle, 0, 250), 'owner' => 'editor_' . $check['triggered_by'], 'submitter' => 'editor_' . $check['triggered_by'], 'metadata' => [ 'article_id' => (string) $check['article_id'], 'check_id' => (string) $check['check_id'], ], ]); $submissionId = isset($createResp['id']) ? $createResp['id'] : ''; if ($submissionId === '') { throw new Exception('Turnitin createSubmission returned empty id: ' . json_encode($createResp)); } $this->updateCheck($checkId, [ 'tii_submission_id' => $submissionId, 'raw_response' => json_encode($createResp, JSON_UNESCAPED_UNICODE), ]); $tii->uploadFile($submissionId, $filePath, basename($filePath)); $firstDelay = $this->ingestChainFirstDelaySec(); Queue::later( $firstDelay, self::JOB_WAIT_INGEST, ['check_id' => $checkId, 'attempt' => 1], self::QUEUE_CHAIN ); } /** * 单次 ingest 检查(由 PlagiarismWaitIngest 调用)。不在本方法内 sleep 长循环。 */ public function runIngestPollStep($checkId, $attempt = 1) { $check = $this->mustGetCheck($checkId); if (empty($check['tii_submission_id'])) { $this->markFailed($checkId, '[ingest] tii_submission_id empty'); return; } $this->log("runIngestPollStep is running"); $maxAttempts = $this->ingestChainMaxAttempts(); $interval = $this->ingestChainPollIntervalSec(); $tii = new TurnitinService(); try { $parsed = $tii->parseSubmissionIngestState($check['tii_submission_id']); } catch (\Throwable $e) { if ($attempt >= $maxAttempts) { $this->markFailed($checkId, '[ingest] request failed after ' . $attempt . ' tries: ' . $e->getMessage()); return; } Queue::later($interval, self::JOB_WAIT_INGEST, ['check_id' => $checkId, 'attempt' => $attempt + 1], self::QUEUE_CHAIN); return; } if (!empty($parsed['failed'])) { $this->markFailed($checkId, '[ingest] submission failed status=' . $parsed['status'] . ' ' . $parsed['snippet']); return; } if (!empty($parsed['ready'])) { Queue::push(self::JOB_TRIGGER_SIM, ['check_id' => $checkId, 'ingest_attempt' => $attempt], self::QUEUE_CHAIN); return; } if ($attempt >= $maxAttempts) { $this->markFailed($checkId, '[ingest] timeout last_status=' . ($parsed['status'] !== '' ? $parsed['status'] : '(empty)')); return; } Queue::later($interval, self::JOB_WAIT_INGEST, ['check_id' => $checkId, 'attempt' => $attempt + 1], self::QUEUE_CHAIN); } /** * 在 ingest 就绪后触发 similarity,并入队 PlagiarismPoll。 * 若仍返回 409,则重新入队 PlagiarismWaitIngest(不抛异常,避免误标失败)。 * * @param int $ingestAttempt 来自 WaitIngest 的 attempt,供 409 时继续轮询 */ public function runTriggerSimilarityOnly($checkId, $ingestAttempt = 1) { $check = $this->mustGetCheck($checkId); if (empty($check['tii_submission_id'])) { $this->markFailed($checkId, '[similarity] tii_submission_id empty'); return; } $this->log("runTriggerSimilarityOnly is running"); $tii = new TurnitinService(); $sid = $check['tii_submission_id']; try { $simResp = $tii->triggerSimilarity($sid); } catch (\Throwable $e) { $msg = $e->getMessage(); $is409 = (stripos($msg, '409') !== false || stripos($msg, 'CONFLICT') !== false) && (stripos($msg, 'not been completed') !== false || stripos($msg, 'completed yet') !== false); if ($is409) { $maxAttempts = $this->ingestChainMaxAttempts(); $next = $ingestAttempt + 1; if ($next > $maxAttempts) { $this->markFailed($checkId, '[similarity] still not ready after ingest attempts: ' . $msg); return; } $delay = max($this->ingestChainPollIntervalSec(), 20); Queue::later($delay, self::JOB_WAIT_INGEST, ['check_id' => $checkId, 'attempt' => $next], self::QUEUE_CHAIN); return; } throw $e; } $this->updateCheck($checkId, [ 'state' => 2, 'tii_report_status' => 'PROCESSING', 'raw_response' => json_encode($simResp, JSON_UNESCAPED_UNICODE), ]); Queue::later( self::POLL_INTERVAL, self::JOB_POLL, ['check_id' => $checkId, 'attempt' => 1], self::QUEUE_CHAIN ); } /** * @deprecated 与 runUploadOnly 等价;长耗时 ingest 已拆到队列 PlagiarismWaitIngest,勿在本方法内同步 wait。 */ public function runUploadAndTrigger($checkId, $filePath) { $this->runUploadOnly($checkId, $filePath); } /** * Job 调用:轮询 similarity 状态,完成后下载 PDF。未完成则重新入队。 */ public function runPollStatus($checkId, $attempt = 1) { $check = $this->mustGetCheck($checkId); if (empty($check['tii_submission_id'])) { $this->markFailed($checkId, '[poll] tii_submission_id empty'); return; } // try { $tii = new TurnitinService(); $statusResp = $tii->getSimilarityStatus($check['tii_submission_id']); $status = isset($statusResp['status']) ? strtoupper($statusResp['status']) : ''; $this->updateCheck($checkId, [ 'tii_report_status' => $status, 'attempts' => $attempt, 'raw_response' => json_encode($statusResp, JSON_UNESCAPED_UNICODE), ]); if ($status === 'COMPLETE') { $score = isset($statusResp['overall_match_percentage']) ? floatval($statusResp['overall_match_percentage']) : 0; // 下载 PDF + 取在线查看 URL $localPdf = $this->downloadAndStorePdf($tii, $check['tii_submission_id'], $checkId); $viewerInfo = $this->refreshViewerUrl($tii, $check['tii_submission_id']); $this->updateCheck($checkId, [ 'state' => 3, 'similarity_score' => $score, 'pdf_local_path' => $localPdf, 'view_only_url' => $viewerInfo['url'], 'view_only_url_expire' => $viewerInfo['expire'], 'error_msg' => '', ]); return; } if ($status === 'ERROR') { $errMsg = isset($statusResp['error_code']) ? (string)$statusResp['error_code'] : 'Turnitin reported ERROR'; $this->markFailed($checkId, '[poll] ' . $errMsg); return; } // PROCESSING 或其它中间态:继续轮询 if ($attempt >= self::MAX_POLL_ATTEMPTS) { $this->markFailed($checkId, '[poll] timeout after ' . $attempt . ' attempts'); return; } Queue::later( self::POLL_INTERVAL, self::JOB_POLL, ['check_id' => $checkId, 'attempt' => $attempt + 1], self::QUEUE_CHAIN ); // } catch (\Throwable $e) { // // 网络抖动不要直接 fail,给一定容错次数 // if ($attempt < self::MAX_POLL_ATTEMPTS) { // Queue::later( // self::POLL_INTERVAL, // self::JOB_POLL, // ['check_id' => $checkId, 'attempt' => $attempt + 1], // self::QUEUE_CHAIN // ); // $this->updateCheck($checkId, [ // 'attempts' => $attempt, // 'error_msg' => '[poll] transient: ' . $e->getMessage(), // ]); // return; // } // $this->markFailed($checkId, '[poll] exhausted: ' . $e->getMessage()); // throw $e; // } } /** * 重新生成在线查看 URL(已有的过期了用) * * @return array{url:string, expire:int, local_pdf:string} */ public function refreshViewerUrlFor($checkId) { $check = $this->mustGetCheck($checkId); if (empty($check['tii_submission_id'])) { throw new Exception('check has no tii_submission_id'); } $tii = new TurnitinService(); $info = $this->refreshViewerUrl($tii, $check['tii_submission_id']); $this->updateCheck($checkId, [ 'view_only_url' => $info['url'], 'view_only_url_expire' => $info['expire'], ]); return [ 'url' => $info['url'], 'expire' => $info['expire'], 'local_pdf' => $check['pdf_local_path'], ]; } // ---------- 内部 ---------- private function refreshViewerUrl($tii, $submissionId) { $resp = $tii->getViewerUrl($submissionId); $url = ''; if (isset($resp['viewer_url'])) { $url = (string)$resp['viewer_url']; } elseif (isset($resp['url'])) { $url = (string)$resp['url']; } // 默认 2 小时过期,保守起见 return ['url' => $url, 'expire' => time() + 7200]; } /** * 触发生成 + 轮询 + 下载 PDF 到本地,返回相对路径 */ private function downloadAndStorePdf($tii, $submissionId, $checkId) { // 1. 请求生成 $req = $tii->requestPdfReport($submissionId); $pdfId = isset($req['id']) ? $req['id'] : ''; if ($pdfId === '') { throw new Exception('requestPdfReport empty id: ' . json_encode($req)); } // 2. 内联轮询 PDF 状态(最多 3 分钟,每 6 秒一次) $maxLoops = 30; for ($i = 0; $i < $maxLoops; $i++) { $st = $tii->getPdfReportStatus($submissionId, $pdfId); $stCode = isset($st['status']) ? strtoupper($st['status']) : ''; if ($stCode === 'SUCCESS') { break; } if ($stCode === 'FAILED') { throw new Exception('PDF report generation failed: ' . json_encode($st)); } sleep(6); } // 3. 下载 $binary = $tii->downloadPdfReport($submissionId, $pdfId); if (!is_string($binary) || strlen($binary) < 100) { throw new Exception('downloaded pdf is empty/too small'); } // 4. 落盘 $rootDir = ROOT_PATH ?: dirname(dirname(__DIR__)); $absDir = rtrim($rootDir, '/\\') . DIRECTORY_SEPARATOR . self::REPORT_DIR; if (!is_dir($absDir)) { @mkdir($absDir, 0755, true); } $filename = sprintf('check_%d_%s.pdf', $checkId, date('Ymd_His')); $absPath = $absDir . DIRECTORY_SEPARATOR . $filename; $bytes = file_put_contents($absPath, $binary); if ($bytes === false || $bytes < 100) { throw new Exception('failed to save pdf to ' . $absPath); } return self::REPORT_DIR . '/' . $filename; } private function mustGetCheck($checkId) { $row = Db::name('plagiarism_check')->where('check_id', $checkId)->find(); if (!$row) { throw new Exception("plagiarism_check #{$checkId} not found"); } return $row; } private function updateCheck($checkId, array $data) { $data['utime'] = time(); Db::name('plagiarism_check')->where('check_id', $checkId)->update($data); } public function markFailed($checkId, $errMsg) { $this->log('markFailed check_id=' . $checkId); $this->updateCheck($checkId, [ 'state' => 4, 'error_msg' => mb_substr($errMsg, 0, 1000), ]); } private function ingestChainFirstDelaySec() { return max(3, (int) Env::get('turnitin.ingest_chain_first_delay', 10)); } private function ingestChainPollIntervalSec() { return max(60, (int) Env::get('turnitin.ingest_chain_poll_interval', 15)); } private function ingestChainMaxAttempts() { return max(10, (int) Env::get('turnitin.ingest_chain_max_attempts', 80)); } /** * 从 t_article_file 找到投稿主稿(manuscirpt)的本地绝对路径。 * file_url 在系统里可能是 URL 或相对路径,调用方负责保证可读。 * * @return string 文件绝对路径,找不到时抛异常 */ public function locateArticleManuscript($articleId) { $row = Db::name('article_file') ->where('article_id', $articleId) ->where('type_name', 'manuscirpt') // 历史拼写 ->order('file_id desc') ->find(); if (!$row || empty($row['file_url'])) { throw new Exception("article #{$articleId} has no manuscirpt file"); } return $this->resolveFileUrlToLocal($row['file_url']); } /** * 把 file_url(可能是 http URL 或相对路径)解析成本地绝对路径。 * 不同环境部署可能有差异,这里用 .env 配置的 STATIC_ROOT 作前缀。 */ public function resolveFileUrlToLocal($fileUrl) { $fileUrl = trim((string)$fileUrl); if ($fileUrl === '') { throw new Exception('empty file_url'); } // 已是绝对路径 if (preg_match('/^([a-zA-Z]:[\\\\\/]|\/)/', $fileUrl) && is_file($fileUrl)) { return $fileUrl; } $staticRoot = trim((string)Env::get('plagiarism.static_root', '')); $cdnPrefix = trim((string)Env::get('plagiarism.cdn_prefix', '')); // 是 http URL:先试着剥掉 cdn 前缀,映射到本地 if (preg_match('#^https?://#i', $fileUrl)) { if ($cdnPrefix !== '' && stripos($fileUrl, $cdnPrefix) === 0) { $rel = ltrim(substr($fileUrl, strlen($cdnPrefix)), '/'); $local = rtrim($staticRoot, '/\\') . DIRECTORY_SEPARATOR . $rel; if (is_file($local)) { return $local; } } // 实在不行,下载到 runtime/plagiarism/tmp 临时目录 return $this->downloadRemoteFile($fileUrl); } // 相对路径:拼 static_root if ($staticRoot !== '') { $local = rtrim($staticRoot, '/\\') . DIRECTORY_SEPARATOR . ltrim($fileUrl, '/\\'); if (is_file($local)) { return $local; } } throw new Exception("cannot resolve file_url to local path: {$fileUrl} (set [plagiarism] STATIC_ROOT/CDN_PREFIX in .env)"); } private function downloadRemoteFile($url) { $rootDir = ROOT_PATH ?: dirname(dirname(__DIR__)); $tmpDir = rtrim($rootDir, '/\\') . DIRECTORY_SEPARATOR . self::REPORT_DIR . DIRECTORY_SEPARATOR . 'tmp'; if (!is_dir($tmpDir)) { @mkdir($tmpDir, 0755, true); } $ext = pathinfo(parse_url($url, PHP_URL_PATH), PATHINFO_EXTENSION) ?: 'pdf'; $local = $tmpDir . DIRECTORY_SEPARATOR . md5($url) . '_' . time() . '.' . $ext; $ch = curl_init($url); $fh = fopen($local, 'wb'); curl_setopt_array($ch, [ CURLOPT_FILE => $fh, CURLOPT_FOLLOWLOCATION => true, CURLOPT_TIMEOUT => 120, CURLOPT_SSL_VERIFYPEER => false, ]); $ok = curl_exec($ch); $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); fclose($fh); if (!$ok || $code !== 200 || filesize($local) < 100) { @unlink($local); throw new Exception("download failed url={$url} http={$code}"); } return $local; } public function getCheck($checkId) { return Db::name('plagiarism_check')->where('check_id', $checkId)->find(); } public function log($msg) { $line = date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL; @file_put_contents($this->logFile, $line, FILE_APPEND); } }