From 1ba1c6a4189535c2b35651489daa48fa91c157d0 Mon Sep 17 00:00:00 2001 From: wuchunlei Date: Mon, 12 May 2025 13:10:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86ragflow=E5=8F=82=E6=95=B0=E5=86=99?= =?UTF-8?q?=E9=81=93=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../peanut/common/utils/RagFlowApiUtil.java | 149 ++++++++++++++++++ .../java/com/peanut/config/AsyncConfig.java | 16 ++ .../modules/app/config/WebMvcConfig.java | 14 ++ .../controller/RagFlowApiController.java | 133 ++-------------- src/main/resources/application.yml | 8 +- 5 files changed, 197 insertions(+), 123 deletions(-) diff --git a/src/main/java/com/peanut/common/utils/RagFlowApiUtil.java b/src/main/java/com/peanut/common/utils/RagFlowApiUtil.java index d735d2a5..cc97367a 100644 --- a/src/main/java/com/peanut/common/utils/RagFlowApiUtil.java +++ b/src/main/java/com/peanut/common/utils/RagFlowApiUtil.java @@ -1,13 +1,162 @@ package com.peanut.common.utils; +import com.alibaba.fastjson.JSONObject; +import org.apache.http.Consts; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @Component public class RagFlowApiUtil { + @Value("${ragflow.url}") + private String url; + @Value("${ragflow.authorization}") + private String authorization; + //聊天助手列表 + public R getChatAssistants(String chatId) throws Exception{ + CloseableHttpClient httpClient = HttpClients.createDefault(); + HttpGet get = new HttpGet(url+"/api/v1/chats?id="+chatId); + get.setHeader("Authorization", authorization); + get.setHeader("Content-Type", "application/json;chartset=utf-8"); + CloseableHttpResponse response = httpClient.execute(get); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 400) { + throw new RuntimeException("API调用失败,状态码:" + statusCode); + } + HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); + JSONObject jsonObject = JSONObject.parseObject(responseString); + List> list = new ArrayList(); + if ("0".equals(jsonObject.get("code").toString())){ + List l = jsonObject.getJSONArray("data"); + for (Object o : l) { + Map map = new HashMap<>(); + Map m = (Map)o; + map.put("id",m.get("id")); + map.put("name",m.get("name")); + list.add(map); + } + } + return R.ok().put("list",list); + } + //聊天助手下对话列表 + public R getChats(Map params) throws Exception{ + CloseableHttpClient httpClient = HttpClients.createDefault(); + String chatId = params.get("chatId").toString(); + String page = params.get("page").toString(); + String pageSize = params.get("pageSize").toString(); + String sessionId = params.get("sessionId").toString(); + String userId = params.get("userId").toString(); + HttpGet get = new HttpGet(url+"/api/v1/chats/"+chatId+"/sessions?page="+page+"&page_size="+pageSize+"&id="+sessionId+"&user_id="+userId); + get.setHeader("Authorization", authorization); + get.setHeader("Content-Type", "application/json;chartset=utf-8"); + CloseableHttpResponse response = httpClient.execute(get); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 400) { + throw new RuntimeException("API调用失败,状态码:" + statusCode); + } + HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); + JSONObject jsonObject = JSONObject.parseObject(responseString); + List> list = new ArrayList(); + if ("0".equals(jsonObject.get("code").toString())){ + List l = jsonObject.getJSONArray("data"); + for (Object o : l) { + Map map = new HashMap<>(); + Map m = (Map)o; + map.put("id",m.get("id")); + map.put("name",m.get("name")); + map.put("messages",m.get("messages")); + list.add(map); + } + } + return R.ok().put("list",list); + } + + //创建会话 + public String createChat(Map params) throws Exception{ + CloseableHttpClient httpClient = HttpClients.createDefault(); + String chatId = params.get("chatId").toString(); + Map entity = new HashMap<>(); + entity.put("name", params.get("name").toString()); + entity.put("user_id", params.get("userId").toString()); + HttpPost post = new HttpPost(url+"/api/v1/chats/"+chatId+"/sessions"); + post.setHeader("Authorization", authorization); + post.setHeader("Content-Type", "application/json;chartset=utf-8"); + post.setEntity(new StringEntity(JSONObject.toJSONString(entity),"utf-8")); + CloseableHttpResponse response = httpClient.execute(post); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 400) { + throw new RuntimeException("API调用失败,状态码:" + statusCode); + } + HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); + JSONObject jsonObject = JSONObject.parseObject(responseString); + if ("0".equals(jsonObject.get("code").toString())){ + return ((JSONObject)jsonObject.get("data")).get("id").toString(); + } + return ""; + } + + //与助手聊天 + public R chatToAssistant(Map params) throws Exception{ + CloseableHttpClient httpClient = HttpClients.createDefault(); + String chatId = params.get("chatId").toString(); + Map entity = new HashMap<>(); + entity.put("question", params.get("question").toString()); + entity.put("session_id", params.get("sessionId").toString()); + entity.put("user_id", params.get("userId").toString()); + HttpPost post = new HttpPost(url+"/api/v1/chats/"+chatId+"/completions"); + post.setHeader("Authorization", authorization); + post.setHeader("Content-Type", "application/json;chartset=utf-8"); + post.setEntity(new StringEntity(JSONObject.toJSONString(entity),"utf-8")); + CloseableHttpResponse response = httpClient.execute(post); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 400) { + throw new RuntimeException("API调用失败,状态码:" + statusCode); + } + HttpEntity responseEntity = response.getEntity(); + String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); + return R.ok().put("res",responseString); + } + + //与助手聊天流式 + public Flux chatToAssistantStream(Map params) { + try { + String chatId = params.get("chatId").toString(); + Map entity = new HashMap<>(); + entity.put("question", params.get("question").toString()); + entity.put("stream", true); + entity.put("session_id", params.get("sessionId").toString()); + entity.put("user_id", params.get("userId").toString()); + return WebClient.create().post() + .uri(url+"/api/v1/chats/"+chatId+"/completions") + .header("Authorization", authorization) + .header("Content-Type", "application/json;chartset=utf-8") + .bodyValue(JSONObject.toJSONString(entity)) + .retrieve() + .bodyToFlux(String.class); + }catch (Exception e){ + e.printStackTrace(); + } + return null; + } } diff --git a/src/main/java/com/peanut/config/AsyncConfig.java b/src/main/java/com/peanut/config/AsyncConfig.java index af100663..b2145fc8 100644 --- a/src/main/java/com/peanut/config/AsyncConfig.java +++ b/src/main/java/com/peanut/config/AsyncConfig.java @@ -2,11 +2,13 @@ package com.peanut.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync @@ -21,4 +23,18 @@ public class AsyncConfig implements AsyncConfigurer { executor.initialize(); return executor; } + + @Bean("fluxTaskExecutor") + public AsyncTaskExecutor fluxTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); // CPU密集型建议值 + executor.setMaxPoolSize(50); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("flux-exec-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + } diff --git a/src/main/java/com/peanut/modules/app/config/WebMvcConfig.java b/src/main/java/com/peanut/modules/app/config/WebMvcConfig.java index bb763a3b..62e90718 100644 --- a/src/main/java/com/peanut/modules/app/config/WebMvcConfig.java +++ b/src/main/java/com/peanut/modules/app/config/WebMvcConfig.java @@ -11,12 +11,16 @@ package com.peanut.modules.app.config; import com.peanut.modules.app.interceptor.AuthorizationInterceptor; import com.peanut.modules.app.resolver.LoginUserHandlerMethodArgumentResolver; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.web.method.support.HandlerMethodArgumentResolver; +import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import java.util.List; +import java.util.concurrent.Executor; /** * MVC配置 @@ -30,6 +34,15 @@ public class WebMvcConfig implements WebMvcConfigurer { @Autowired private LoginUserHandlerMethodArgumentResolver loginUserHandlerMethodArgumentResolver; + @Autowired + @Qualifier("fluxTaskExecutor") + private AsyncTaskExecutor taskExecutor; + + @Override + public void configureAsyncSupport(AsyncSupportConfigurer configurer) { + configurer.setTaskExecutor(taskExecutor); + configurer.setDefaultTimeout(30_000); // 超时时间设为30秒 + } @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(authorizationInterceptor).addPathPatterns("/app/**"); @@ -39,4 +52,5 @@ public class WebMvcConfig implements WebMvcConfigurer { public void addArgumentResolvers(List argumentResolvers) { argumentResolvers.add(loginUserHandlerMethodArgumentResolver); } + } \ No newline at end of file diff --git a/src/main/java/com/peanut/modules/common/controller/RagFlowApiController.java b/src/main/java/com/peanut/modules/common/controller/RagFlowApiController.java index 1e439ded..2c9800f0 100644 --- a/src/main/java/com/peanut/modules/common/controller/RagFlowApiController.java +++ b/src/main/java/com/peanut/modules/common/controller/RagFlowApiController.java @@ -1,26 +1,14 @@ package com.peanut.modules.common.controller; -import com.alibaba.fastjson.JSONObject; import com.peanut.common.utils.R; +import com.peanut.common.utils.RagFlowApiUtil; import lombok.extern.slf4j.Slf4j; -import org.apache.http.Consts; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; @Slf4j @@ -28,130 +16,31 @@ import java.util.Map; @RequestMapping("common/ragFlowApi") public class RagFlowApiController { + @Autowired + private RagFlowApiUtil ragFlowApiUtil; + + //聊天助手列表 @RequestMapping("/getChatAssistants") public R getChatAssistants() throws Exception{ - CloseableHttpClient httpClient = HttpClients.createDefault(); - HttpGet get = new HttpGet("http://125.39.141.154:81/api/v1/chats"); - get.setHeader("Authorization", "Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm"); - get.setHeader("Content-Type", "application/json;chartset=utf-8"); - CloseableHttpResponse response = httpClient.execute(get); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode >= 400) { - throw new RuntimeException("API调用失败,状态码:" + statusCode); - } - HttpEntity responseEntity = response.getEntity(); - String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); - JSONObject jsonObject = JSONObject.parseObject(responseString); - List> list = new ArrayList(); - if ("0".equals(jsonObject.get("code").toString())){ - List l = jsonObject.getJSONArray("data"); - for (Object o : l) { - Map map = new HashMap<>(); - Map m = (Map)o; - map.put("id",m.get("id")); - map.put("name",m.get("name")); - list.add(map); - } - } - return R.ok().put("list",list); + return R.ok().put("list",ragFlowApiUtil.getChatAssistants("")); } //聊天助手下对话列表 @RequestMapping("/getChats") public R getChats(@RequestBody Map params) throws Exception{ - CloseableHttpClient httpClient = HttpClients.createDefault(); - String chatId = params.get("chatId").toString(); - String page = params.get("page").toString(); - String pageSize = params.get("pageSize").toString(); - String sessionId = params.get("sessionId").toString(); - String userId = params.get("userId").toString(); - HttpGet get = new HttpGet("http://125.39.141.154:81/api/v1/chats/"+chatId+"/sessions?page="+page+"&page_size="+pageSize+"&id="+sessionId+"&user_id="+userId); - get.setHeader("Authorization", "Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm"); - get.setHeader("Content-Type", "application/json;chartset=utf-8"); - CloseableHttpResponse response = httpClient.execute(get); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode >= 400) { - throw new RuntimeException("API调用失败,状态码:" + statusCode); - } - HttpEntity responseEntity = response.getEntity(); - String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); - JSONObject jsonObject = JSONObject.parseObject(responseString); - List> list = new ArrayList(); - if ("0".equals(jsonObject.get("code").toString())){ - List l = jsonObject.getJSONArray("data"); - for (Object o : l) { - Map map = new HashMap<>(); - Map m = (Map)o; - map.put("id",m.get("id")); - map.put("name",m.get("name")); - map.put("messages",m.get("messages")); - list.add(map); - } - } - return R.ok().put("list",list); + return R.ok().put("list",ragFlowApiUtil.getChats(params)); } //创建会话 @RequestMapping("/createChat") public R createChat(@RequestBody Map params) throws Exception{ - CloseableHttpClient httpClient = HttpClients.createDefault(); - String chatId = params.get("chatId").toString(); - Map entity = new HashMap<>(); - entity.put("name", params.get("name").toString()); - entity.put("user_id", params.get("userId").toString()); - HttpPost post = new HttpPost("http://125.39.141.154:81/api/v1/chats/"+chatId+"/sessions"); - post.setHeader("Authorization", "Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm"); - post.setHeader("Content-Type", "application/json;chartset=utf-8"); - post.setEntity(new StringEntity(JSONObject.toJSONString(entity),"utf-8")); - CloseableHttpResponse response = httpClient.execute(post); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode >= 400) { - throw new RuntimeException("API调用失败,状态码:" + statusCode); - } - HttpEntity responseEntity = response.getEntity(); - String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); - return R.ok().put("res",responseString); - } - - //与助手聊天 - @RequestMapping("/chatToAssistant") - public R chatToAssistant(@RequestBody Map params) throws Exception{ - CloseableHttpClient httpClient = HttpClients.createDefault(); - String chatId = params.get("chatId").toString(); - Map entity = new HashMap<>(); - entity.put("question", params.get("question").toString()); - entity.put("session_id", params.get("sessionId").toString()); - entity.put("user_id", params.get("userId").toString()); - HttpPost post = new HttpPost("http://125.39.141.154:81/api/v1/chats/"+chatId+"/completions"); - post.setHeader("Authorization", "Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm"); - post.setHeader("Content-Type", "application/json;chartset=utf-8"); - post.setEntity(new StringEntity(JSONObject.toJSONString(entity),"utf-8")); - CloseableHttpResponse response = httpClient.execute(post); - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode >= 400) { - throw new RuntimeException("API调用失败,状态码:" + statusCode); - } - HttpEntity responseEntity = response.getEntity(); - String responseString = EntityUtils.toString(responseEntity, Consts.UTF_8); - return R.ok().put("res",responseString); + return R.ok().put("id",ragFlowApiUtil.createChat(params)); } //与助手聊天流式 @RequestMapping(value = "/chatToAssistantStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public Flux chatToAssistantStream(@RequestBody Map params) { - String chatId = params.get("chatId").toString(); - Map entity = new HashMap<>(); - entity.put("question", params.get("question").toString()); - entity.put("stream", true); - entity.put("session_id", params.get("sessionId").toString()); - entity.put("user_id", params.get("userId").toString()); - return WebClient.create().post() - .uri("http://125.39.141.154:81/api/v1/chats/"+chatId+"/completions") - .header("Authorization", "Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm") - .header("Content-Type", "application/json;chartset=utf-8") - .bodyValue(JSONObject.toJSONString(entity)) - .retrieve() - .bodyToFlux(String.class);//输出格式 + public Flux chatToAssistantStream(@RequestBody Map params){ + return ragFlowApiUtil.chatToAssistantStream(params); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e14a3b5d..22cbd17d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,7 +11,7 @@ connection-timeout: 6000000ms spring: # 环境 dev/dev1|test|prod profiles: - active: dev1 + active: dev # jackson时间格式化 jackson: time-zone: GMT+8 @@ -20,6 +20,7 @@ spring: multipart: max-file-size: 1000MB max-request-size: 1000MB + enabled: true mvc: throw-exception-if-no-handler-found: true @@ -86,3 +87,8 @@ express: expAreaName: 南开 address: 秋泽园149号顺丰 +ragflow: + url: http://125.39.141.154:81 + authorization: Bearer ragflow-kyM2JjZWM4MjFiNDExZjA5MTgzOGE1Nm + +