RocketMQ 事务消息
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。
Apache RocketMQ 事务消息的方案,具备高性能、可扩展、业务开发简单的优势,是实现分布式事务最终一致性的优秀解决方案。
一、应用场景
RocketMQ 事务消息适用于以下典型的分布式业务场景:
1.1 电商交易链路
典型场景:用户下单后的多系统协同
- 核心业务:订单创建成功
- 下游业务:
- 扣减库存
- 增加积分
- 发放优惠券
- 通知物流系统
- 推送大数据分析
为什么使用事务消息:
- 订单系统不能直接调用所有下游服务(耦合度高)
- 需要保证订单创建成功后,后续操作最终会执行
- 允许秒级延迟,不要求强一致性
1.2 支付结算系统
典型场景:支付成功后的账务处理
- 核心业务:用户支付成功
- 下游业务:
- 更新订单状态
- 扣除账户余额
- 记录流水账单
- 通知商户
- 触发分账逻辑
为什么使用事务消息:
- 支付系统与账务系统独立部署
- 资金安全要求高,必须保证数据一致性
- 需要完整的审计日志和可追溯性
1.3 用户注册流程
典型场景:新用户注册后的自动化处理
- 核心业务:用户账号创建成功
- 下游业务:
- 发送欢迎邮件/短信
- 初始化用户画像
- 创建会员档案
- 推送营销系统
- 分配默认权限
为什么使用事务消息:
- 注册流程需要快速响应(不能等待所有下游完成)
- 后续处理可以异步执行
- 允许部分失败后重试
1.4 内容发布系统
典型场景:文章/视频发布后的多平台同步
- 核心业务:内容审核通过并入库
- 下游业务:
- 推送到 APP 首页
- 同步到 CDN
- 通知订阅用户
- 生成推荐索引
- 数据统计分析
1.5 不适合的场景
❌ 强一致性场景:银行转账等需要实时一致性的业务
❌ 低延迟场景:要求毫秒级响应的实时计算
❌ 简单 CRUD 场景:单体应用或无需跨系统协调的业务
❌ 消息量极小场景:直接使用本地事务更简单
二、功能原理
2.1 什么是事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
核心思想:将分布式事务拆分为两个阶段,通过”半消息”机制和本地事务绑定,确保”本地事务成功”与”消息发送成功”的原子性。
2.2 事务消息处理流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Producer│ │ MQ │ │Consumer │ │ Local │ │ │ │ Server │ │ │ │ DB │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ 1.发送半消息 │ │ │ │───────────────>│ │ │ │ │ │ │ │ 2.ACK(暂存) │ │ │ │<───────────────│ │ │ │ │ │ │ │ 3.执行本地事务 │ │ │ │────────────────────────────────────────────────>│ │ │ │ │ │ 4.提交二次确认 │ │ │ │───────────────>│ │ │ │ │ │ │ │ │ 5.投递消息 │ │ │ │───────────────>│ │ │ │ │ │ │ │ 6.消费消息 │ │ │ │────────────────────────────────>│ │ │ │ │
|
详细步骤说明:
第一阶段:发送半消息
生产者发送半消息
- 生产者向 MQ 服务端发送一条特殊类型的消息(半消息)
- 半消息与普通消息的区别:标记为”暂不可投递”
MQ 服务端持久化
- MQ 将半消息存储到特殊的 Topic(RMQ_SYS_TRANS_HALF_TOPIC)
- 消费者无法看到该消息
- 返回 ACK 确认,表示半消息发送成功
生产者接收 ACK
- 收到 ACK 后,生产者知道半消息已成功存储
- 开始执行本地事务
第二阶段:执行本地事务并提交确认
执行本地事务
- 生产者在本地数据库执行业务操作
- 例如:更新订单状态、扣减库存等
- 记录事务执行结果(成功/失败/未知)
提交二次确认
- 根据本地事务执行结果,向 MQ 发送二次确认:
- Commit:本地事务成功,消息可投递给消费者
- Rollback:本地事务失败,删除半消息
- Unknown:事务状态未知,请求回查
MQ 处理确认结果
- Commit:将半消息转为正常消息,投递给消费者
- Rollback:删除半消息,不投递
- Unknown:启动事务回查机制
异常处理:事务回查机制
触发条件:
- 生产者未在规定时间内(默认 60 秒)提交二次确认
- 网络故障导致确认消息丢失
- 生产者宕机
回查流程:
- MQ 定时扫描超时的半消息
- 主动向生产者发送事务状态回查请求
- 生产者检查本地事务日志,返回事务状态
- MQ 根据回查结果执行相应操作
2.3 关键设计
2.3.1 半消息机制
什么是半消息:
- 一种特殊的消息类型,对消费者不可见
- 存储在独立的系统 Topic 中
- 只有收到 Commit 确认后才会转为可投递消息
为什么需要半消息:
- 避免本地事务失败后,消息仍被消费
- 保证”本地事务执行”与”消息投递”的原子性
2.3.2 本地事务表
作用:记录本地事务执行状态,用于事务回查
表结构示例:
1 2 3 4 5 6 7 8 9 10
| CREATE TABLE transaction_log ( id BIGINT PRIMARY KEY, transaction_id VARCHAR(64) NOT NULL, message_id VARCHAR(64), status TINYINT NOT NULL, create_time DATETIME, update_time DATETIME, retry_count INT DEFAULT 0, INDEX idx_transaction_id (transaction_id) );
|
2.3.3 事务监听器
职责:
- 监听本地事务执行结果
- 决定提交 Commit 还是 Rollback
- 处理事务回查请求
接口定义:
1 2 3 4 5 6 7
| public interface TransactionListener { LocalTransactionState executeLocalTransaction(Message msg, Object arg); LocalTransactionState checkLocalTransaction(MessageExt msg); }
|
2.4 一致性保障
最终一致性保证:
- ✅ 本地事务成功 → 消息最终会被消费
- ✅ 本地事务失败 → 消息不会被消费
- ✅ 网络异常 → 通过回查机制恢复
不保证什么:
- ❌ 不保证强一致性(存在秒级延迟)
- ❌ 不保证实时性(消费者异步处理)
- ❌ 不保证顺序性(可能乱序消费)
三、使用限制
3.1 一致性级别限制
仅支持最终一致性:
- ❌ 不适用于强一致性场景(如银行核心转账)
- ⚠️ 存在秒级延迟(从本地事务完成到消息被消费)
- ⚠️ 极端情况下可能出现分钟级延迟(触发事务回查)
延迟来源:
- 网络传输延迟
- MQ 服务端处理延迟
- 消费者异步处理延迟
- 事务回查间隔(默认 60 秒)
3.2 性能限制
吞吐量影响:
- 事务消息的 TPS 约为普通消息的 60%-70%
- 原因:
- 额外的二次确认交互
- 本地事务执行开销
- 事务日志记录开销
资源消耗:
- 需要额外的数据库表存储事务日志
- MQ 服务端需要维护半消息状态
- 定时回查任务占用系统资源
3.3 开发复杂度
代码侵入性:
- 需要实现
TransactionListener 接口
- 需要维护本地事务日志表
- 需要处理事务回查逻辑
调试难度:
- 分布式链路追踪复杂
- 需要同时关注 MQ 日志和本地事务日志
- 问题定位涉及多个系统
3.4 运维要求
依赖组件:
- ✅ RocketMQ Broker(4.x 及以上版本)
- ✅ 关系型数据库(MySQL/PostgreSQL 等)
- ✅ 稳定的网络连接
监控要求:
- 需要监控半消息积压情况
- 需要监控事务回查频率
- 需要告警机制处理异常事务
3.5 消息特性限制
不支持的消息类型:
消息大小限制:
- 单条消息最大 4MB(与普通消息一致)
- 建议控制在 100KB 以内
3.6 事务超时限制
默认超时时间:
- 二次确认超时:60 秒(可配置)
- 超过超时时间未确认,MQ 主动发起回查
超时影响:
- 超时时间过短:可能频繁触发回查
- 超时时间过长:半消息积压,占用存储
3.7 不适用场景汇总
| 场景类型 |
原因 |
替代方案 |
| 强一致性要求 |
无法保证实时一致 |
Seata AT/TCC 模式 |
| 低延迟要求 (<100ms) |
异步处理延迟 |
本地事务 + 同步调用 |
| 简单业务场景 |
过度设计 |
直接本地事务 |
| 无 MQ 基础设施 |
依赖组件过多 |
数据库事件表轮询 |
| 消息量极小 (<100/天) |
成本效益低 |
定时任务补偿 |
四、使用方法
4.1 环境准备
4.1.1 添加依赖
Maven 项目引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> </dependencies>
|
4.1.2 配置文件
application.yml 配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| rocketmq: name-server: 127.0.0.1:9876 producer: group: order-transaction-group send-message-timeout: 3000 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2 tx-producer: group: order-transaction-check-group core-pool-size: 10 max-pool-size: 30 keep-alive-time: 60 queue-capacity: 100
|
4.2 代码实现
4.2.1 完整示例:订单创建流程
业务场景:用户下单后,扣减库存并发送积分
步骤 1:定义事务监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| @Component @RocketMQTransactionListener(txProducerGroup = "order-transaction-check-group") public class OrderTransactionListener implements TransactionListener { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService;
@Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { String body = new String(message.getBody(), StandardCharsets.UTF_8); OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class); TransactionStatus status = orderService.createOrderWithTransaction(orderDTO); if (status.isCompleted() && status.isSuccess()) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("执行本地事务失败", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { try { String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class); boolean isOrderCreated = orderService.checkOrderExists(orderDTO.getOrderId()); if (isOrderCreated) { log.info("事务回查:订单已创建,提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("事务回查:订单不存在,回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("事务回查失败", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } }
|
步骤 2:发送事务消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Service public class OrderServiceImpl implements OrderService { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private OrderMapper orderMapper; @Autowired private InventoryService inventoryService;
@Transactional(rollbackFor = Exception.class) public TransactionStatus createOrderWithTransaction(OrderDTO orderDTO) { try { OrderPO order = convertToPO(orderDTO); orderMapper.insert(order); inventoryService.decreaseStock(orderDTO.getProductId(), orderDTO.getCount()); MessageBuilder<OrderDTO> builder = MessageBuilder .withPayload(orderDTO) .setHeader(MessageConst.PROPERTY_TRANSACTION_ID, generateTransactionId()); rocketMQTemplate.sendMessageInTransaction( "order-topic:order-tag", builder.build(), orderDTO ); return TransactionStatus.success(); } catch (Exception e) { log.error("创建订单失败", e); return TransactionStatus.failure(e.getMessage()); } }
public boolean checkOrderExists(Long orderId) { OrderPO order = orderMapper.selectById(orderId); return order != null; } private String generateTransactionId() { return UUID.randomUUID().toString().replace("-", ""); } }
|
步骤 3:消费者处理消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component @RocketMQMessageListener( topic = "order-topic", consumerGroup = "inventory-consumer-group", messageModel = MessageModel.CLUSTERING ) public class InventoryConsumer implements RocketMQListener<OrderDTO> { @Autowired private InventoryService inventoryService; @Autowired private PointsService pointsService; @Override public void onMessage(OrderDTO orderDTO) { try { log.info("收到订单消息,开始处理:{}", orderDTO.getOrderId()); pointsService.addPoints(orderDTO.getUserId(), orderDTO.getAmount()); log.info("订单消息处理成功:{}", orderDTO.getOrderId()); } catch (Exception e) { log.error("订单消息处理失败:{}", orderDTO.getOrderId(), e); throw new RuntimeException("处理失败", e); } } }
|
4.2.2 简化版本(适合简单场景)
如果业务逻辑较简单,可以使用 Lambda 表达式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class SimpleOrderService { @Autowired private RocketMQTemplate rocketMQTemplate; public void createOrder(OrderDTO orderDTO) { orderMapper.insert(convertToPO(orderDTO)); rocketMQTemplate.sendMessageInTransaction( "order-topic", MessageBuilder.withPayload(orderDTO).build(), (msg, args) -> { return LocalTransactionState.COMMIT_MESSAGE; }, null ); } }
|
4.3 测试验证
4.3.1 单元测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @SpringBootTest public class OrderTransactionTest { @Autowired private OrderService orderService; @Autowired private OrderMapper orderMapper; @Test public void testCreateOrderSuccess() { OrderDTO orderDTO = new OrderDTO(); orderDTO.setOrderId(10001L); orderDTO.setUserId(888L); orderDTO.setAmount(new BigDecimal("199.00")); orderService.createOrderWithTransaction(orderDTO); OrderPO order = orderMapper.selectById(10001L); assertNotNull(order); Thread.sleep(2000); } }
|
4.3.2 集成测试
使用 Docker 启动 RocketMQ 进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13
| docker run -d --name rmqnamesrv \ -p 9876:9876 \ apache/rocketmq:4.9.4 \ sh mqnamesrv
docker run -d --name rmqbroker \ -p 10911:10911 -p 10909:10909 \ --link rmqnamesrv:rmqnamesrv \ -e "NAMESRV_ADDR=rmqnamesrv:9876" \ apache/rocketmq:4.9.4 \ sh mqbroker
|
4.4 监控和排查
4.4.1 查看事务消息状态
1 2 3 4 5
| sh mqadmin queryMsgByUniqueKey -n "localhost:9876" -t RMQ_SYS_TRANS_HALF_TOPIC -i <messageId>
sh mqadmin queryDlqByGroupId -n "localhost:9876" -g order-transaction-check-group
|
4.4.2 日志配置
1 2 3 4 5 6 7 8
| <logger name="org.apache.rocketmq" level="INFO"> <appender-ref ref="ROCKETMQ_FILE" /> </logger>
<logger name="com.example.order.transaction" level="DEBUG"> <appender-ref ref="TRANSACTION_FILE" /> </logger>
|
五、使用建议
5.1 架构设计建议
5.1.1 何时选择事务消息
✅ 推荐使用:
- 核心业务完成后,需要通知多个下游系统
- 可以接受秒级延迟的最终一致性
- 追求低耦合、高可扩展的架构
- 已有 RocketMQ 基础设施
❌ 不推荐使用:
- 需要强一致性保证(选 Seata TCC)
- 要求毫秒级响应(选同步调用)
- 业务逻辑极其简单(选本地事务)
- 没有 MQ 运维能力(选数据库事件表)
5.1.2 与其他方案对比
| 方案 |
一致性 |
性能 |
开发成本 |
适用场景 |
| 事务消息 |
最终一致 |
高 |
中 |
异步解耦、多下游通知 |
| Seata AT |
最终一致 |
中 |
低 |
微服务间强关联事务 |
| Seata TCC |
最终一致 |
最高 |
高 |
高并发核心链路 |
| 本地事件表 |
最终一致 |
中 |
中 |
无 MQ 基础设施 |
| 同步 RPC |
强一致 |
低 |
低 |
简单调用链 |
5.2 开发实践建议
5.2.1 事务监听器设计
✅ 最佳实践:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return orderTransactionService.executeAndReturnState((OrderDTO) arg); }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return orderTransactionService.checkState(msg); }
log.info("事务执行:orderId={}, status={}", orderDTO.getOrderId(), state);
|
❌ 避免做法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { remoteService.call(); }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { inventoryService.decreaseStock(); }
try { orderService.create(); } catch (Exception e) { log.error("失败", e); }
|
5.2.2 本地事务表设计
推荐表结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| CREATE TABLE `transaction_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `transaction_id` varchar(64) NOT NULL COMMENT '事务 ID(UUID)', `message_id` varchar(64) DEFAULT NULL COMMENT '消息 ID', `business_key` varchar(128) NOT NULL COMMENT '业务主键(订单 ID 等)', `status` tinyint(4) NOT NULL COMMENT '状态:0-未知 1-成功 2-失败', `retry_count` int(11) DEFAULT '0' COMMENT '回查重试次数', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `ext_data` text COMMENT '扩展数据(JSON)', PRIMARY KEY (`id`), UNIQUE KEY `uk_transaction_id` (`transaction_id`), KEY `idx_business_key` (`business_key`), KEY `idx_create_time` (`create_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';
|
使用建议:
- ✅ 事务 ID 使用 UUID,避免重复
- ✅ 记录业务主键,便于回查
- ✅ 添加重试次数,监控异常情况
- ✅ 定期归档历史数据(保留 7-30 天)
5.2.3 消息体设计
推荐格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| { "eventId": "uuid-1234-5678", "eventType": "ORDER_CREATED", "timestamp": 1692518400000, "data": { "orderId": 10001, "userId": 888, "amount": 199.00 }, "metadata": { "source": "order-service", "version": "1.0" } }
|
设计原则:
- ✅ 包含唯一事件 ID,支持幂等消费
- ✅ 包含时间戳,支持延迟分析
- ✅ 数据结构清晰,便于扩展
- ✅ 控制大小在 100KB 以内
5.3 性能优化建议
5.3.1 提升吞吐量
批量发送(谨慎使用):
1 2 3 4 5 6 7 8 9 10 11 12
| List<OrderDTO> batch = new ArrayList<>(); for (OrderDTO order : orders) { orderService.createOrder(order); batch.add(order); if (batch.size() >= 50) { transactionSender.sendBatch(batch); batch.clear(); } }
|
异步处理:
1 2 3 4
| executorService.submit(() -> { rocketMQTemplate.sendMessageInTransaction(...); });
|
5.3.2 降低延迟
优化配置:
1 2 3 4 5
| rocketmq: producer: send-message-timeout: 1000 tx-producer: check-interval: 30
|
网络优化:
- 生产者和 Broker 同机房部署
- 使用专线或 VPC 内网通信
- 启用 TCP 长连接
5.4 容错和降级建议
5.4.1 重试策略
消费者重试:
1 2 3 4 5
| @RocketMQMessageListener( topic = "order-topic", consumerGroup = "inventory-consumer-group", maxReconsumeTimes = 5 )
|
降级方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Override public void onMessage(OrderDTO orderDTO) { try { pointsService.addPoints(orderDTO.getUserId(), orderDTO.getAmount()); } catch (Exception e) { if (message.getReconsumeTimes() >= 5) { deadLetterTable.save(orderDTO, e.getMessage()); alertService.send("积分发放失败,请人工处理"); return; } throw new RuntimeException("处理失败", e); } }
|
5.4.2 监控告警
关键指标:
- 半消息积压数量(超过 1000 告警)
- 事务回查成功率(低于 95% 告警)
- 消息消费延迟(超过 5 分钟告警)
- 死信队列增长速率
监控脚本示例:
1 2 3 4 5 6 7 8
| #!/bin/bash
HALF_MSG_COUNT=$(sh mqadmin queryMsgByUniqueKey -n "localhost:9876" \ -t RMQ_SYS_TRANS_HALF_TOPIC | wc -l)
if [ $HALF_MSG_COUNT -gt 1000 ]; then echo "警告:半消息积压超过 1000 条" | mail -s "MQ 告警" ops@example.com fi
|
5.5 常见问题 FAQ
Q1: 如何保证消息不丢失?
A: 三层保障:
- Broker 持久化:同步刷盘 + 同步复制
- 事务回查:超时未确认自动回查
- 死信队列:消费失败的消息单独存储
Q2: 如何处理重复消费?
A: 消费者必须实现幂等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void onMessage(OrderDTO orderDTO) { String eventId = orderDTO.getEventId(); if (processedTable.exists(eventId)) { log.warn("消息已处理:{}", eventId); return; } process(orderDTO); processedTable.insert(eventId); }
|
Q3: 事务回查失败怎么办?
A: 分级处理:
- 偶发失败:自动重试(最多 3 次)
- 持续失败:转入人工处理队列
- 数据不一致:触发对账修复
Q4: 如何选择 Topic 和 Tag?
A: 按业务域划分:
1 2 3 4 5 6 7 8
| Topic: order-event # 订单相关事件 Tag: created # 订单创建 Tag: paid # 订单支付 Tag: cancelled # 订单取消
Topic: payment-event # 支付相关事件 Tag: success # 支付成功 Tag: failed # 支付失败
|
5.6 总结
核心要点:
- ✅ 事务消息适用于最终一致性场景,不适用于强一致性
- ✅ 核心价值是解耦,而非性能提升
- ✅ 必须实现幂等性和事务回查
- ✅ 需要完善的监控和告警机制
- ✅ 合理设计消息体和本地事务表
决策树:
1 2 3 4 5 6 7 8 9
| 是否需要强一致性? ├─ 是 → 选择 Seata TCC/AT └─ 否 → 是否可以接受秒级延迟? ├─ 否 → 选择同步 RPC 调用 └─ 是 → 是否需要解耦多个下游系统? ├─ 否 → 选择本地事务 └─ 是 → 是否有 MQ 基础设施? ├─ 否 → 选择数据库事件表 └─ 是 → ✅ 选择 RocketMQ 事务消息
|
最后建议:
技术选型没有银弹,只有最适合。事务消息是优秀的最终一致性解决方案,但前提是团队具备相应的开发和运维能力。建议在非核心业务先行试点,积累经验后再推广到核心链路。