Files
tougao/application/common/ExpertFinderService.php
wangjinlei 4417a7ea28 1
2026-04-17 09:51:09 +08:00

751 lines
27 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\common;
use think\Db;
use think\Queue;
use GuzzleHttp\Client;
use think\Env;
class ExpertFinderService
{
    /** @var Client Guzzle client shared by every NCBI E-utilities request. */
    private $httpClient;

    /** @var string Base URL of the NCBI E-utilities endpoints. */
    private $ncbiBaseUrl = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/';

    /** @var string Path of the plain-text log file appended to by log(). */
    private $logFile;

    public function __construct()
    {
        $this->httpClient = new Client([
            'timeout' => 180,
            'connect_timeout' => 15,
            // NOTE(review): TLS certificate verification is disabled for the NCBI HTTPS
            // endpoints; consider enabling it once the host's CA bundle is confirmed.
            'verify' => false,
        ]);
        $this->logFile = ROOT_PATH . 'runtime' . DS . 'expert_finder.log';
    }

    // ==================== Fetch orchestration ====================

    /**
     * Fetch the next page of search results for a field and persist the experts found.
     *
     * A page cursor is kept per (field, source) in `expert_fetch`; each call requests
     * page `last_page + 1` and then stores the new cursor.
     *
     * @param string   $field   Search term; also recorded as the experts' field.
     * @param string   $source  'pmc' searches PubMed Central, anything else searches PubMed.
     * @param int      $perPage Number of articles requested per page.
     * @param int|null $minYear Earliest publication year; defaults to three years ago.
     * @return array Run summary: keyword, page, counts, expert list and has_more.
     */
    public function doFetchForField($field, $source = 'pubmed', $perPage = 100, $minYear = null)
    {
        if ($minYear === null) {
            $minYear = date('Y') - 3;
        }

        $fetchLog = $this->getFetchLog($field, $source);
        $lastPage = intval($fetchLog['last_page']);
        $page = $lastPage + 1;

        $result = $this->searchExperts($field, $perPage, $minYear, $page, $source);
        if (!isset($result['total'])) {
            return [
                'has_more' => 'no',
            ];
        }

        $saveResult = $this->saveExperts($result['experts'], $field, $source);

        // Advance the cursor only while more pages remain; otherwise keep it unchanged so
        // the next run requests this same final page again.
        $nextPage = $result['has_more'] ? $page : $lastPage;
        $totalPages = $result['total_pages'] ?? $fetchLog['total_pages'];
        $this->updateFetchLog($field, $source, $nextPage, $totalPages);

        return [
            'keyword' => $field,
            'page' => $page,
            'experts_found' => $result['total'],
            'saved_new' => $saveResult['inserted'],
            'saved_exist' => $saveResult['existing'],
            'list' => $result['experts'],
            'field_enriched' => $saveResult['field_enriched'],
            'has_more' => $result['has_more'],
        ];
    }

    /**
     * Search one page of experts without persisting anything.
     *
     * @param string $source 'pmc' for PubMed Central, anything else for PubMed.
     * @return array Paged result as built by buildPagedResult().
     */
    public function searchExperts($keyword, $perPage, $minYear, $page, $source)
    {
        return $source === 'pmc'
            ? $this->searchViaPMC($keyword, $perPage, $minYear, $page)
            : $this->searchViaPubMed($keyword, $perPage, $minYear, $page);
    }

    // ==================== Persistence ====================

    /**
     * Insert experts not yet known (matched by e-mail) and link every expert to the field.
     *
     * @param array  $experts Aggregated experts (name, email, affiliation, papers).
     * @param string $field   Field to link the experts to.
     * @param string $source  Source label stored with new rows.
     * @return array{inserted:int, existing:int, field_enriched:int}
     */
    public function saveExperts($experts, $field, $source)
    {
        $inserted = 0;
        $existing = 0;
        $fieldEnrich = 0;

        foreach ($experts as $expert) {
            // Use exactly the form that is stored (trimmed, lower-cased, column length) so a
            // lookup always matches a row inserted earlier.
            $email = mb_substr(strtolower(trim((string)($expert['email'] ?? ''))), 0, 128);
            if ($email === '') {
                continue;
            }

            $expertId = $this->findExpertIdByEmail($email);
            if ($expertId !== null) {
                $existing++;
            } else {
                try {
                    $expertId = intval(Db::name('expert')->insertGetId([
                        'name' => mb_substr((string)($expert['name'] ?? ''), 0, 255),
                        'email' => $email,
                        'affiliation' => mb_substr((string)($expert['affiliation'] ?? ''), 0, 128),
                        'source' => mb_substr((string)$source, 0, 128),
                        'ctime' => time(),
                        'ltime' => 0,
                        'state' => 0,
                    ]));
                    $inserted++;
                } catch (\Exception $e) {
                    // The insert may fail because another worker stored this e-mail first.
                    // Re-read the row so the field/paper links are still recorded.
                    $existing++;
                    $expertId = $this->findExpertIdByEmail($email);
                    if ($expertId === null) {
                        $this->log('[SaveExperts] insert failed for ' . $email . ': ' . $e->getMessage());
                        continue;
                    }
                }
            }

            $papers = (isset($expert['papers']) && is_array($expert['papers'])) ? $expert['papers'] : [];
            $fieldEnrich += $this->saveFieldWithPapers($expertId, $field, $source, $papers);
        }

        return ['inserted' => $inserted, 'existing' => $existing, 'field_enriched' => $fieldEnrich];
    }

    /**
     * Look up an expert id by (already normalised) e-mail.
     *
     * @return int|null Expert id, or null when no row matches.
     */
    private function findExpertIdByEmail($email)
    {
        $row = Db::name('expert')->where('email', $email)->find();
        return $row ? intval($row['expert_id']) : null;
    }

    /**
     * Store the expert–field association together with its supporting papers.
     *
     * Papers with a usable article id are stored as one row each, de-duplicated on
     * (expert_id, field, source, paper_article_id). When no such paper exists, a single
     * paper-less row is stored instead, de-duplicated on (expert_id, field).
     *
     * @return int Number of rows inserted.
     */
    private function saveFieldWithPapers($expertId, $field, $source, $papers)
    {
        // Normalise to the stored column widths before querying so de-duplication also
        // works for values that get truncated on insert.
        $field = mb_substr(trim((string)$field), 0, 128);
        if ($field === '') {
            return 0;
        }
        $source = mb_substr((string)$source, 0, 64);

        $validPapers = [];
        foreach ($papers as $paper) {
            $articleId = isset($paper['article_id']) ? mb_substr((string)$paper['article_id'], 0, 64) : '';
            if ($articleId === '' || $articleId === '0') {
                continue;
            }
            $validPapers[] = [
                'article_id' => $articleId,
                'title' => isset($paper['title']) ? mb_substr((string)$paper['title'], 0, 255) : '',
                'journal' => isset($paper['journal']) ? mb_substr((string)$paper['journal'], 0, 255) : '',
            ];
        }

        // No paper carries an id (e.g. PMC articles without a pmc id): still record the field.
        if (empty($validPapers)) {
            return $this->saveFieldOnly($expertId, $field, $source);
        }

        $added = 0;
        foreach ($validPapers as $paper) {
            $duplicate = Db::name('expert_field')
                ->where('expert_id', $expertId)
                ->where('field', $field)
                ->where('source', $source)
                ->where('paper_article_id', $paper['article_id'])
                ->where('state', 0)
                ->find();
            if ($duplicate) {
                continue;
            }
            Db::name('expert_field')->insert([
                'expert_id' => $expertId,
                'source' => $source,
                'paper_title' => $paper['title'],
                'paper_article_id' => $paper['article_id'],
                'paper_journal' => $paper['journal'],
                'field' => $field,
                'state' => 0,
            ]);
            $added++;
        }
        return $added;
    }

    /**
     * Insert one paper-less expert–field row unless an active one already exists.
     *
     * @return int 1 when a row was inserted, otherwise 0.
     */
    private function saveFieldOnly($expertId, $field, $source)
    {
        $exists = Db::name('expert_field')
            ->where('expert_id', $expertId)
            ->where('field', $field)
            ->where('state', 0)
            ->find();
        if ($exists) {
            return 0;
        }
        Db::name('expert_field')->insert([
            'expert_id' => $expertId,
            'source' => $source,
            'field' => $field,
            'paper_title' => '',
            'paper_article_id' => '',
            'paper_journal' => '',
            'state' => 0,
        ]);
        return 1;
    }

    /**
     * Read the paging cursor for (field, source).
     *
     * @return array The `expert_fetch` row, or zeroed defaults when none exists yet.
     */
    public function getFetchLog($field, $source)
    {
        $log = $this->fetchLogQuery($field, $source)->find();
        if (!$log) {
            return ['last_page' => 0, 'total_pages' => 0, 'last_time' => 0];
        }
        return $log;
    }

    /**
     * Create or update the paging cursor for (field, source).
     */
    public function updateFetchLog($field, $source, $lastPage, $totalPages)
    {
        $exists = $this->fetchLogQuery($field, $source)->find();
        if ($exists) {
            Db::name('expert_fetch')
                ->where('expert_fetch_id', $exists['expert_fetch_id'])
                ->update([
                    'last_page' => $lastPage,
                    'total_pages' => $totalPages,
                    'last_time' => time(),
                ]);
            return;
        }
        Db::name('expert_fetch')->insert([
            'field' => mb_substr((string)$field, 0, 128),
            'source' => mb_substr((string)$source, 0, 128),
            'last_page' => $lastPage,
            'total_pages' => $totalPages,
            'last_time' => time(),
        ]);
    }

    /**
     * Query on `expert_fetch` keyed by the stored (truncated) field/source values, so that
     * lookups match rows created by updateFetchLog() even for long field names.
     */
    private function fetchLogQuery($field, $source)
    {
        return Db::name('expert_fetch')
            ->where('field', mb_substr((string)$field, 0, 128))
            ->where('source', mb_substr((string)$source, 0, 128));
    }

    // ==================== Search ====================

    private function searchViaPubMed($keyword, $perPage, $minYear, $page = 1)
    {
        return $this->searchDatabase('pubmed', $keyword, $perPage, $minYear, $page, 50, 400000);
    }

    private function searchViaPMC($keyword, $perPage, $minYear, $page = 1)
    {
        // PMC full-text records are large, so they are fetched in much smaller batches.
        return $this->searchDatabase('pmc', $keyword, $perPage, $minYear, $page, 5, 500000);
    }

    /**
     * Run esearch for one page, fetch the matching records in batches, parse the authors
     * and aggregate them into experts.
     *
     * @param string $db          'pubmed' or 'pmc'; also used as the result's source label.
     * @param int    $batchSize   Number of ids per efetch request.
     * @param int    $pauseMicros Pause after each batch, in microseconds.
     */
    private function searchDatabase($db, $keyword, $perPage, $minYear, $page, $batchSize, $pauseMicros)
    {
        set_time_limit(600);

        $searchResult = $this->esearch($db, $keyword, $perPage, $minYear, $page);
        $ids = $searchResult['ids'];
        $totalArticles = $searchResult['total'];
        if (empty($ids)) {
            return $this->buildPagedResult([], 0, 0, $totalArticles, $page, $perPage, $db);
        }

        $authorRecords = [];
        foreach (array_chunk($ids, $batchSize) as $batch) {
            foreach ($this->efetchWithRetry($db, $batch) as $xml) {
                $parsed = ($db === 'pmc') ? $this->parsePMCXml($xml) : $this->parsePubMedXml($xml);
                foreach ($parsed as $record) {
                    $authorRecords[] = $record;
                }
            }
            // Throttle between batches (presumably to respect NCBI E-utilities rate limits).
            usleep($pauseMicros);
        }

        $experts = $this->aggregateExperts($authorRecords);
        return $this->buildPagedResult($experts, count($experts), count($ids), $totalArticles, $page, $perPage, $db);
    }

    // ==================== NCBI API ====================

    /**
     * Call ESearch for one page of ids, restricted to publication years $minYear..now.
     *
     * @return array{ids:array, total:int}
     * @throws \Exception The last HTTP error once all attempts have failed.
     */
    private function esearch($db, $keyword, $perPage, $minYear, $page = 1, $maxRetries = 3)
    {
        $query = [
            'db' => $db,
            'term' => $keyword . ' AND ' . $minYear . ':' . date('Y') . '[pdat]',
            'retstart' => ($page - 1) * $perPage,
            'retmax' => $perPage,
            'retmode' => 'json',
            'sort' => 'relevance',
        ];
        $url = $this->ncbiBaseUrl . 'esearch.fcgi';

        $body = $this->callWithRetry(function () use ($url, $query) {
            return $this->httpClient->get($url, ['query' => $query])->getBody()->getContents();
        }, $maxRetries);

        $data = json_decode($body, true);
        return [
            'ids' => $data['esearchresult']['idlist'] ?? [],
            'total' => intval($data['esearchresult']['count'] ?? 0),
        ];
    }

    /**
     * Call EFetch for a list of ids and return the raw XML body.
     */
    private function efetch($db, $ids)
    {
        $response = $this->httpClient->post($this->ncbiBaseUrl . 'efetch.fcgi', [
            'form_params' => [
                'db' => $db,
                'id' => implode(',', $ids),
                'retmode' => 'xml',
            ],
        ]);
        return $response->getBody()->getContents();
    }

    /**
     * Fetch a batch with retries. If every attempt fails, bisect the batch and fetch each
     * half separately, so one bad id or an oversized response does not lose the whole batch.
     *
     * @return string[] Zero or more complete XML documents. They are kept separate because
     *                  two concatenated XML documents are not parseable as one.
     */
    private function efetchWithRetry($db, $ids, $maxRetries = 3)
    {
        try {
            $xml = $this->callWithRetry(function () use ($db, $ids) {
                return $this->efetch($db, $ids);
            }, $maxRetries);
            return ($xml === '' || $xml === null) ? [] : [$xml];
        } catch (\Exception $e) {
            if (count($ids) > 1) {
                $half = (int)ceil(count($ids) / 2);
                return array_merge(
                    $this->efetchWithRetry($db, array_slice($ids, 0, $half), 2),
                    $this->efetchWithRetry($db, array_slice($ids, $half), 2)
                );
            }
            $this->log('[efetch] giving up on ' . $db . ' id=' . implode(',', $ids) . ': ' . $e->getMessage());
            return [];
        }
    }

    /**
     * Run $request up to $maxRetries times, sleeping 2s, 4s, ... between failed attempts.
     *
     * @return mixed Whatever $request returns on its first successful attempt.
     * @throws \Exception The exception from the final attempt.
     */
    private function callWithRetry(callable $request, $maxRetries)
    {
        $maxRetries = max(1, intval($maxRetries));
        for ($attempt = 1; ; $attempt++) {
            try {
                return $request();
            } catch (\Exception $e) {
                if ($attempt >= $maxRetries) {
                    throw $e;
                }
                sleep($attempt * 2);
            }
        }
    }

    // ==================== PubMed XML Parsing ====================

    /**
     * Extract one record per author that has an e-mail in an affiliation string.
     *
     * @return array[] Records with name, email, affiliation, article_title, article_id, journal.
     */
    private function parsePubMedXml($xmlString)
    {
        $results = [];
        libxml_use_internal_errors(true);
        $xml = simplexml_load_string($xmlString);
        if ($xml === false) {
            return $results;
        }

        foreach ($xml->PubmedArticle as $article) {
            $citation = $article->MedlineCitation;
            $articleData = $citation->Article;
            if (!isset($articleData->AuthorList->Author)) {
                continue;
            }

            $title = $this->xmlNodeToString($articleData->ArticleTitle);
            $pmid = (string)$citation->PMID;
            $journal = isset($articleData->Journal->Title) ? (string)$articleData->Journal->Title : '';

            foreach ($articleData->AuthorList->Author as $author) {
                $fullName = trim((string)($author->ForeName ?? '') . ' ' . (string)($author->LastName ?? ''));
                if ($fullName === '') {
                    continue;
                }

                // First affiliation is kept; the e-mail is taken from the first affiliation containing one.
                $email = '';
                $affiliation = '';
                if (isset($author->AffiliationInfo)) {
                    foreach ($author->AffiliationInfo as $affInfo) {
                        $affText = (string)$affInfo->Affiliation;
                        if ($affiliation === '') {
                            $affiliation = $affText;
                        }
                        if ($email === '') {
                            $email = $this->extractEmailFromText($affText);
                        }
                    }
                }
                if ($email === '') {
                    continue;
                }

                $results[] = [
                    'name' => $fullName,
                    'email' => strtolower($email),
                    'affiliation' => $this->cleanAffiliation($affiliation),
                    'article_title' => $title,
                    'article_id' => $pmid,
                    'journal' => $journal,
                ];
            }
        }
        return $results;
    }

    // ==================== PMC XML Parsing ====================

    /**
     * Extract one record per PMC author (contrib-type="author") for whom an e-mail can be found.
     *
     * @return array[] Records with name, email, affiliation, article_title, article_id, journal.
     */
    private function parsePMCXml($xmlString)
    {
        $results = [];
        libxml_use_internal_errors(true);
        $xml = simplexml_load_string($xmlString);
        if ($xml === false) {
            return $results;
        }

        // Articles normally sit under a wrapper element; also accept a bare <article> root.
        $articles = ($xml->getName() === 'article') ? [$xml] : $xml->children();
        foreach ($articles as $article) {
            if ($article->getName() !== 'article') {
                continue;
            }
            $front = $article->front;
            if (!$front) {
                continue;
            }
            $articleMeta = $front->{'article-meta'};
            if (!$articleMeta || !isset($articleMeta->{'contrib-group'})) {
                continue;
            }

            $title = $this->xmlNodeToString($articleMeta->{'title-group'}->{'article-title'} ?? null);
            $pmcId = $this->pmcArticleId($articleMeta);
            $journal = $this->pmcJournalTitle($front);

            $correspEmails = [];
            if (isset($articleMeta->{'author-notes'})) {
                $this->extractEmailsFromNode($articleMeta->{'author-notes'}, $correspEmails);
            }
            $affiliationMap = $this->pmcAffiliationMap($articleMeta);

            // An article may carry several contrib-groups; read all of them.
            foreach ($articleMeta->{'contrib-group'} as $group) {
                if (!isset($group->contrib)) {
                    continue;
                }
                foreach ($group->contrib as $contrib) {
                    $author = $this->parsePMCContrib($contrib, $affiliationMap, $correspEmails);
                    if ($author === null) {
                        continue;
                    }
                    $results[] = [
                        'name' => $author['name'],
                        'email' => $author['email'],
                        'affiliation' => $author['affiliation'],
                        'article_title' => $title,
                        'article_id' => $pmcId,
                        'journal' => $journal,
                    ];
                }
            }
        }
        return $results;
    }

    /**
     * Return the text of the last <article-id pub-id-type="pmc">, or '' when absent.
     */
    private function pmcArticleId($articleMeta)
    {
        $pmcId = '';
        if (isset($articleMeta->{'article-id'})) {
            foreach ($articleMeta->{'article-id'} as $idNode) {
                if ((string)$idNode['pub-id-type'] === 'pmc') {
                    $pmcId = (string)$idNode;
                }
            }
        }
        return $pmcId;
    }

    /**
     * Return the journal title from journal-meta, with or without a journal-title-group wrapper.
     */
    private function pmcJournalTitle($front)
    {
        if (isset($front->{'journal-meta'}->{'journal-title'})) {
            return (string)$front->{'journal-meta'}->{'journal-title'};
        }
        if (isset($front->{'journal-meta'}->{'journal-title-group'}->{'journal-title'})) {
            return (string)$front->{'journal-meta'}->{'journal-title-group'}->{'journal-title'};
        }
        return '';
    }

    /**
     * Map aff/@id to affiliation text, from <aff> children of every contrib-group and from
     * <aff> directly under article-meta (the latter wins on duplicate ids).
     *
     * @return array<string,string>
     */
    private function pmcAffiliationMap($articleMeta)
    {
        $affNodes = [];
        foreach ($articleMeta->{'contrib-group'} as $group) {
            if (isset($group->aff)) {
                foreach ($group->aff as $aff) {
                    $affNodes[] = $aff;
                }
            }
        }
        if (isset($articleMeta->aff)) {
            foreach ($articleMeta->aff as $aff) {
                $affNodes[] = $aff;
            }
        }

        $map = [];
        foreach ($affNodes as $aff) {
            $affId = (string)($aff['id'] ?? '');
            if ($affId !== '') {
                $map[$affId] = $this->xmlNodeToString($aff);
            }
        }
        return $map;
    }

    /**
     * Resolve name, e-mail and affiliation for one <contrib>.
     *
     * E-mail precedence: the contrib's own <email>; then, for corresponding authors, the first
     * e-mail found in author-notes; then an address embedded in the affiliation text.
     *
     * @return array{name:string, email:string, affiliation:string}|null Null when the contrib
     *         is not an author, has no name, or no e-mail can be found.
     */
    private function parsePMCContrib($contrib, array $affiliationMap, array $correspEmails)
    {
        if ((string)($contrib['contrib-type'] ?? '') !== 'author') {
            return null;
        }
        $nameNode = $contrib->name;
        if (!$nameNode) {
            return null;
        }
        $fullName = trim((string)($nameNode->{'given-names'} ?? '') . ' ' . (string)($nameNode->surname ?? ''));
        if ($fullName === '') {
            return null;
        }

        $affiliation = '';
        $isCorresponding = ((string)($contrib['corresp'] ?? '') === 'yes');
        if (isset($contrib->xref)) {
            foreach ($contrib->xref as $xref) {
                $refType = (string)$xref['ref-type'];
                if ($refType === 'corresp') {
                    $isCorresponding = true;
                } elseif ($refType === 'aff' && $affiliation === '') {
                    // rid may list several ids separated by whitespace; take the first known one.
                    foreach (preg_split('/\s+/', trim((string)$xref['rid'])) as $rid) {
                        if ($rid !== '' && isset($affiliationMap[$rid])) {
                            $affiliation = $affiliationMap[$rid];
                            break;
                        }
                    }
                }
            }
        }
        if ($affiliation === '' && isset($contrib->aff)) {
            $affiliation = $this->xmlNodeToString($contrib->aff);
        }

        $email = isset($contrib->email) ? strtolower(trim((string)$contrib->email)) : '';
        if ($email === '' && $isCorresponding && !empty($correspEmails)) {
            $email = $correspEmails[0];
        }
        if ($email === '') {
            $email = $this->extractEmailFromText($affiliation);
        }
        if ($email === '') {
            return null;
        }

        return [
            'name' => $fullName,
            'email' => strtolower($email),
            'affiliation' => $this->cleanAffiliation($affiliation),
        ];
    }

    // ==================== Aggregation / Pagination ====================

    /**
     * Group author records by lower-cased e-mail. Each expert keeps the first name seen,
     * the first non-empty affiliation, a paper count, and up to 10 papers. Experts are
     * sorted by paper count, descending.
     *
     * @return array[]
     */
    private function aggregateExperts($authorRecords)
    {
        $map = [];
        foreach ($authorRecords as $record) {
            $key = strtolower(trim($record['email']));
            if ($key === '') {
                continue;
            }
            if (!isset($map[$key])) {
                $map[$key] = [
                    'name' => $record['name'],
                    'email' => $record['email'],
                    'affiliation' => $record['affiliation'],
                    'paper_count' => 0,
                    'papers' => [],
                ];
            }
            $map[$key]['paper_count']++;
            if (count($map[$key]['papers']) < 10) {
                $map[$key]['papers'][] = [
                    'title' => $record['article_title'],
                    'article_id' => $record['article_id'],
                    'journal' => $record['journal'],
                ];
            }
            if (empty($map[$key]['affiliation']) && !empty($record['affiliation'])) {
                $map[$key]['affiliation'] = $record['affiliation'];
            }
        }

        $experts = array_values($map);
        usort($experts, function ($a, $b) {
            return $b['paper_count'] <=> $a['paper_count'];
        });
        return $experts;
    }

    /**
     * Build the paged search result returned by the search methods.
     *
     * @return array total_pages is an int; has_more is true while $page < total_pages.
     */
    private function buildPagedResult($experts, $expertCount, $articlesScanned, $totalArticles, $page, $perPage, $source)
    {
        // Guard against a zero/negative page size, which would otherwise divide by zero.
        $pageSize = max(1, intval($perPage));
        $totalPages = $totalArticles > 0 ? (int)ceil($totalArticles / $pageSize) : 0;
        return [
            'experts' => $experts,
            'total' => $expertCount,
            'articles_scanned' => $articlesScanned,
            'total_articles' => $totalArticles,
            'page' => $page,
            'per_page' => $perPage,
            'total_pages' => $totalPages,
            'has_more' => $page < $totalPages,
            'source' => $source,
        ];
    }

    // ==================== Country Resolution ====================

    /**
     * Start the country-resolution chain: push the next expert that has an affiliation but
     * no country (country_id = 0, state <> 5) onto the given queue.
     *
     * Per the original design, the FillExpertCountry job calls this method again after it
     * handles one expert, so a controller only needs to call it once. (NOTE(review): that
     * re-enqueue lives in the job class, which is not shown here.)
     *
     * @param int    $delay   Delay in seconds before the job runs (throttles model calls); 0 pushes immediately.
     * @param string $queue   Queue name; different queues can run on separate workers without blocking each other.
     * @param string $chatUrl Model endpoint for this chain; empty means use the configured default.
     * @return bool True when a job was queued, false when no expert is pending.
     */
    public function enqueueNextCountryFill($delay = 1, $queue = 'FetchExperts', $chatUrl = '')
    {
        $row = Db::name('expert')
            ->where('affiliation', '<>', '')
            ->where('country_id', 0)
            ->where('state', '<>', 5)
            ->field('expert_id, affiliation')
            ->order('expert_id asc')
            ->find();
        if (!$row) {
            $this->log('[CountryFill] no more pending experts, queue=' . $queue);
            return false;
        }

        $data = [
            'expert_id' => intval($row['expert_id']),
            'affiliation' => trim((string)$row['affiliation']),
            'queue' => $queue,
            'chat_url' => (string)$chatUrl,
        ];
        $jobClass = 'app\api\job\FillExpertCountry@fire';
        if ($delay > 0) {
            Queue::later($delay, $jobClass, $data, $queue);
        } else {
            Queue::push($jobClass, $data, $queue);
        }
        return true;
    }

    /**
     * Resolve and store the country of one expert synchronously. Called by the
     * FillExpertCountry queue job; can also be called directly for testing.
     *
     * Sets country_id/country on success. Sets country_id = -1 ("tried but not resolved")
     * otherwise, so the chain does not stall on the same expert.
     */
    public function fillExpertCountry($expertId, $affiliation, $chatUrl = '')
    {
        $expertId = intval($expertId);
        $affiliation = trim((string)$affiliation);
        if ($affiliation === '') {
            Db::name('expert')->where('expert_id', $expertId)->update(['country_id' => -1]);
            return;
        }

        $chatUrl = trim((string)$chatUrl);
        $url = ($chatUrl !== '')
            ? $chatUrl
            : trim((string)Env::get('expert_country_chat_url', Env::get('citation_chat_url', 'http://chat.taimed.cn/v1/chat/completions')));
        $resolver = new CountryResolverService([
            'chat_url' => $url,
            'chat_model' => trim((string)Env::get('expert_country_chat_model', Env::get('citation_chat_model', 'gpt-4.1'))),
            'api_key' => trim((string)Env::get('expert_country_chat_api_key', Env::get('citation_chat_api_key', ''))),
            'timeout' => max(20, intval(Env::get('expert_country_chat_timeout', 60))),
        ]);
        $result = $resolver->resolve($affiliation);
        if (!is_array($result)) {
            $result = [];
        }

        $country = $this->findCountry($result);
        if ($country !== null) {
            Db::name('expert')->where('expert_id', $expertId)->update([
                'country_id' => $country['country_id'],
                'country' => $country['en_name'],
            ]);
            return;
        }

        Db::name('expert')->where('expert_id', $expertId)->update([
            'country_id' => -1,
        ]);
        $this->log('[CountryFill] expert_id=' . $expertId . ' unresolved, code=' . ($result['code'] ?? '') . ' en_name=' . ($result['en_name'] ?? ''));
    }

    /**
     * Match a resolver result to a `country` row: first by code (upper-cased), then by
     * English name (case-insensitive).
     *
     * @return array{country_id:int, en_name:string}|null
     */
    private function findCountry(array $result)
    {
        if (!empty($result['code'])) {
            $row = Db::name('country')->where('code', strtoupper(trim((string)$result['code'])))->find();
            $country = $this->usableCountry($row);
            if ($country !== null) {
                return $country;
            }
        }
        if (!empty($result['en_name'])) {
            $row = Db::name('country')
                ->whereRaw("LOWER(en_name) = ?", [strtolower(trim((string)$result['en_name']))])
                ->find();
            return $this->usableCountry($row);
        }
        return null;
    }

    /**
     * Accept a country row only if it has a positive id and a non-empty English name.
     */
    private function usableCountry($row)
    {
        if (!$row) {
            return null;
        }
        $countryId = intval($row['country_id']);
        $enName = (string)$row['en_name'];
        return ($countryId > 0 && $enName !== '') ? ['country_id' => $countryId, 'en_name' => $enName] : null;
    }

    // ==================== Text Helpers ====================

    /**
     * Find the first e-mail in free text, lower-cased with surrounding dots trimmed.
     * Explicit "Electronic address:" and "E-mail:" labels are tried before a bare address.
     */
    private function extractEmailFromText($text)
    {
        if (empty($text)) {
            return '';
        }
        $patterns = [
            '/[Ee]lectronic address:\s*([^\s;,]+@[^\s;,]+)/',
            '/[Ee]-?mail:\s*([^\s;,]+@[^\s;,]+)/',
            '/\b([a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})\b/',
        ];
        foreach ($patterns as $pattern) {
            if (preg_match($pattern, (string)$text, $m)) {
                return strtolower(trim($m[1], '.'));
            }
        }
        return '';
    }

    /**
     * Recursively collect unique, lower-cased e-mails from <email> descendants and from the
     * direct text of each visited node, appending them to $emails.
     */
    private function extractEmailsFromNode($node, &$emails)
    {
        if ($node === null) {
            return;
        }
        foreach ($node->children() as $child) {
            if ($child->getName() === 'email') {
                $email = strtolower(trim((string)$child));
                if ($email !== '' && !in_array($email, $emails, true)) {
                    $emails[] = $email;
                }
            }
            $this->extractEmailsFromNode($child, $emails);
        }
        $text = (string)$node;
        if (preg_match_all('/\b([a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})\b/', $text, $matches)) {
            foreach ($matches[1] as $email) {
                $email = strtolower(trim($email, '.'));
                if (!in_array($email, $emails, true)) {
                    $emails[] = $email;
                }
            }
        }
    }

    /**
     * Remove e-mail addresses (labelled or bare) from an affiliation string and trim
     * trailing punctuation.
     */
    private function cleanAffiliation($text)
    {
        $text = preg_replace('/\s*[Ee]lectronic address:\s*[^\s;,]+@[^\s;,]+/', '', (string)$text);
        $text = preg_replace('/\s*[Ee]-?mail:\s*[^\s;,]+@[^\s;,]+/', '', $text);
        $text = preg_replace('/\s*\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b/', '', $text);
        return trim($text, " \t\n\r\0\x0B.,;");
    }

    /**
     * Flatten an XML node (including inline markup) to plain text: tags removed, entities
     * decoded, whitespace collapsed.
     */
    private function xmlNodeToString($node)
    {
        if ($node === null) {
            return '';
        }
        $xml = $node->asXML();
        if ($xml === false) {
            return '';
        }
        $text = html_entity_decode(strip_tags($xml), ENT_QUOTES | ENT_XML1, 'UTF-8');
        return trim(preg_replace('/\s+/', ' ', $text));
    }

    // ==================== Logging ====================

    /**
     * Append a timestamped line to the service log. Best-effort: write failures are ignored.
     */
    public function log($msg)
    {
        $line = date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
        // LOCK_EX keeps lines from concurrent queue workers from interleaving.
        @file_put_contents($this->logFile, $line, FILE_APPEND | LOCK_EX);
    }
}