This commit is contained in:
wangjinlei
2026-04-09 11:04:11 +08:00
parent a802b2e923
commit d92df3e103
3 changed files with 462 additions and 14 deletions

View File

@@ -603,20 +603,22 @@ class EmailClient extends Base
$maxUid = $lastUid;
try {
if ($lastUid > 0) {
$searchRange = ($lastUid + 1) . ':*';
$emails = imap_fetch_overview($imap, $searchRange, FT_UID);
} else {
$emails = imap_fetch_overview($imap, '1:*', FT_UID);
// 仅拉取最近 7 天邮件(避免首次全量导致超时)
$since = date('d-M-Y', strtotime('-7 days'));
$uids = imap_search($imap, 'SINCE "' . $since . '"', SE_UID);
if (!$uids) {
$uids = [];
}
sort($uids);
if (!$emails) {
$emails = [];
}
foreach ($uids as $uid) {
$uid = intval($uid);
if ($uid <= $lastUid) continue;
foreach ($emails as $overview) {
$uid = intval($overview->uid);
if ($uid <= $lastUid) {
// 每封邮件单独取 overviewFT_UID 模式下传 uid
$ovArr = imap_fetch_overview($imap, (string)$uid, FT_UID);
$overview = (!empty($ovArr) && isset($ovArr[0])) ? $ovArr[0] : null;
if (!$overview) {
continue;
}

View File

@@ -5,9 +5,12 @@ namespace app\api\controller;
use app\api\controller\Base;
use app\common\CitationRelevanceService;
use app\common\CrossrefService;
use app\common\QueueRedis;
use app\common\PubmedService;
use think\Validate;
use think\Db;
use think\Env;
use think\Queue;
/**
* @title 参考文献
* @description 相关方法汇总
@@ -165,9 +168,9 @@ class References extends Base
}
$apiKey = trim((string)Env::get('citation_chat_api_key', ''));
if ($apiKey === '') {
return jsonError('Please set env citation_chat_api_key for embedding via chat');
}
// if ($apiKey === '') {
// return jsonError('Please set env citation_chat_api_key for embedding via chat');
// }
$config = [
'chat_url' => trim((string)Env::get('citation_chat_url', 'http://chat.taimed.cn/v1/chat/completions')),
@@ -204,6 +207,117 @@ class References extends Base
]);
}
/**
* 提交参考文献鉴别到队列(异步)
* 参数p_refer_id
*/
public function checkCitationRelevanceQueue($aParam = [])
{
$aParam = empty($aParam) ? $this->request->post() : $aParam;
$pReferId = intval(isset($aParam['p_refer_id']) ? $aParam['p_refer_id'] : 0);
if (!$pReferId) {
return jsonError('p_refer_id is required');
}
$redisKey = 'queue_job:app\api\job\CitationRelevanceQueue:' . $pReferId;
$queueRedis = QueueRedis::getInstance();
$status = $queueRedis->getJobStatus($redisKey);
// 若已完成,直接返回已完成状态,前端可立刻去拉结果
if ($status === 'completed') {
return jsonSuccess([
'queue_key' => $redisKey,
'status' => 'completed',
]);
}
// 若正在处理,返回 processing
if ($status === 'processing') {
return jsonSuccess([
'queue_key' => $redisKey,
'status' => 'processing',
]);
}
// 推送新任务到队列
$queueId = Queue::push('app\api\job\CitationRelevanceQueue@fire', [
'p_refer_id' => $pReferId,
], 'CitationRelevanceQueue');
if (!$queueId) {
return jsonError('queue push failed');
}
return jsonSuccess([
'queue_key' => $redisKey,
'status' => 'queued',
'queue_id' => $queueId,
]);
}
/**
* 轮询获取参考文献鉴别结果
* 参数p_refer_id
*/
public function getCitationRelevanceResult($aParam = [])
{
$aParam = empty($aParam) ? $this->request->post() : $aParam;
$pReferId = intval(isset($aParam['p_refer_id']) ? $aParam['p_refer_id'] : 0);
if (!$pReferId) {
return jsonError('p_refer_id is required');
}
$redisKey = 'queue_job:app\api\job\CitationRelevanceQueue:' . $pReferId;
$queueRedis = QueueRedis::getInstance();
$status = $queueRedis->getJobStatus($redisKey);
if ($status === null || $status === false) {
return jsonSuccess([
'status' => 'not_found',
]);
}
if ($status === 'processing') {
return jsonSuccess([
'status' => 'processing',
]);
}
if ($status === 'failed') {
$raw = $queueRedis->getRedisValue($redisKey . ':result');
$data = [];
if (is_string($raw) && $raw !== '') {
$decoded = json_decode($raw, true);
if (is_array($decoded)) {
$data = $decoded;
}
}
return jsonSuccess([
'status' => 'failed',
'data' => $data,
]);
}
// completed从 Redis 取出完整结果返回
$raw = $queueRedis->getRedisValue($redisKey . ':result');
if (!is_string($raw) || $raw === '') {
return jsonSuccess([
'status' => 'completed',
'data' => [],
]);
}
$decoded = json_decode($raw, true);
if (!is_array($decoded)) {
$decoded = ['status' => 1, 'msg' => 'success', 'data' => []];
}
return jsonSuccess([
'status' => 'completed',
'data' => $decoded,
]);
}
/**
* 从 t_article_main 拼接正文,按 [n] 定位句子并取前后各 1 句作为上下文
*/
@@ -263,6 +377,248 @@ class References extends Base
return $out;
}
/**
* 根据 t_production_article_refer 构建:正文引用序号([n] 中的 n=> p_refer_id
* 规则与 Production::convertReferencesToLatex 一致:正文序号 = index + 1
*
* @param int $pArticleId production 侧 p_article_id
* @return array<int,int> 例如 [1 => 101, 2 => 102]
*/
private function buildCitationNumberToPReferIdMap(int $pArticleId): array
{
if ($pArticleId <= 0) {
return [];
}
$rows = Db::name('production_article_refer')
->where('p_article_id', $pArticleId)
->where('state', 0)
->field('p_refer_id,index')
->order('index asc')
->select();
$map = [];
foreach ($rows as $row) {
$n = intval($row['index']) + 1;
if ($n > 0 && !empty($row['p_refer_id'])) {
$map[$n] = intval($row['p_refer_id']);
}
}
return $map;
}
/**
* 解析括号内引用串(如 1,2 / 3-5 / 1,3-5展开为正文引用序号列表保留顺序不去重
*
* @param string $referencePart 不含 [] 的内层,已规范化为英文逗号与普通连字符
* @return int[]
*/
private function expandCitationBracketInner(string $referencePart): array
{
$referencePart = trim($referencePart);
if ($referencePart === '') {
return [];
}
$out = [];
$segments = preg_split('/\s*,\s*/', $referencePart);
foreach ($segments as $seg) {
$seg = trim((string)$seg);
if ($seg === '') {
continue;
}
if (preg_match('/^(\d+)\s*-\s*(\d+)$/', $seg, $m)) {
$a = intval($m[1]);
$b = intval($m[2]);
if ($a > $b) {
$t = $a;
$a = $b;
$b = $t;
}
for ($i = $a; $i <= $b; $i++) {
$out[] = $i;
}
} else {
$out[] = intval($seg);
}
}
return $out;
}
/**
* 将正文 HTML 中的 <blue>[n]</blue>(及 [1,2]、[2-4] 等)替换为 <mycite data-id="p_refer_id"></mycite>
* 找不到对应参考文献时保留原 <blue>[…]</blue>,避免丢内容。
*
* @param string $content article_main.content 等 HTML 片段
* @param int $pArticleId t_production_article_refer.p_article_id
*/
public function rewriteMainContentCitationsToMycite(string $content, int $pArticleId)
{
$map = $this->buildCitationNumberToPReferIdMap($pArticleId);
if ($map === []) {
return $content;
}
return preg_replace_callback(
'/(?:<\s*blue[^>]*>)?\[([^\]]+)\](?:<\/\s*blue\s*>)?/iu',
function (array $matches) use ($map): string {
$inner = trim((string)$matches[1]);
if ($inner === '') {
return $matches[0];
}
// 仅处理数字引用,避免误伤 [Fig 1] 等
$innerNorm = str_replace(['', '', '—'], [',', '-', '-'], $inner);
if (!preg_match('/^[\d\s,\-]+$/', $innerNorm)) {
return $matches[0];
}
$nums = $this->expandCitationBracketInner($innerNorm);
if ($nums === []) {
return $matches[0];
}
$ids = [];
foreach ($nums as $n) {
if ($n <= 0) {
continue;
}
if (empty($map[$n])) {
// 有任意一个序号无法映射到 p_refer_id则保持原始片段不变避免丢引用信息
return $matches[0];
}
$ids[] = (string)intval($map[$n]);
}
if ($ids === []) {
return $matches[0];
}
return '<mycite data-id="' . implode(',', $ids) . '"></mycite>';
},
$content
);
}
/**
* 接口:将 content 中的 blue 引用替换为 mycite需传 p_article_id
*/
public function convertMainCitationsToMycite()
{
$data = $this->request->post();
$rule = new Validate([
"am_id"=>"require",
"article_id"=>"require"
]);
if(!$rule->check($data)){
return jsonError($rule->getError());
}
$main_info = $this->article_main_obj->where("am_id",$data['am_id'])->find();
$p_info = $this->production_article_obj->where("article_id",$data['article_id'])->where("state",0)->find();
if(!$p_info||!$main_info){
return jsonError('production_article_id not found');
}
$pArticleId = $p_info['p_article_id'];
$content = $main_info['content'];
$out = $this->rewriteMainContentCitationsToMycite($content, $pArticleId);
return jsonSuccess(['content' => $out]);
}
/**
* 批量处理并回写 t_article_main.content
* 将正文中的 <blue>[n]</blue> / [1,2] / [2-4] 改写为 <mycite data-id="..."></mycite>
*
* 参数:
* - p_article_id (必填)production 侧文章ID
* - type (可选):默认 0仅文本 main传空则处理所有 type
* - dry_run (可选)1=只预览不落库
*/
public function convertArticleMainCitationsToMycite($aParam = [])
{
$aParam = empty($aParam) ? $this->request->post() : $aParam;
$pArticleId = intval($aParam['p_article_id'] ?? 0);
if ($pArticleId <= 0) {
return jsonError('p_article_id is required');
}
// 通过 production_article -> article_id确保是当前系统存在的文章
$aArticle = $this->getArticle(['p_article_id' => $pArticleId]);
$iStatus = empty($aArticle['status']) ? 0 : $aArticle['status'];
if ($iStatus != 1) {
return json_encode($aArticle);
}
$aArticle = empty($aArticle['data']) ? [] : $aArticle['data'];
$articleId = intval($aArticle['article_id'] ?? 0);
if ($articleId <= 0) {
return jsonError('Article not found');
}
$dryRun = intval($aParam['dry_run'] ?? 0) === 1;
$type = isset($aParam['type']) ? $aParam['type'] : 0;
$query = Db::name('article_main')
->where('article_id', $articleId)
->whereIn('state', [0, 2])
->order('sort asc');
if ($type !== '' && $type !== null) {
$query->where('type', intval($type));
}
$mains = $query->field('am_id,content,type,sort')->select();
if (empty($mains)) {
return jsonError('article_main is empty');
}
$changed = 0;
$preview = [];
Db::startTrans();
try {
foreach ($mains as $row) {
$amId = intval($row['am_id']);
$old = (string)($row['content'] ?? '');
if ($old === '') {
continue;
}
$new = $this->rewriteMainContentCitationsToMycite($old, $pArticleId);
if ($new === $old) {
continue;
}
$changed++;
if (count($preview) < 3) {
$preview[] = [
'am_id' => $amId,
'type' => intval($row['type'] ?? 0),
'sort' => intval($row['sort'] ?? 0),
'before'=> $old,
'after' => $new,
];
}
if (!$dryRun) {
Db::name('article_main')
->where('am_id', $amId)
->limit(1)
->update([
'content' => $new,
'update_time' => time(),
]);
}
}
if ($dryRun) {
Db::rollback();
} else {
Db::commit();
}
} catch (\Exception $e) {
Db::rollback();
return jsonError('convert failed: ' . $e->getMessage());
}
return jsonSuccess([
'article_id' => $articleId,
'p_article_id' => $pArticleId,
'dry_run' => $dryRun ? 1 : 0,
'total' => count($mains),
'changed' => $changed,
'preview' => $preview,
]);
}
/**
* 修改参考文献的信息
* @param p_refer_id 主键ID

View File

@@ -0,0 +1,90 @@
<?php
namespace app\api\job;
use think\queue\Job;
use app\common\QueueJob;
use app\common\QueueRedis;
use app\api\controller\References;
class CitationRelevanceQueue
{
private $oQueueJob;
private $QueueRedis;
private $lockExpire = 900; // 单条参考文献鉴别锁 15 分钟
private $completedExprie = 86400; // 结果保留 1 天
public function __construct()
{
$this->oQueueJob = new QueueJob;
$this->QueueRedis = QueueRedis::getInstance();
}
public function fire(Job $job, $data)
{
// 通用初始化(进程超时 / Redis / DB 自检)
$this->oQueueJob->init($job);
// 校验参数
$pReferId = empty($data['p_refer_id']) ? 0 : intval($data['p_refer_id']);
if (empty($pReferId)) {
$this->oQueueJob->log("无效的 p_refer_id删除任务");
$job->delete();
return;
}
// 生成 Redis 键:一个参考文献只允许一个队列在跑
$sClassName = get_class($this);
$sRedisKey = "queue_job:{$sClassName}:{$pReferId}";
$sRedisValue = uniqid() . '_' . getmypid();
// 加锁并标记为 processing幂等控制
$isLocked = $this->QueueRedis->startJob($sRedisKey, $sRedisValue, $this->lockExpire);
if (!$isLocked) {
$jobStatus = $this->QueueRedis->getJobStatus($sRedisKey);
if (in_array($jobStatus, ['completed', 'failed'])) {
$this->oQueueJob->log("任务已完成或失败,删除任务 | 状态: {$jobStatus}");
$job->delete();
} else {
$attempts = $job->attempts();
if ($attempts >= 3) {
$this->oQueueJob->log("超过最大重试次数,停止重试");
$job->delete();
} else {
$lockTtl = $this->QueueRedis->getLockTtl($sRedisKey);
$delay = $lockTtl > 0 ? $lockTtl + 5 : 30;
$this->oQueueJob->log("锁竞争,{$delay}秒后重试({$attempts}/3)");
$job->release($delay);
}
}
return;
}
try {
// 实际调用 References 控制器里的同步方法,拿到完整 JSON 结果
$ctrl = new References();
$resp = $ctrl->checkCitationRelevance(['p_refer_id' => $pReferId]);
$decoded = is_string($resp) ? json_decode($resp, true) : $resp;
if (!is_array($decoded)) {
$decoded = ['status' => 2, 'msg' => 'Unexpected response from checkCitationRelevance', 'raw' => $resp];
}
// 结果写入 Redis供 HTTP 轮询读取
$this->QueueRedis->setRedisValue($sRedisKey . ':result', json_encode($decoded, JSON_UNESCAPED_UNICODE));
// 标记完成并释放锁
$this->QueueRedis->finishJob($sRedisKey, 'completed', $this->completedExprie, $sRedisValue);
$job->delete();
$this->oQueueJob->log("CitationRelevanceQueue 任务执行成功 | p_refer_id: {$pReferId}");
} catch (\RuntimeException $e) {
$this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
} catch (\LogicException $e) {
$this->oQueueJob->handleNonRetryableException($e, $sRedisKey, $sRedisValue, $job);
} catch (\Exception $e) {
$this->oQueueJob->handleRetryableException($e, $sRedisKey, $sRedisValue, $job);
} finally {
$this->oQueueJob->finnal();
}
}
}