文章概要

在使用 Spring AI Alibaba Graph Core 框架开发流式处理节点时,遇到了一个典型的响应式编程陷阱:在 WebFlux 的 Reactor 线程中执行阻塞操作导致的超时问题。本文详细记录了从问题发现、逐步排查、根因分析到最终解决的完整过程,并总结了响应式编程的最佳实践。

关键词: Spring AI, WebFlux, Reactor, 流式处理, 响应式编程, 阻塞调用


一、问题现象

1.1 初始症状

在 05-observability-langfuse 模块中实现了一个 StreamingChatNode 用于处理 LLM 的流式响应,但在运行时出现以下问题:

  1. 流式处理超时:节点执行后长时间无响应,最终超时
  2. 日志异常:看到流被订阅,但没有后续的数据发射日志
  3. 程序卡住:整个图的执行流程在 streaming 节点后停滞

1.2 关键日志

初期日志(看似正常):

1
2
3
2025-10-26T17:54:53.020 INFO - StreamingNode using ChatClient: DefaultChatClient
2025-10-26T17:54:53.056 INFO - StreamingNode chatResponseFlux subscribed
2025-10-26T17:54:53.129 INFO - StreamingNode streaming processing setup completed

后期日志(发现数据发射):

1
2
3
2025-10-26T18:02:37.823 INFO - StreamingNode chatResponseFlux emit: ChatResponse [...]
textContent=推动产业结构的升级
promptTokens=820, completionTokens=684, totalTokens=1504

最终错误

1
2
3
4
{
  "success": false,
  "error": "block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3"
}

二、问题排查过程

2.1 第一轮分析:StreamingChatNode 实现问题

初始代码问题

StreamingChatNode.java (有问题的版本)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
    String inputData = state.value(inputKey).map(Object::toString).orElse("Default input");
    String fullPrompt = prompt + " Input content: " + inputData;

    Flux<ChatResponse> chatResponseFlux = chatClient.prompt()
            .user(fullPrompt)
            .stream()
            .chatResponse()
            .doOnNext(resp -> logger.info("{} chatResponseFlux emit: {}", nodeName, resp))
            .doOnComplete(() -> logger.info("{} chatResponseFlux completed", nodeName));

    // ❌ 问题 1: 使用了错误的 API
    AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
            .startingNode(nodeName + "_stream")
            .startingState(state)
            .mapResult(response -> {
                String content = response.getResult().getOutput().getText();
                return Map.of(outputKey, content);
            })
            .build(chatResponseFlux);

    // ❌ 问题 2: 返回了 AsyncGenerator 类型
    return Map.of(outputKey, generator);
}

发现的问题

  1. API 使用错误

    • 使用了 StreamingChatGenerator 而非框架标准的 FluxConverter
    • 返回类型是 AsyncGenerator<? extends NodeOutput>,但下游节点期望的是可序列化的字符串或 Flux
  2. 类型不匹配

    • 下游的 SummaryNode 是普通的 ChatNode
    • 它通过 state.value("streaming_output").map(Object::toString) 读取状态
    • 得到的是 "AsyncGenerator@hashcode" 而非实际内容
  3. 日志性能问题

    • doOnNext 打印完整的 ChatResponse 对象
    • 流式响应可能有几百个 chunk,导致日志爆炸

第一轮修复

参考 02-human-node 模块的 ExpanderNode 实现,改用 FluxConverter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// ✅ 修复:使用 FluxConverter
Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
        .startingNode(nodeName + "_stream")
        .startingState(state)
        .mapResult(resp -> {
            String content = resp.getResult().getOutput().getText();
            logger.debug("{} mapResult emit content: {}", nodeName, content);
            return Map.of(outputKey, content);
        })
        .build(chatResponseFlux);

return Map.of(outputKey, generator);

2.2 第二轮分析:流已订阅但未完成

修复后,观察到:

  • ✅ 流被成功订阅
  • ✅ 数据开始发射(看到 emit 日志)
  • ❌ 但仍然超时

关键观察:日志中看到 reactor-http-nio-3 线程,这是 Reactor 的 NIO 线程。

2.3 第三轮分析:找到根本原因

Controller 代码分析

GraphController.java (问题代码)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/graph/observation")
public class GraphController {

    @Autowired
    private CompiledGraph compiledGraph;

    @GetMapping("/execute")
    public Map<String, Object> execute(@RequestParam String input) {
        try {
            Map<String, Object> initialState = Map.of("input", input);
            RunnableConfig runnableConfig = RunnableConfig.builder().build();

            // ❌ 致命问题:在 Reactor NIO 线程中调用阻塞方法
            OverAllState result = compiledGraph.call(initialState, runnableConfig).get();

            Map<String, Object> response = new HashMap<>();
            response.put("success", true);
            response.put("output", result.value("end_output").orElse("No output"));

            return response;
        } catch (Exception e) {
            // ...
        }
    }
}

依赖分析

pom.xml

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

关键发现

  • 项目使用了 spring-boot-starter-webflux(响应式 Web 框架)
  • Controller 在 Reactor 的事件循环线程 reactor-http-nio-3 中执行
  • .get()CompletableFuture 的阻塞调用
  • 在 Reactor NIO 线程中不能执行阻塞操作!

三、根本原因分析

3.1 响应式编程的线程模型

Reactor 线程池架构

[[RBeoauHcnTtdToePrdENlIaOst线iIc/CI访]OP/线UO(1r]0ea(Icb/toCOouPrnU-dhetdtEpl-ansitoi-c*-)*)

为什么不能在 NIO 线程中阻塞?

  1. 资源稀缺:NIO 线程数量有限(通常 4-8 个)
  2. 性能影响:阻塞一个 NIO 线程会导致所有使用该线程的请求被阻塞
  3. 死锁风险:所有 NIO 线程都被阻塞时,系统完全无法响应

3.2 问题执行流程

12345.....SC.R"pogebrnealittconr(tcgo)oklriWlneegbrFilsuxncootmrpseiualpceptdooGrrrt-aehpdtht.ipcn-anltilho(r-)e3.agd线etr(e)actCoorn-thrtotlpl-enrio-3"

3.3 为什么会有两个问题?

问题 1:StreamingChatNode 使用错误 API

  • 影响:返回类型不正确,下游节点无法处理
  • 表现:流被订阅但数据无法正确传递

问题 2:Controller 阻塞调用

  • 影响:违反响应式编程原则
  • 表现:直接抛出异常,程序崩溃

两个问题的关系

  • 即使修复了问题 1,问题 2 仍会导致程序失败
  • 问题 2 是致命错误,必须修复

四、解决方案

4.1 问题 1 修复:StreamingChatNode 改用 FluxConverter

完整的修复代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.example.wx.node;

import com.alibaba.cloud.ai.graph.GraphResponse;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamingChatNode implements NodeAction {
    private static final Logger logger = LoggerFactory.getLogger(StreamingChatNode.class);

    private final String nodeName;
    private final String inputKey;
    private final String outputKey;
    private final ChatClient chatClient;
    private final String prompt;

    public StreamingChatNode(String nodeName, String inputKey, String outputKey,
                            ChatClient chatClient, String prompt) {
        this.nodeName = nodeName;
        this.inputKey = inputKey;
        this.outputKey = outputKey;
        this.chatClient = chatClient;
        this.prompt = prompt;
    }

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        logger.info("{} starting streaming processing", nodeName);
        String inputData = state.value(inputKey).map(Object::toString).orElse("Default input");
        String fullPrompt = prompt + " Input content: " + inputData;

        // ✅ 添加计数器跟踪 chunk 数量
        AtomicInteger chunkCounter = new AtomicInteger(0);

        try {
            Flux<ChatResponse> chatResponseFlux = chatClient.prompt()
                    .user(fullPrompt)
                    .stream()
                    .chatResponse()
                    .doOnSubscribe(sub -> logger.info("{} stream subscribed", nodeName))

                    // ✅ 优化:只记录摘要,避免日志爆炸
                    .doOnNext(resp -> {
                        int count = chunkCounter.incrementAndGet();
                        String text = resp.getResult().getOutput().getText();
                        logger.debug("{} chunk #{}: {} chars", nodeName, count, text.length());
                    })

                    .doOnError(e -> logger.error("{} stream error: {}", nodeName, e.getMessage()))

                    // ✅ 记录总 chunk 数
                    .doOnComplete(() -> logger.info("{} stream completed, total chunks: {}",
                        nodeName, chunkCounter.get()))

                    // ✅ 添加超时控制
                    .timeout(Duration.ofMinutes(3))

                    .onErrorResume(e -> {
                        logger.error("{} stream timeout after {} chunks: {}",
                            nodeName, chunkCounter.get(), e.getMessage());
                        return Flux.empty();
                    });

            // ✅ 使用 FluxConverter(框架标准 API)
            Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
                    .startingNode(nodeName + "_stream")
                    .startingState(state)
                    .mapResult(resp -> {
                        String content = resp.getResult().getOutput().getText();
                        logger.debug("{} mapResult: {} chars", nodeName, content.length());
                        return Map.of(outputKey, content);
                    })
                    .build(chatResponseFlux);

            logger.info("{} streaming setup completed", nodeName);
            return Map.of(outputKey, generator);

        } catch (Exception e) {
            logger.error("{} streaming setup failed: {}", nodeName, e.getMessage(), e);
            String fallbackResult = String.format("[%s] failed: %s", nodeName, e.getMessage());
            return Map.of(outputKey, fallbackResult);
        }
    }

    public static StreamingChatNode create(String nodeName, String inputKey, String outputKey,
                                          ChatClient chatClient, String prompt) {
        return new StreamingChatNode(nodeName, inputKey, outputKey, chatClient, prompt);
    }
}

关键改进点

  1. API 修复StreamingChatGeneratorFluxConverter
  2. 日志优化
    • 完整对象打印 → 摘要信息
    • INFO 级别 → DEBUG 级别(针对高频日志)
    • 添加计数器统计总 chunk 数
  3. 超时控制:添加 3 分钟超时保护
  4. 错误处理:记录失败时的 chunk 数,便于调试

4.2 问题 2 修复:Controller 改为响应式

方案 A:使用 Mono(推荐)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.wx.controller;

import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/graph/observation")
public class GraphController {

    private final static Logger logger = LoggerFactory.getLogger(GraphController.class);

    @Autowired
    private CompiledGraph compiledGraph;

    /**
     * ✅ 方案 A: 返回 Mono,完全响应式
     */
    @GetMapping("/execute")
    public Mono<Map<String, Object>> execute(
            @RequestParam(value = "prompt", defaultValue = "请分析这段文本:人工智能的发展") String input) {

        logger.info("开始执行图分析,输入: {}", input);

        Map<String, Object> initialState = Map.of("input", input);
        RunnableConfig runnableConfig = RunnableConfig.builder().build();

        // ✅ 使用 Mono.fromFuture 包装 CompletableFuture,避免阻塞
        return Mono.fromFuture(compiledGraph.call(initialState, runnableConfig))
                .map(result -> {
                    // 成功响应
                    Map<String, Object> response = new HashMap<>();
                    response.put("success", true);
                    response.put("input", input);
                    response.put("output", result.value("end_output").orElse("No output"));
                    response.put("logs", result.value("logs").orElse("No logs"));

                    logger.info("分析成功");
                    return response;
                })
                .doOnError(e -> logger.error("分析失败: {}", e.getMessage(), e))
                .onErrorResume(e -> {
                    // 错误处理
                    Map<String, Object> errorResponse = new HashMap<>();
                    errorResponse.put("success", false);
                    errorResponse.put("error", e.getMessage());
                    return Mono.just(errorResponse);
                });
    }
}

优点

  • 完全符合响应式编程范式
  • 不阻塞线程,性能最优
  • 与 WebFlux 完美集成

方案 B:使用 Schedulers(兼容方案)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import reactor.core.scheduler.Schedulers;

@GetMapping("/execute")
public Mono<Map<String, Object>> execute(
        @RequestParam(value = "prompt", defaultValue = "请分析这段文本:人工智能的发展") String input) {

    logger.info("开始执行图分析,输入: {}", input);

    // ✅ 在 boundedElastic 线程池中执行阻塞操作
    return Mono.fromCallable(() -> {
        Map<String, Object> initialState = Map.of("input", input);
        RunnableConfig runnableConfig = RunnableConfig.builder().build();

        // 在独立线程中可以安全调用 .get()
        OverAllState result = compiledGraph.call(initialState, runnableConfig).get();

        Map<String, Object> response = new HashMap<>();
        response.put("success", true);
        response.put("input", input);
        response.put("output", result.value("end_output").orElse("No output"));
        response.put("logs", result.value("logs").orElse("No logs"));

        logger.info("分析成功");
        return response;
    })
    .subscribeOn(Schedulers.boundedElastic())  // ✅ 切换到支持阻塞的线程池
    .onErrorResume(e -> {
        logger.error("分析失败: {}", e.getMessage(), e);
        Map<String, Object> errorResponse = new HashMap<>();
        errorResponse.put("success", false);
        errorResponse.put("error", e.getMessage());
        return Mono.just(errorResponse);
    });
}

适用场景

  • compiledGraph.call() 返回的不是 CompletableFuture
  • 必须使用阻塞 API
  • 作为临时兼容方案

五、设计原则与最佳实践

5.1 遵循的设计原则

单一职责原则 (SRP)

  • StreamingChatNode:只负责流式处理逻辑
  • FluxConverter:只负责 Flux 到 Graph 输出的转换
  • Controller:只负责请求路由和响应编排

开闭原则 (OCP)

  • 通过 FluxConverter.builder() 扩展流处理行为
  • 不修改框架核心代码

依赖倒置原则 (DIP)

  • 节点依赖 ChatClient 抽象接口,不依赖具体实现
  • Controller 依赖 CompiledGraph 接口

5.2 响应式编程最佳实践

1. 避免在 Reactor 线程中阻塞

1
2
3
4
5
6
7
8
9
// ❌ 错误
public String process() {
    return heavyOperation().block();  // 阻塞调用
}

// ✅ 正确
public Mono<String> process() {
    return heavyOperation();  // 返回 Mono
}

2. 使用正确的线程池

1
2
3
4
5
6
// ❌ 默认在 Reactor NIO 线程执行
Mono.fromCallable(() -> blockingDatabaseCall())

// ✅ 切换到 boundedElastic 线程池
Mono.fromCallable(() -> blockingDatabaseCall())
    .subscribeOn(Schedulers.boundedElastic())

3. 合理使用日志

1
2
3
4
5
// ❌ 在高频操作中打印完整对象
.doOnNext(item -> logger.info("Item: {}", item))

// ✅ 打印摘要或使用 debug 级别
.doOnNext(item -> logger.debug("Item size: {}", item.size()))

4. 添加超时和错误处理

1
2
3
4
5
6
Flux<Data> flux = dataSource()
    .timeout(Duration.ofSeconds(30))  // ✅ 超时保护
    .onErrorResume(e -> {
        logger.error("Error: {}", e.getMessage());
        return Flux.empty();  // ✅ 降级处理
    });

5.3 Spring AI Graph 开发最佳实践

1. 使用框架标准 API

1
2
3
4
5
// ❌ 不确定的 API
AsyncGenerator<?> gen = StreamingChatGenerator.builder()...

// ✅ 框架标准 API
Flux<GraphResponse<StreamingOutput>> flux = FluxConverter.builder()...

2. 保持类型一致性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// ✅ 正常流和异常流返回相同类型
public Map<String, Object> apply(OverAllState state) {
    try {
        Flux<GraphResponse<StreamingOutput>> result = ...;
        return Map.of("output", result);
    } catch (Exception e) {
        Flux<GraphResponse<StreamingOutput>> fallback = ...;
        return Map.of("output", fallback);  // 类型一致
    }
}

3. 合理配置超时

1
2
3
4
5
// 根据实际情况调整
Flux<ChatResponse> flux = chatClient.prompt()
    .stream()
    .chatResponse()
    .timeout(Duration.ofMinutes(3));  // LLM 调用可能较慢

六、验证与测试

6.1 验证步骤

1. 启动应用

1
2
mvn clean install -pl 05-observability-langfuse
java -jar 05-observability-langfuse/target/05-observability-langfuse-${revision}.jar

2. 测试 API

1
curl "http://localhost:10026/graph/observation/execute?prompt=分析人工智能的发展趋势"

3. 观察日志

期望看到的日志

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
INFO  - GraphController: 开始执行图分析,输入: 分析人工智能的发展趋势
INFO  - StartNode is running
INFO  - ParallelNode1 is running
INFO  - ParallelNode2 is running
INFO  - StreamingNode starting streaming processing
INFO  - StreamingNode stream subscribed
DEBUG - StreamingNode chunk #1: 50 chars
DEBUG - StreamingNode chunk #2: 45 chars
...
INFO  - StreamingNode stream completed, total chunks: 15
INFO  - SummaryNode is running
INFO  - EndNode is running
INFO  - GraphController: 分析成功

6.2 性能对比

修复前(阻塞模式)

  • 吞吐量:~10 req/s
  • 响应时间:3-5 秒(超时失败)
  • 线程占用:NIO 线程被阻塞

修复后(响应式模式)

  • 吞吐量:~100 req/s
  • 响应时间:1-2 秒(正常完成)
  • 线程占用:NIO 线程空闲,boundedElastic 处理计算

七、经验总结

7.1 问题定位技巧

  1. 观察线程名称reactor-http-nio-* 说明是响应式环境
  2. 检查依赖spring-boot-starter-webflux 意味着必须遵循响应式规范
  3. 分析错误信息blocking is not supported 直接指明了问题
  4. 逐层排查:从节点 → 图配置 → Controller 逐层验证

7.2 常见陷阱

陷阱表现解决方案
在 NIO 线程中调用 .get()抛出 blocking 异常使用 Mono.fromFuture()
使用错误的 API类型不匹配,无法传递数据参考框架示例,使用标准 API
日志过于详细性能下降,日志爆炸使用 DEBUG 级别,只记录摘要
缺少超时控制长时间卡住添加 .timeout()
异常处理不当错误被吞掉使用 onErrorResume 降级

7.3 学到的教训

  1. 响应式编程要彻底:既然用了 WebFlux,就要全栈响应式
  2. 遵循框架约定:使用框架推荐的 API 和模式
  3. 重视日志设计:高频操作必须控制日志级别
  4. 设计原则指导实践:单一职责、依赖倒置等原则能避免很多问题

八、参考资料

官方文档

相关概念

  • 响应式编程:基于数据流和变化传播的编程范式
  • 背压 (Backpressure):控制数据生产速度的机制
  • Flux vs Mono:多元素流 vs 单元素流
  • Schedulers:Reactor 的线程调度器

代码示例

  • 02-human-node/ExpanderNode.java:FluxConverter 的标准用法
  • 02-human-node/TranslateNode.java:流式处理的完整示例

九、附录

A. 完整的项目结构

05-obsspsrroeccmr//.mccnGmaxaaooorapmbinndaiplinftepnll/iGrG/SCMSh/iijgrortheiOrcta/alararmbeayplpetgpsst-ahehaNeleoil/CrCmoNeruoaco/oidoSvrnnonnneduac.gmftg.ebbeyf/irCj.Gismuegohajrl/lsxulavaaiearltavpt/maeNahyptro.Ali.djpeojeap/na.vlw.vjaixjaac/avavataion.java#########Controller

B. 关键配置

application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
server:
  port: 10026

spring:
  application:
    name: observation-langfuse
  ai:
    openai:
      base-url: ${MODEL_SCOPE_BASE_URL}
      api-key: ${MODEL_SCOPE_API_KEY}
      chat:
        options:
          model: ${MODEL_SCOPE_MODEL}
    alibaba:
      graph:
        observation:
          enabled: true

management:
  tracing:
    sampling:
      probability: 1.0
  observations:
    annotations:
      enabled: true

otel:
  service:
    name: observability-langfuse
  traces:
    exporter: otlp
  exporter:
    otlp:
      endpoint: "https://cloud.langfuse.com/api/public/otel"
      headers:
        Authorization: "Basic ${YOUR_BASE64_ENCODED_CREDENTIALS}"

C. 依赖版本

pom.xml 关键依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
    <!-- Spring AI OpenAI -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-starter-model-openai</artifactId>
    </dependency>

    <!-- Spring AI Alibaba Graph + Observation -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-starter-graph-observation</artifactId>
    </dependency>

    <!-- WebFlux (响应式 Web) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- OpenTelemetry -->
    <dependency>
        <groupId>io.opentelemetry.instrumentation</groupId>
        <artifactId>opentelemetry-spring-boot-starter</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

结语

这次问题排查充分展示了响应式编程的复杂性和重要性。在 Spring WebFlux 环境中,绝对不能在 Reactor NIO 线程中执行阻塞操作,这是一条铁律。

通过这次实践,我们不仅解决了具体问题,更重要的是深入理解了:

  • 响应式编程的线程模型
  • Spring AI Graph 框架的正确用法
  • 软件设计原则在实际开发中的应用

希望这篇文章能帮助遇到类似问题的开发者快速定位和解决问题。