240820-分布式事务解决方案事务性消息

RocketMQ 事务消息

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。

Apache RocketMQ 事务消息的方案,具备高性能、可扩展、业务开发简单的优势,是实现分布式事务最终一致性的优秀解决方案。

一、应用场景

RocketMQ 事务消息适用于以下典型的分布式业务场景:

1.1 电商交易链路

典型场景:用户下单后的多系统协同

  • 核心业务:订单创建成功
  • 下游业务
    • 扣减库存
    • 增加积分
    • 发放优惠券
    • 通知物流系统
    • 推送大数据分析

为什么使用事务消息

  • 订单系统不能直接调用所有下游服务(耦合度高)
  • 需要保证订单创建成功后,后续操作最终会执行
  • 允许秒级延迟,不要求强一致性

1.2 支付结算系统

典型场景:支付成功后的账务处理

  • 核心业务:用户支付成功
  • 下游业务
    • 更新订单状态
    • 扣除账户余额
    • 记录流水账单
    • 通知商户
    • 触发分账逻辑

为什么使用事务消息

  • 支付系统与账务系统独立部署
  • 资金安全要求高,必须保证数据一致性
  • 需要完整的审计日志和可追溯性

1.3 用户注册流程

典型场景:新用户注册后的自动化处理

  • 核心业务:用户账号创建成功
  • 下游业务
    • 发送欢迎邮件/短信
    • 初始化用户画像
    • 创建会员档案
    • 推送营销系统
    • 分配默认权限

为什么使用事务消息

  • 注册流程需要快速响应(不能等待所有下游完成)
  • 后续处理可以异步执行
  • 允许部分失败后重试

1.4 内容发布系统

典型场景:文章/视频发布后的多平台同步

  • 核心业务:内容审核通过并入库
  • 下游业务
    • 推送到 APP 首页
    • 同步到 CDN
    • 通知订阅用户
    • 生成推荐索引
    • 数据统计分析

1.5 不适合的场景

强一致性场景:银行转账等需要实时一致性的业务
低延迟场景:要求毫秒级响应的实时计算
简单 CRUD 场景:单体应用或无需跨系统协调的业务
消息量极小场景:直接使用本地事务更简单

二、功能原理

2.1 什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

核心思想:将分布式事务拆分为两个阶段,通过”半消息”机制和本地事务绑定,确保”本地事务成功”与”消息发送成功”的原子性。

2.2 事务消息处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐
│ Producer│ │ MQ │ │Consumer │ │ Local │
│ │ │ Server │ │ │ │ DB │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ │ │
│ 1.发送半消息 │ │ │
│───────────────>│ │ │
│ │ │ │
│ 2.ACK(暂存) │ │ │
│<───────────────│ │ │
│ │ │ │
│ 3.执行本地事务 │ │ │
│────────────────────────────────────────────────>│
│ │ │ │
│ 4.提交二次确认 │ │ │
│───────────────>│ │ │
│ │ │ │
│ │ 5.投递消息 │ │
│ │───────────────>│ │
│ │ │ │
│ │ 6.消费消息 │ │
│ │────────────────────────────────>│
│ │ │ │

详细步骤说明

第一阶段:发送半消息

  1. 生产者发送半消息

    • 生产者向 MQ 服务端发送一条特殊类型的消息(半消息)
    • 半消息与普通消息的区别:标记为”暂不可投递”
  2. MQ 服务端持久化

    • MQ 将半消息存储到特殊的 Topic(RMQ_SYS_TRANS_HALF_TOPIC)
    • 消费者无法看到该消息
    • 返回 ACK 确认,表示半消息发送成功
  3. 生产者接收 ACK

    • 收到 ACK 后,生产者知道半消息已成功存储
    • 开始执行本地事务

第二阶段:执行本地事务并提交确认

  1. 执行本地事务

    • 生产者在本地数据库执行业务操作
    • 例如:更新订单状态、扣减库存等
    • 记录事务执行结果(成功/失败/未知)
  2. 提交二次确认

    • 根据本地事务执行结果,向 MQ 发送二次确认:
      • Commit:本地事务成功,消息可投递给消费者
      • Rollback:本地事务失败,删除半消息
      • Unknown:事务状态未知,请求回查
  3. MQ 处理确认结果

    • Commit:将半消息转为正常消息,投递给消费者
    • Rollback:删除半消息,不投递
    • Unknown:启动事务回查机制

异常处理:事务回查机制

触发条件

  • 生产者未在规定时间内(默认 60 秒)提交二次确认
  • 网络故障导致确认消息丢失
  • 生产者宕机

回查流程

  1. MQ 定时扫描超时的半消息
  2. 主动向生产者发送事务状态回查请求
  3. 生产者检查本地事务日志,返回事务状态
  4. MQ 根据回查结果执行相应操作

2.3 关键设计

2.3.1 半消息机制

什么是半消息

  • 一种特殊的消息类型,对消费者不可见
  • 存储在独立的系统 Topic 中
  • 只有收到 Commit 确认后才会转为可投递消息

为什么需要半消息

  • 避免本地事务失败后,消息仍被消费
  • 保证”本地事务执行”与”消息投递”的原子性

2.3.2 本地事务表

作用:记录本地事务执行状态,用于事务回查

表结构示例

1
2
3
4
5
6
7
8
9
10
CREATE TABLE transaction_log (
id BIGINT PRIMARY KEY,
transaction_id VARCHAR(64) NOT NULL, -- 事务 ID
message_id VARCHAR(64), -- 消息 ID
status TINYINT NOT NULL, -- 状态:0-未知,1-成功,2-失败
create_time DATETIME, -- 创建时间
update_time DATETIME, -- 更新时间
retry_count INT DEFAULT 0, -- 重试次数
INDEX idx_transaction_id (transaction_id)
);

2.3.3 事务监听器

职责

  • 监听本地事务执行结果
  • 决定提交 Commit 还是 Rollback
  • 处理事务回查请求

接口定义

1
2
3
4
5
6
7
public interface TransactionListener {
// 执行本地事务
LocalTransactionState executeLocalTransaction(Message msg, Object arg);

// 事务回查
LocalTransactionState checkLocalTransaction(MessageExt msg);
}

2.4 一致性保障

最终一致性保证

  • ✅ 本地事务成功 → 消息最终会被消费
  • ✅ 本地事务失败 → 消息不会被消费
  • ✅ 网络异常 → 通过回查机制恢复

不保证什么

  • ❌ 不保证强一致性(存在秒级延迟)
  • ❌ 不保证实时性(消费者异步处理)
  • ❌ 不保证顺序性(可能乱序消费)

三、使用限制

3.1 一致性级别限制

仅支持最终一致性

  • ❌ 不适用于强一致性场景(如银行核心转账)
  • ⚠️ 存在秒级延迟(从本地事务完成到消息被消费)
  • ⚠️ 极端情况下可能出现分钟级延迟(触发事务回查)

延迟来源

  • 网络传输延迟
  • MQ 服务端处理延迟
  • 消费者异步处理延迟
  • 事务回查间隔(默认 60 秒)

3.2 性能限制

吞吐量影响

  • 事务消息的 TPS 约为普通消息的 60%-70%
  • 原因:
    • 额外的二次确认交互
    • 本地事务执行开销
    • 事务日志记录开销

资源消耗

  • 需要额外的数据库表存储事务日志
  • MQ 服务端需要维护半消息状态
  • 定时回查任务占用系统资源

3.3 开发复杂度

代码侵入性

  • 需要实现 TransactionListener 接口
  • 需要维护本地事务日志表
  • 需要处理事务回查逻辑

调试难度

  • 分布式链路追踪复杂
  • 需要同时关注 MQ 日志和本地事务日志
  • 问题定位涉及多个系统

3.4 运维要求

依赖组件

  • ✅ RocketMQ Broker(4.x 及以上版本)
  • ✅ 关系型数据库(MySQL/PostgreSQL 等)
  • ✅ 稳定的网络连接

监控要求

  • 需要监控半消息积压情况
  • 需要监控事务回查频率
  • 需要告警机制处理异常事务

3.5 消息特性限制

不支持的消息类型

  • ❌ 延时消息
  • ❌ 顺序消息
  • ❌ 批量消息

消息大小限制

  • 单条消息最大 4MB(与普通消息一致)
  • 建议控制在 100KB 以内

3.6 事务超时限制

默认超时时间

  • 二次确认超时:60 秒(可配置)
  • 超过超时时间未确认,MQ 主动发起回查

超时影响

  • 超时时间过短:可能频繁触发回查
  • 超时时间过长:半消息积压,占用存储

3.7 不适用场景汇总

场景类型 原因 替代方案
强一致性要求 无法保证实时一致 Seata AT/TCC 模式
低延迟要求 (<100ms) 异步处理延迟 本地事务 + 同步调用
简单业务场景 过度设计 直接本地事务
无 MQ 基础设施 依赖组件过多 数据库事件表轮询
消息量极小 (<100/天) 成本效益低 定时任务补偿

四、使用方法

4.1 环境准备

4.1.1 添加依赖

Maven 项目引入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<!-- RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

<!-- 如果使用 Spring Cloud Alibaba -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>

4.1.2 配置文件

application.yml 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-transaction-group
send-message-timeout: 3000 # 发送超时 3 秒
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
# 事务消息配置
tx-producer:
group: order-transaction-check-group
core-pool-size: 10 # 事务回查线程池大小
max-pool-size: 30
keep-alive-time: 60
queue-capacity: 100

4.2 代码实现

4.2.1 完整示例:订单创建流程

业务场景:用户下单后,扣减库存并发送积分

步骤 1:定义事务监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Component
@RocketMQTransactionListener(txProducerGroup = "order-transaction-check-group")
public class OrderTransactionListener implements TransactionListener {

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

/**
* 执行本地事务
* @param message 消息对象
* @param o 业务参数
* @return 事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
// 解析消息体
String body = new String(message.getBody(), StandardCharsets.UTF_8);
OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);

// 开启本地事务
TransactionStatus status = orderService.createOrderWithTransaction(orderDTO);

// 根据事务状态返回
if (status.isCompleted() && status.isSuccess()) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("执行本地事务失败", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

/**
* 事务回查
* @param messageExt 消息对象
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
try {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);

// 查询本地事务状态
boolean isOrderCreated = orderService.checkOrderExists(orderDTO.getOrderId());

if (isOrderCreated) {
log.info("事务回查:订单已创建,提交消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.warn("事务回查:订单不存在,回滚消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("事务回查失败", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
步骤 2:发送事务消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Service
public class OrderServiceImpl implements OrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Autowired
private OrderMapper orderMapper;

@Autowired
private InventoryService inventoryService;

/**
* 创建订单并发送事务消息
*/
@Transactional(rollbackFor = Exception.class)
public TransactionStatus createOrderWithTransaction(OrderDTO orderDTO) {
try {
// 1. 创建订单
OrderPO order = convertToPO(orderDTO);
orderMapper.insert(order);

// 2. 扣减库存(本地事务内)
inventoryService.decreaseStock(orderDTO.getProductId(), orderDTO.getCount());

// 3. 构建消息
MessageBuilder<OrderDTO> builder = MessageBuilder
.withPayload(orderDTO)
.setHeader(MessageConst.PROPERTY_TRANSACTION_ID, generateTransactionId());

// 4. 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order-topic:order-tag", // Topic 和 Tag
builder.build(), // 消息对象
orderDTO // 业务参数(传递给监听器)
);

return TransactionStatus.success();

} catch (Exception e) {
log.error("创建订单失败", e);
return TransactionStatus.failure(e.getMessage());
}
}

/**
* 检查订单是否存在(用于事务回查)
*/
public boolean checkOrderExists(Long orderId) {
OrderPO order = orderMapper.selectById(orderId);
return order != null;
}

private String generateTransactionId() {
return UUID.randomUUID().toString().replace("-", "");
}
}
步骤 3:消费者处理消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "inventory-consumer-group",
messageModel = MessageModel.CLUSTERING
)
public class InventoryConsumer implements RocketMQListener<OrderDTO> {

@Autowired
private InventoryService inventoryService;

@Autowired
private PointsService pointsService;

@Override
public void onMessage(OrderDTO orderDTO) {
try {
log.info("收到订单消息,开始处理:{}", orderDTO.getOrderId());

// 1. 增加积分
pointsService.addPoints(orderDTO.getUserId(), orderDTO.getAmount());

// 2. 发送优惠券
// ...

// 3. 通知物流系统
// ...

log.info("订单消息处理成功:{}", orderDTO.getOrderId());

} catch (Exception e) {
log.error("订单消息处理失败:{}", orderDTO.getOrderId(), e);
// RocketMQ 会自动重试(默认 16 次)
throw new RuntimeException("处理失败", e);
}
}
}

4.2.2 简化版本(适合简单场景)

如果业务逻辑较简单,可以使用 Lambda 表达式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class SimpleOrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void createOrder(OrderDTO orderDTO) {
// 1. 执行本地事务
orderMapper.insert(convertToPO(orderDTO));

// 2. 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(orderDTO).build(),
(msg, args) -> {
// 本地事务已提交,返回 COMMIT
return LocalTransactionState.COMMIT_MESSAGE;
},
null
);
}
}

4.3 测试验证

4.3.1 单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@SpringBootTest
public class OrderTransactionTest {

@Autowired
private OrderService orderService;

@Autowired
private OrderMapper orderMapper;

@Test
public void testCreateOrderSuccess() {
// 准备数据
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId(10001L);
orderDTO.setUserId(888L);
orderDTO.setAmount(new BigDecimal("199.00"));

// 执行
orderService.createOrderWithTransaction(orderDTO);

// 验证订单已创建
OrderPO order = orderMapper.selectById(10001L);
assertNotNull(order);

// 等待消息投递(异步)
Thread.sleep(2000);

// 验证库存已扣减、积分已增加等
// ...
}
}

4.3.2 集成测试

使用 Docker 启动 RocketMQ 进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 启动 NameServer
docker run -d --name rmqnamesrv \
-p 9876:9876 \
apache/rocketmq:4.9.4 \
sh mqnamesrv

# 启动 Broker
docker run -d --name rmqbroker \
-p 10911:10911 -p 10909:10909 \
--link rmqnamesrv:rmqnamesrv \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
apache/rocketmq:4.9.4 \
sh mqbroker

4.4 监控和排查

4.4.1 查看事务消息状态

1
2
3
4
5
# 查看半消息数量
sh mqadmin queryMsgByUniqueKey -n "localhost:9876" -t RMQ_SYS_TRANS_HALF_TOPIC -i <messageId>

# 查看死信队列
sh mqadmin queryDlqByGroupId -n "localhost:9876" -g order-transaction-check-group

4.4.2 日志配置

1
2
3
4
5
6
7
8
<!-- logback-spring.xml -->
<logger name="org.apache.rocketmq" level="INFO">
<appender-ref ref="ROCKETMQ_FILE" />
</logger>

<logger name="com.example.order.transaction" level="DEBUG">
<appender-ref ref="TRANSACTION_FILE" />
</logger>

五、使用建议

5.1 架构设计建议

5.1.1 何时选择事务消息

推荐使用

  • 核心业务完成后,需要通知多个下游系统
  • 可以接受秒级延迟的最终一致性
  • 追求低耦合、高可扩展的架构
  • 已有 RocketMQ 基础设施

不推荐使用

  • 需要强一致性保证(选 Seata TCC)
  • 要求毫秒级响应(选同步调用)
  • 业务逻辑极其简单(选本地事务)
  • 没有 MQ 运维能力(选数据库事件表)

5.1.2 与其他方案对比

方案 一致性 性能 开发成本 适用场景
事务消息 最终一致 异步解耦、多下游通知
Seata AT 最终一致 微服务间强关联事务
Seata TCC 最终一致 最高 高并发核心链路
本地事件表 最终一致 无 MQ 基础设施
同步 RPC 强一致 简单调用链

5.2 开发实践建议

5.2.1 事务监听器设计

最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 1. 保持监听器轻量,委托给 Service 处理
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return orderTransactionService.executeAndReturnState((OrderDTO) arg);
}

// 2. 回查方法必须幂等
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询数据库,不要执行写操作
return orderTransactionService.checkState(msg);
}

// 3. 记录详细日志,便于排查
log.info("事务执行:orderId={}, status={}", orderDTO.getOrderId(), state);

避免做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 1. 在监听器中执行复杂业务逻辑
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 错误:调用远程 RPC、执行复杂计算
remoteService.call(); // ❌
}

// 2. 回查时修改业务数据
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 错误:回查时执行业务操作
inventoryService.decreaseStock(); // ❌
}

// 3. 吞掉异常不处理
try {
orderService.create();
} catch (Exception e) {
// 错误:只打印日志不返回 ROLLBACK
log.error("失败", e); // ❌
}

5.2.2 本地事务表设计

推荐表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE `transaction_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(64) NOT NULL COMMENT '事务 ID(UUID)',
`message_id` varchar(64) DEFAULT NULL COMMENT '消息 ID',
`business_key` varchar(128) NOT NULL COMMENT '业务主键(订单 ID 等)',
`status` tinyint(4) NOT NULL COMMENT '状态:0-未知 1-成功 2-失败',
`retry_count` int(11) DEFAULT '0' COMMENT '回查重试次数',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`ext_data` text COMMENT '扩展数据(JSON)',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_transaction_id` (`transaction_id`),
KEY `idx_business_key` (`business_key`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';

使用建议

  • ✅ 事务 ID 使用 UUID,避免重复
  • ✅ 记录业务主键,便于回查
  • ✅ 添加重试次数,监控异常情况
  • ✅ 定期归档历史数据(保留 7-30 天)

5.2.3 消息体设计

推荐格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"eventId": "uuid-1234-5678",
"eventType": "ORDER_CREATED",
"timestamp": 1692518400000,
"data": {
"orderId": 10001,
"userId": 888,
"amount": 199.00
},
"metadata": {
"source": "order-service",
"version": "1.0"
}
}

设计原则

  • ✅ 包含唯一事件 ID,支持幂等消费
  • ✅ 包含时间戳,支持延迟分析
  • ✅ 数据结构清晰,便于扩展
  • ✅ 控制大小在 100KB 以内

5.3 性能优化建议

5.3.1 提升吞吐量

批量发送(谨慎使用):

1
2
3
4
5
6
7
8
9
10
11
12
// 收集一定数量的订单后批量发送
List<OrderDTO> batch = new ArrayList<>();
for (OrderDTO order : orders) {
orderService.createOrder(order);
batch.add(order);

if (batch.size() >= 50) {
// 批量发送事务消息(需自定义实现)
transactionSender.sendBatch(batch);
batch.clear();
}
}

异步处理

1
2
3
4
// 使用线程池异步发送
executorService.submit(() -> {
rocketMQTemplate.sendMessageInTransaction(...);
});

5.3.2 降低延迟

优化配置

1
2
3
4
5
rocketmq:
producer:
send-message-timeout: 1000 # 降低超时时间(默认 3 秒)
tx-producer:
check-interval: 30 # 缩短回查间隔(默认 60 秒)

网络优化

  • 生产者和 Broker 同机房部署
  • 使用专线或 VPC 内网通信
  • 启用 TCP 长连接

5.4 容错和降级建议

5.4.1 重试策略

消费者重试

1
2
3
4
5
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "inventory-consumer-group",
maxReconsumeTimes = 5 // 最大重试 5 次(默认 16 次)
)

降级方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void onMessage(OrderDTO orderDTO) {
try {
pointsService.addPoints(orderDTO.getUserId(), orderDTO.getAmount());
} catch (Exception e) {
// 重试多次后仍然失败,记录到死信表
if (message.getReconsumeTimes() >= 5) {
deadLetterTable.save(orderDTO, e.getMessage());
// 触发人工处理
alertService.send("积分发放失败,请人工处理");
return; // 不再重试
}
throw new RuntimeException("处理失败", e);
}
}

5.4.2 监控告警

关键指标

  • 半消息积压数量(超过 1000 告警)
  • 事务回查成功率(低于 95% 告警)
  • 消息消费延迟(超过 5 分钟告警)
  • 死信队列增长速率

监控脚本示例

1
2
3
4
5
6
7
8
#!/bin/bash
# 检查半消息积压
HALF_MSG_COUNT=$(sh mqadmin queryMsgByUniqueKey -n "localhost:9876" \
-t RMQ_SYS_TRANS_HALF_TOPIC | wc -l)

if [ $HALF_MSG_COUNT -gt 1000 ]; then
echo "警告:半消息积压超过 1000 条" | mail -s "MQ 告警" ops@example.com
fi

5.5 常见问题 FAQ

Q1: 如何保证消息不丢失?

A: 三层保障:

  1. Broker 持久化:同步刷盘 + 同步复制
  2. 事务回查:超时未确认自动回查
  3. 死信队列:消费失败的消息单独存储

Q2: 如何处理重复消费?

A: 消费者必须实现幂等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 使用唯一事件 ID 去重
public void onMessage(OrderDTO orderDTO) {
String eventId = orderDTO.getEventId();

// 检查是否已处理
if (processedTable.exists(eventId)) {
log.warn("消息已处理:{}", eventId);
return;
}

// 处理业务
process(orderDTO);

// 记录已处理
processedTable.insert(eventId);
}

Q3: 事务回查失败怎么办?

A: 分级处理:

  1. 偶发失败:自动重试(最多 3 次)
  2. 持续失败:转入人工处理队列
  3. 数据不一致:触发对账修复

Q4: 如何选择 Topic 和 Tag?

A: 按业务域划分:

1
2
3
4
5
6
7
8
Topic: order-event       # 订单相关事件
Tag: created # 订单创建
Tag: paid # 订单支付
Tag: cancelled # 订单取消

Topic: payment-event # 支付相关事件
Tag: success # 支付成功
Tag: failed # 支付失败

5.6 总结

核心要点

  1. ✅ 事务消息适用于最终一致性场景,不适用于强一致性
  2. ✅ 核心价值是解耦,而非性能提升
  3. ✅ 必须实现幂等性和事务回查
  4. ✅ 需要完善的监控和告警机制
  5. ✅ 合理设计消息体和本地事务表

决策树

1
2
3
4
5
6
7
8
9
是否需要强一致性?
├─ 是 → 选择 Seata TCC/AT
└─ 否 → 是否可以接受秒级延迟?
├─ 否 → 选择同步 RPC 调用
└─ 是 → 是否需要解耦多个下游系统?
├─ 否 → 选择本地事务
└─ 是 → 是否有 MQ 基础设施?
├─ 否 → 选择数据库事件表
└─ 是 → ✅ 选择 RocketMQ 事务消息

最后建议

技术选型没有银弹,只有最适合。事务消息是优秀的最终一致性解决方案,但前提是团队具备相应的开发和运维能力。建议在非核心业务先行试点,积累经验后再推广到核心链路。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×