RIX9Kz7szh 发表于 2025-2-25 02:42:46

SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践 🚀

近期DeepSeek等国产大模型热度持续攀升,其关注度甚至超过了OpenAI(被戏称为CloseAI)。在SpringBoot3.x环境中,可以使用官方的Spring AI轻松接入,但对于仍在使用JDK8和SpringBoot2.7.3的企业级应用来说,往往需要自定义实现。特别是当大模型团队返回的数据格式不符合标准SSE规范时,更需要灵活处理。本文将分享我们的实战解决方案。
<hr>📦 引入Gradle依赖

核心依赖说明:

[*]spring-boot-starter-web:基础Web支持
[*]spring-boot-starter-webflux:响应式编程支持(WebClient所在模块)
implementation 'org.springframework.boot:spring-boot-starter-web'implementation 'org.springframework.boot:spring-boot-starter-webflux'<hr>🌐 WebClient配置要点

初始化时特别注意Header配置:
@Beanpublic WebClient init() {    return WebClient.builder()            .baseUrl(baseUrl)            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)            // ⚠️ 必须设置为JSON格式            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)            .build();}🚨 关键踩坑点:初始设置MediaType.TEXT_EVENT_STREAM_VALUE会导致请求失败,必须使用APPLICATION_JSON_VALUE
<hr>🧠 核心处理逻辑

流式请求入口

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {    // 请求体构建    String requestBody = String.format("""      {            "model": "%s",            "messages": [{"role": "user", "content": "%s"}],            "stream": true      }      """, model, prompt);                                           return webClient.post()            // 请求配置            .uri("/v1/chat/completions")            .bodyValue(requestBody)            .accept(MediaType.TEXT_EVENT_STREAM)            .retrieve()            .bodyToFlux(DataBuffer.class)// 🔑 关键配置点            .transform(this::processStream)            // 重试和超时配置            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))            .timeout(Duration.ofSeconds(180));            // 错误处理            .doOnError(e -> log.error("Stream error", e))            .doFinally(signal -> log.info("Stream completed: {}", signal));}技术原理说明

当使用bodyToFlux(DataBuffer.class)时:

[*]✅ 获得原始字节流控制权
[*]❌ 避免自动SSE格式解析(适用于非标准响应)
[*]📡 动态数据流处理:类似Java Stream,但数据持续追加
<hr>🔧 非标准SSE数据处理

核心处理流程

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {    return dataBufferFlux            .transform(DataBufferUtils::join)          // 字节流合并            .map(buffer -> {                        // 字节转字符串                String content = buffer.toString(StandardCharsets.UTF_8);                DataBufferUtils.release(buffer);                return content;            })            .flatMap(content ->                     // 处理粘包问题                Flux.fromArray(content.split("\\r?\\n\\r?\\n")))            .filter(event -> !event.trim().isEmpty()) // 过滤空事件            .map(event -> {                           // 格式标准化处理                String trimmed = event.trim();                if (trimmed.startsWith("data:")) {                  String substring = trimmed.substring(5);                  return substring.startsWith(" ") ? substring.substring(1) : substring;                }                return trimmed;            })            .filter(event -> !event.startsWith("data:")); // 二次过滤}三大关键技术点


[*]粘包处理
通过split("\\r?\\n\\r?\\n")解决网络传输中的消息边界问题,示例原始数据:
data:{response1}\n\ndata:{response2}\n\n
[*]格式兼容处理
自动去除服务端可能返回的data:前缀,同时保留Spring自动添加SSE前缀的能力
[*]双重过滤机制
确保最终输出不包含任何残留的SSE格式标识
<hr>⚠️ 特别注意

当接口设置produces = MediaType.TEXT_EVENT_STREAM_VALUE时:

[*]Spring WebFlux会自动添加data: 前缀
[*]前端收到的格式示例:
data: {实际内容}
[*]若手动添加
data: 前缀会导致重复:
data: data: {错误内容}// ❌ 错误格式
<hr>🛠️ 完整实现代码

// 包声明和导入...@Service@Slf4jpublic class OpenAiService {    // 配置项和初始化    private String openAiApiKey = "sk-xxxxxx";      private String baseUrl = "https://openai.com/xxxx";    private String model = "gpt-4o";    private WebClient webClient;    @PostConstruct    public void init() {      webClient = WebClient.builder()                .baseUrl(baseUrl)                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)                .build();    }    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {      // 构建请求体      String requestBody = String.format("""                {                  "model": "gpt-4o-mini",                  "messages": [{"role": "user", "content": "%s"}],                  "stream": true                }                """, prompt);                                                   // 发送流式请求      return webClient.post()            .uri("/v1/chat/completions")            .bodyValue(requestBody)            .retrieve()            .onStatus(HttpStatusCode::isError, response ->                  response.bodyToMono(String.class)                            .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))            )            .bodyToFlux(DataBuffer.class)            .transform(this::processStream)            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))            .timeout(Duration.ofSeconds(180))            .doOnError(e -> log.error("Stream error", e))            .doFinally(signal -> log.info("Stream completed: {}", signal));    }    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {      return dataBufferFlux                // 使用字节流处理                .transform(DataBufferUtils::join)                .map(buffer -> {                  String content = buffer.toString(StandardCharsets.UTF_8);                  DataBufferUtils.release(buffer);                  return content;                })                // 按 SSE 事件边界,防止粘包的问题                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))                // 过滤空事件                .filter(event -> !event.trim().isEmpty())                // 规范 SSE 事件格式                .map(event -> {                  String trimmed = event.trim();                  // 由于webflux设置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",                  // 所以在返回数据时会自动添加“data:”,因此如果返回的格式带了“data:”需要手动去除                  if (trimmed.startsWith("data:")) {                        trimmed = trimmed.replaceFirst("data:","").trim();                  }                  return trimmed;                })                .filter(event -> !event.startsWith("data:"));    }}
页: [1]
查看完整版本: SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践 🚀