Hmily TCC 分布式事务源码解析

1. 请求订单服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class OrderServiceImpl implements OrderService {
public String orderPay(Integer count, BigDecimal amount) {
Order order = saveOrder(count, amount);
long start = System.currentTimeMillis();
paymentService.makePayment(order);
System.out.println("hmily-cloud分布式事务耗时:" + (System.currentTimeMillis() - start));
return "success";
}
}

/* =============================================== */

public class PaymentServiceImpl implements PaymentService {
// Try:预留资源
@HmilyTCC(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
public void makePayment(Order order) {
// 更新本地数据库订单状态
updateOrderStatus(order, OrderStatusEnum.PAYING);
// RPC调用付款服务
accountClient.payment(buildAccountDTO(order));
// RPC调用扣减库存服务
inventoryClient.decrease(buildInventoryDTO(order));
}

// Confirm:对Try阶段方法的确认
public void confirmOrderStatus(Order order) {
updateOrderStatus(order, OrderStatusEnum.PAY_SUCCESS);
LOGGER.info("=========进行订单confirm操作完成================");
}

// Cancel:对Try阶段方法的回滚
public void cancelOrderStatus(Order order) {
updateOrderStatus(order, OrderStatusEnum.PAY_FAIL);
LOGGER.info("=========进行订单cancel操作完成================");
}

private void updateOrderStatus(Order order, OrderStatusEnum orderStatus) {
order.setStatus(orderStatus.getCode());
orderMapper.update(order);
}
}

/* =============================================== */

@FeignClient(value = "account-service")
public interface AccountClient {
@RequestMapping("/account-service/account/payment")
@Hmily // 使用Hmily分布式事务框架
Boolean payment(@RequestBody AccountDTO accountDO);
}

2. Try 流程源码解析

  • @HmilyTCC 是 Hmily 框架处理 TCC 事务的切面,在初始化流程里初始化了 SpringHmilyTransactionAspect 切面,继承自 AbstractHmilyTransactionAspect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface HmilyTCC {
// Try、Confirm、Cancel方法的参数列表要一致
String confirmMethod() default "";
String cancelMethod() default "";
TransTypeEnum pattern() default TransTypeEnum.TCC;
}

/* =============================================== */

@Aspect
public abstract class AbstractHmilyTransactionAspect {
private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor();

@Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC) || @annotation(org.dromara.hmily.annotation.HmilyXA)")
public void hmilyInterceptor() {
}

@Around("hmilyInterceptor()")
public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return interceptor.invoke(proceedingJoinPoint);
}
}

/* =============================================== */

public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor {
private static RpcParameterLoader parameterLoader;
private static final EnumMap<TransTypeEnum, HmilyTransactionHandlerRegistry> REGISTRY = new EnumMap<>(TransTypeEnum.class);

static {
// 1. 根据引入不同的RPC支持包,获取不同的RPC参数加载器(这里使用的是SpringCloud)
parameterLoader = Optional.ofNullable(ExtensionLoaderFactory.load(RpcParameterLoader.class)).orElse(new LocalParameterLoader());
}

static {
// 2. 注册不同模式的事务处理器,使用SPI加载不同的事务处理器
REGISTRY.put(TransTypeEnum.TCC, ExtensionLoaderFactory.load(HmilyTransactionHandlerRegistry.class, "tcc"));
REGISTRY.put(TransTypeEnum.TAC, ExtensionLoaderFactory.load(HmilyTransactionHandlerRegistry.class, "tac"));
REGISTRY.put(TransTypeEnum.XA, ExtensionLoaderFactory.load(HmilyTransactionHandlerRegistry.class, "xa"));
}

@Override
public Object invoke(final ProceedingJoinPoint pjp) throws Throwable {
HmilyTransactionContext context = parameterLoader.load();
return invokeWithinTransaction(context, pjp);
}

private Object invokeWithinTransaction(final HmilyTransactionContext context, final ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
// 3. 获取事务处理器(这里是TCC),进行事务处理
return getRegistry(signature.getMethod()).select(context).handleTransaction(point, context);
}

private HmilyTransactionHandlerRegistry getRegistry(final Method method) {
if (method.isAnnotationPresent(HmilyTCC.class)) {
return REGISTRY.get(TransTypeEnum.TCC);
} else if (method.isAnnotationPresent(HmilyTAC.class)) {
return REGISTRY.get(TransTypeEnum.TAC);
} else if (method.isAnnotationPresent(HmilyXA.class)) {
return REGISTRY.get(TransTypeEnum.XA);
} else {
return REGISTRY.get(TransTypeEnum.TAC);
}
}
}

/* =============================================== */

@HmilySPI(value = "springCloud")
public class SpringCloudParameterLoader implements RpcParameterLoader {
@Override
public HmilyTransactionContext load() {
HmilyTransactionContext hmilyTransactionContext = null;
try {
final RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
// 1.1 从Header中获取key为_HMILY_TRANSACTION_CONTEXT的值(第一次进入,返回的是null)
hmilyTransactionContext = RpcMediator.getInstance().acquire(key -> ((ServletRequestAttributes) requestAttributes).getRequest().getHeader(key));
} catch (IllegalStateException ex) {
LogUtil.warn(LOGGER, () -> "can not acquire request info:" + ex.getLocalizedMessage());
}
return hmilyTransactionContext;
}
}

/* =============================================== */

@HmilySPI("tcc")
public class HmilyTccTransactionHandlerRegistry extends AbstractHmilyTransactionHandlerRegistry {
@Override
public void register() {
// 2.1 注册了不同角色的事务处理器
getHandlers().put(HmilyRoleEnum.START, new StarterHmilyTccTransactionHandler());
getHandlers().put(HmilyRoleEnum.PARTICIPANT, new ParticipantHmilyTccTransactionHandler());
getHandlers().put(HmilyRoleEnum.CONSUMER, new ConsumeHmilyTccTransactionHandler());
getHandlers().put(HmilyRoleEnum.LOCAL, new LocalHmilyTccTransactionHandler());
}
}

/* =============================================== */

public abstract class AbstractHmilyTransactionHandlerRegistry implements HmilyTransactionHandlerRegistry {
@Override
public HmilyTransactionHandler select(final HmilyTransactionContext context) {
// 3.1 根据hmily事务上下文,不同的角色获取不同的事务处理器(这里上下文为null)
if (Objects.isNull(context)) {
return getHandler(HmilyRoleEnum.START);
} else {
if (context.getRole() == HmilyRoleEnum.LOCAL.getCode()) {
return getHandler(HmilyRoleEnum.LOCAL);
} else if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()
|| context.getRole() == HmilyRoleEnum.START.getCode()) {
return getHandler(HmilyRoleEnum.PARTICIPANT);
}
return getHandler(HmilyRoleEnum.CONSUMER);
}
}
}

/* =============================================== */

public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable {
@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
Object returnValue;
// 3.2 记录metrics信息
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_TOTAL, new String[]{TransTypeEnum.TCC.name()});
LocalDateTime starterTime = LocalDateTime.now();
try {
// 3.3 执行preTry方法(最为重要)
HmilyTransaction hmilyTransaction = executor.preTry(point);
try {
// 3.4 执行切面进入点的原始Try方法(PaymentServiceImpl.makePayment)
returnValue = point.proceed();
// 3.5 Try执行成功事务日志状态
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
executor.updateStartStatus(hmilyTransaction);
} catch (Throwable throwable) {
// 3.6 如果出现异常,异步执行Cancel方法
final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
disruptor.getProvider().onData(() -> {
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CANCELING.name()});
executor.globalCancel(currentTransaction);
});
throw throwable;
}
// 3.7 Try方法执行成功,执行Confirm方法
final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
disruptor.getProvider().onData(() -> {
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CONFIRMING.name()});
executor.globalConfirm(currentTransaction);
});
} finally {
// 3.8 清理资源与缓存
HmilyContextHolder.remove();
executor.remove();
// 3.9 记录调用耗时
MetricsReporter.recordTime(LabelNames.TRANSACTION_LATENCY, new String[]{TransTypeEnum.TCC.name()}, starterTime.until(LocalDateTime.now(), ChronoUnit.MILLIS));
}
return returnValue;
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public HmilyTransaction preTry(final ProceedingJoinPoint point) {
LogUtil.debug(LOGGER, () -> "......hmily tcc transaction starter....");
// 3.3.1 创建主事务
HmilyTransaction hmilyTransaction = createHmilyTransaction();
// 3.3.2 存储主事务
HmilyRepositoryStorage.createHmilyTransaction(hmilyTransaction);
// 3.3.3 解析HmilyTCC注解,构建事务参与者(分支事务)
HmilyParticipant hmilyParticipant = buildHmilyParticipant(point, null, null, HmilyRoleEnum.START.getCode(), hmilyTransaction.getTransId());
// 3.3.4 存储事务参与者
HmilyRepositoryStorage.createHmilyParticipant(hmilyParticipant);
hmilyTransaction.registerParticipant(hmilyParticipant);
// 3.3.5 缓存主事务(ThreadLocal)
HmilyTransactionHolder.getInstance().set(hmilyTransaction);
// 3.3.6 创建事务上下文
HmilyTransactionContext context = new HmilyTransactionContext();
context.setAction(HmilyActionEnum.TRYING.getCode());
context.setTransId(hmilyTransaction.getTransId());
context.setRole(HmilyRoleEnum.START.getCode());
context.setTransType(TransTypeEnum.TCC.name());
// 3.3.7 设置事务上下文
HmilyContextHolder.set(context);
return hmilyTransaction;
}

private HmilyParticipant buildHmilyParticipant(final ProceedingJoinPoint point, final Long participantId, final Long participantRefId, final int role, final Long transId) {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Object[] args = point.getArgs();
final HmilyTCC hmilyTCC = method.getAnnotation(HmilyTCC.class);
String confirmMethodName = hmilyTCC.confirmMethod();
String cancelMethodName = hmilyTCC.cancelMethod();
if (StringUtils.isBlank(confirmMethodName) || StringUtils.isBlank(cancelMethodName)) {
return null;
}
HmilyParticipant hmilyParticipant = new HmilyParticipant();
if (null == participantId) {
hmilyParticipant.setParticipantId(IdWorkerUtils.getInstance().createUUID());
} else {
hmilyParticipant.setParticipantId(participantId);
}
if (null != participantRefId) {
hmilyParticipant.setParticipantRefId(participantRefId);
}
Class<?> clazz = point.getTarget().getClass();
hmilyParticipant.setTransId(transId);
hmilyParticipant.setTransType(TransTypeEnum.TCC.name());
hmilyParticipant.setStatus(HmilyActionEnum.PRE_TRY.getCode());
hmilyParticipant.setRole(role);
hmilyParticipant.setTargetClass(clazz.getName());
hmilyParticipant.setTargetMethod(method.getName());
if (StringUtils.isNoneBlank(confirmMethodName)) {
hmilyParticipant.setConfirmMethod(confirmMethodName);
// 3.3.3.1 构建confirm类型的HmilyInvocation
HmilyInvocation confirmInvocation = new HmilyInvocation(clazz.getInterfaces()[0], method.getName(), method.getParameterTypes(), args);
hmilyParticipant.setConfirmHmilyInvocation(confirmInvocation);
}
if (StringUtils.isNoneBlank(cancelMethodName)) {
hmilyParticipant.setCancelMethod(cancelMethodName);
// 3.3.3.2 构建cancel类型的HmilyInvocation
HmilyInvocation cancelInvocation = new HmilyInvocation(clazz.getInterfaces()[0], method.getName(), method.getParameterTypes(), args);
hmilyParticipant.setCancelHmilyInvocation(cancelInvocation);
}
return hmilyParticipant;
}
}

/* =============================================== */

public class HmilyRepositoryStorage {
public static void createHmilyTransaction(final HmilyTransaction hmilyTransaction) {
if (Objects.nonNull(hmilyTransaction)) {
PUBLISHER.publishEvent(hmilyTransaction, EventTypeEnum.CREATE_HMILY_TRANSACTION.getCode());
}
}
}

public final class HmilyRepositoryEventPublisher implements AutoCloseable {
public void publishEvent(final HmilyTransaction hmilyTransaction, final int type) {
HmilyRepositoryEvent event = new HmilyRepositoryEvent();
event.setType(type);
event.setHmilyTransaction(hmilyTransaction);
event.setTransId(hmilyTransaction.getTransId());
push(event);
}

private void push(final HmilyRepositoryEvent event) {
// 根据hmily.config.asyncRepository的配置同步/异步存储事务日志
if (Objects.nonNull(hmilyConfig) && hmilyConfig.isAsyncRepository()) {
// 3.3.2.1 异步模式使用disruptor事件机制
disruptor.getProvider().onData(event);
} else {
// 3.3.2.2 同步模式直接存储
HmilyRepositoryEventDispatcher.getInstance().doDispatch(event);
}
}
}

/* =============================================== */

public class HmilyRepositoryEventConsumer implements HmilyDisruptorConsumer<HmilyRepositoryEvent> {
private ConsistentHashSelector executor;

public HmilyRepositoryEventConsumer(final ConsistentHashSelector executor) {
this.executor = executor;
}

@Override
public void execute(final HmilyRepositoryEvent event) {
Long transId = event.getTransId();
// 3.3.2.1.1 根据事务id一致性哈希算法,同一个事务id,会被同一线程顺序执行,保证了数据的正确性
executor.select(String.valueOf(transId)).execute(() -> {
HmilyRepositoryEventDispatcher.getInstance().doDispatch(event);
event.clear();
});
}
}

/* =============================================== */

public class HmilyContextHolder {
private static HmilyContext hmilyContext;

static {
HmilyConfig hmilyConfig = ConfigEnv.getInstance().getConfig(HmilyConfig.class);
if (Objects.isNull(hmilyConfig)) {
hmilyContext = new ThreadLocalHmilyContext();
} else {
// 3.3.7.1 根据SPI加载hmily.config.contextTransmittalMode配置属性,默认是ThreadLocal模式
// 另一种是阿里提供的跨线程ThreadLocal的实现TransmittableThreadLocal
hmilyContext = Optional.ofNullable(ExtensionLoaderFactory.load(HmilyContext.class, hmilyConfig.getContextTransmittalMode())).orElse(new ThreadLocalHmilyContext());
}
}

public static void set(final HmilyTransactionContext context) {
hmilyContext.set(context);
}
}
  • 由于项目依赖了 hmily-springcloud,在初始化时,会进行 Spring Cloud 支持的相关初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class HmilyFeignConfiguration {

// 对RPC调用进行参数的传递
@Bean
@Qualifier("hmilyFeignInterceptor")
public RequestInterceptor hmilyFeignInterceptor() {
return new HmilyFeignInterceptor();
}

// 对添加了Hmily注解的Bean实例进行代理
@Bean
public HmilyFeignBeanPostProcessor feignPostProcessor() {
return new HmilyFeignBeanPostProcessor();
}

// 处理hystrix跨线程传递参数问题
@Bean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public HystrixConcurrencyStrategy hmilyHystrixConcurrencyStrategy() {
return new HmilyHystrixConcurrencyStrategy();
}
}
  • 进行扣款服务时,会进入 HmilyFeignHandler,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// 3.4
public class HmilyFeignHandler implements InvocationHandler {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else {
// 1. 获取事务上下文
final HmilyTransactionContext context = HmilyContextHolder.get();
if (Objects.isNull(context)) { // 如果为空,进行正常调用
return this.delegate.invoke(proxy, method, args);
}
// 2. 调用方法上是否含有Hmily注解
final Hmily hmily = method.getAnnotation(Hmily.class);
if (Objects.isNull(hmily)) {
return this.delegate.invoke(proxy, method, args);
}
try {
// 3. 构建参与者对象,进行缓存
Long participantId = context.getParticipantId();
final HmilyParticipant hmilyParticipant = buildParticipant(method, args, context);
Optional.ofNullable(hmilyParticipant).ifPresent(participant -> context.setParticipantId(participant.getParticipantId()));
if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) {
context.setParticipantRefId(participantId);
}
// 4. 发起真正的调用
final Object invoke = delegate.invoke(proxy, method, args);
// 5. 如果成功调用,缓存参与者对象至发起者
if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) {
HmilyTransactionHolder.getInstance().registerParticipantByNested(participantId, hmilyParticipant);
} else {
HmilyTransactionHolder.getInstance().registerStarterParticipant(hmilyParticipant);
}
return invoke;
} catch (Throwable e) {
LOGGER.error("HmilyFeignHandler invoker exception :", e);
throw e;
}
}
}

private HmilyParticipant buildParticipant(final Method method, final Object[] args, final HmilyTransactionContext context) {
if (HmilyActionEnum.TRYING.getCode() != context.getAction()) {
return null;
}
HmilyParticipant hmilyParticipant = new HmilyParticipant();
hmilyParticipant.setParticipantId(IdWorkerUtils.getInstance().createUUID());
hmilyParticipant.setTransId(context.getTransId());
hmilyParticipant.setTransType(context.getTransType());
final Class<?> declaringClass = method.getDeclaringClass();
HmilyInvocation hmilyInvocation = new HmilyInvocation(declaringClass, method.getName(), method.getParameterTypes(), args);
hmilyParticipant.setConfirmHmilyInvocation(hmilyInvocation);
hmilyParticipant.setCancelHmilyInvocation(hmilyInvocation);
return hmilyParticipant;
}
}

/* =============================================== */

public class HmilyFeignInterceptor implements RequestInterceptor {
@Override
public void apply(final RequestTemplate requestTemplate) {
// 3.1 远程调用时,先在Header中设置事务上下文
RpcMediator.getInstance().transmit(requestTemplate::header, HmilyContextHolder.get());
}
}

/* =============================================== */

@Service("accountService")
public class AccountServiceImpl implements AccountService {
@Override
@HmilyTCC(confirmMethod = "confirm", cancelMethod = "cancel")
public boolean payment(final AccountDTO accountDTO) {
// 3.2 然后调用到Account服务
// 因为有HmilyTCC注解,会先进入切面,然后进入HmilyGlobalInterceptor,此时Header中设置了事务上下文
// 根据上下文的角色,会进入ParticipantHmilyTccTransactionHandler
LOGGER.info("============执行try付款接口===============");
// update account set balance = balance - #{amount}, freeze_amount= freeze_amount + #{amount},
// update_time = now() where user_id = #{userId} and balance >= #{amount}
accountMapper.update(accountDTO);
return Boolean.TRUE;
}
}

/* =============================================== */

public class ParticipantHmilyTccTransactionHandler implements HmilyTransactionHandler {
private final HmilyTccTransactionExecutor executor = HmilyTccTransactionExecutor.getInstance();

static {
// 3.2.1 注册事务角色类型metrics指标
MetricsReporter.registerCounter(LabelNames.TRANSACTION_STATUS, new String[]{"type", "role", "status"}, "collect hmily transaction count");
}

@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
HmilyParticipant hmilyParticipant = null;
switch (HmilyActionEnum.acquireByCode(context.getAction())) {
case TRYING:
try {
// 3.2.2 执行preTry方法
hmilyParticipant = executor.preTryParticipant(context, point);
// 3.2.3 真正地执行业务方法(AccountServiceImpl.payment)
final Object proceed = point.proceed();
// 3.2.4 更新事务状态
hmilyParticipant.setStatus(HmilyActionEnum.TRYING.getCode());
HmilyRepositoryStorage.updateHmilyParticipantStatus(hmilyParticipant);
return proceed;
} catch (Throwable throwable) {
if (Objects.nonNull(hmilyParticipant)) {
HmilyParticipantCacheManager.getInstance().removeByKey(hmilyParticipant.getParticipantId());
}
// 3.2.5 删除参与者
HmilyRepositoryStorage.removeHmilyParticipant(hmilyParticipant);
throw throwable;
} finally {
HmilyContextHolder.remove();
}
case CONFIRMING:
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.PARTICIPANT.name(), HmilyActionEnum.CONFIRMING.name()});
List<HmilyParticipant> confirmList = HmilyParticipantCacheManager.getInstance().get(context.getParticipantId());
return executor.participantConfirm(confirmList, context.getParticipantId());
case CANCELING:
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.PARTICIPANT.name(), HmilyActionEnum.CANCELING.name()});
List<HmilyParticipant> cancelList = HmilyParticipantCacheManager.getInstance().get(context.getParticipantId());
return executor.participantCancel(cancelList, context.getParticipantId());
default:
break;
}
Method method = ((MethodSignature) (point.getSignature())).getMethod();
return DefaultValueUtils.getDefaultValue(method.getReturnType());
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public HmilyParticipant preTryParticipant(final HmilyTransactionContext context, final ProceedingJoinPoint point) {
LogUtil.debug(LOGGER, "participant hmily tcc transaction start..:{}", context::toString);
// 3.2.2.1 构建参与者
final HmilyParticipant hmilyParticipant = buildHmilyParticipant(point, context.getParticipantId(), context.getParticipantRefId(), HmilyRoleEnum.PARTICIPANT.getCode(), context.getTransId());
// 3.2.2.2 缓存参与者到本地
HmilyTransactionHolder.getInstance().cacheHmilyParticipant(hmilyParticipant);
// 3.2.2.3 存储参与者
HmilyRepositoryStorage.createHmilyParticipant(hmilyParticipant);
// 3.2.2.4 设置角色
context.setRole(HmilyRoleEnum.PARTICIPANT.getCode());
// 3.2.2.5 设置事务上下文,支持且套调用
HmilyContextHolder.set(context);
return hmilyParticipant;
}
}

3. Confirm 流程源码解析

  • Hmily 框架中所有的 Confirm 流程都是由分布式事务发起方调用的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// 3.7
public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable {
@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
......
// Try方法执行成功,且没有异常时,使用disruptor队列异步执行Confirm方法
final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
disruptor.getProvider().onData(() -> {
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CONFIRMING.name()});
executor.globalConfirm(currentTransaction);
});
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public void globalConfirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
LogUtil.debug(LOGGER, () -> "hmily transaction confirm .......!start");
if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
return;
}
currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
// 1. 更新事务状态为confirm
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
// 2. 从本地缓存中获取所有的参与者对象
final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
List<Boolean> successList = new ArrayList<>();
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
// 3. 如果参与者的角色是发起者(order模块既是事务的发起者也是事务的参与者)
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) { // 执行本地调用
HmilyReflector.executor(HmilyActionEnum.CONFIRMING, ExecutorTypeEnum.LOCAL, hmilyParticipant);
HmilyRepositoryStorage.removeHmilyParticipant(hmilyParticipant);
} else { // 进行RPC调用
HmilyReflector.executor(HmilyActionEnum.CONFIRMING, ExecutorTypeEnum.RPC, hmilyParticipant);
}
successList.add(true);
} catch (Throwable e) {
successList.add(false);
LOGGER.error("HmilyParticipant confirm exception param:{} ", hmilyParticipant.toString(), e);
} finally {
HmilyContextHolder.remove();
}
}
if (successList.stream().allMatch(e -> e)) {
// 4. 如果每个参与者都执行成功,删除主事务
HmilyRepositoryStorage.removeHmilyTransaction(currentTransaction);
}
}
}

/* =============================================== */

public class HmilyReflector {
public static Object executor(final HmilyActionEnum action, final ExecutorTypeEnum executorType, final HmilyParticipant hmilyParticipant) throws Exception {
// 3.1 设置事务上下文
setContext(action, hmilyParticipant);
if (executorType == ExecutorTypeEnum.RPC && hmilyParticipant.getRole() != HmilyRoleEnum.START.getCode()) {
if (action == HmilyActionEnum.CONFIRMING) { // 如果是confirm状态,执行confirm方法
return executeRpc(hmilyParticipant.getConfirmHmilyInvocation());
} else { // 执行cancel方法
return executeRpc(hmilyParticipant.getCancelHmilyInvocation());
}
} else {
if (action == HmilyActionEnum.CONFIRMING) {
return executeLocal(hmilyParticipant.getConfirmHmilyInvocation(), hmilyParticipant.getTargetClass(), hmilyParticipant.getConfirmMethod());
} else {
return executeLocal(hmilyParticipant.getCancelHmilyInvocation(), hmilyParticipant.getTargetClass(), hmilyParticipant.getCancelMethod());
}
}
}

private static void setContext(final HmilyActionEnum action, final HmilyParticipant hmilyParticipant) {
HmilyTransactionContext context = new HmilyTransactionContext();
context.setAction(action.getCode()); // CONFIRMING
context.setTransId(hmilyParticipant.getTransId());
context.setParticipantId(hmilyParticipant.getParticipantId());
context.setRole(HmilyRoleEnum.START.getCode());
context.setTransType(hmilyParticipant.getTransType());
HmilyContextHolder.set(context);
}

private static Object executeLocal(final HmilyInvocation hmilyInvocation, final String className, final String methodName) throws Exception {
if (Objects.isNull(hmilyInvocation)) {
return null;
}
// 获取class对象
final Class<?> clazz = Class.forName(className);
final Object[] args = hmilyInvocation.getArgs();
final Class<?>[] parameterTypes = hmilyInvocation.getParameterTypes();
final Object bean = SingletonHolder.INST.get(ObjectProvide.class).provide(clazz);
// 发起反射调用
return MethodUtils.invokeMethod(bean, methodName, args, parameterTypes);
}

private static Object executeRpc(final HmilyInvocation hmilyInvocation) throws Exception {
if (Objects.isNull(hmilyInvocation)) {
return null;
}
final Class<?> clazz = hmilyInvocation.getTargetClass();
final String method = hmilyInvocation.getMethodName();
final Object[] args = hmilyInvocation.getArgs();
final Class<?>[] parameterTypes = hmilyInvocation.getParameterTypes();
// 获取提供者对象,如果是Spring对象,则获取其bean实例,否则是反射获取对象
final Object bean = SingletonHolder.INST.get(ObjectProvide.class).provide(clazz);
// 发起反射调用
return MethodUtils.invokeMethod(bean, method, args, parameterTypes);
}
}

/* =============================================== */

public class ParticipantHmilyTccTransactionHandler implements HmilyTransactionHandler {
@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
HmilyParticipant hmilyParticipant = null;
switch (HmilyActionEnum.acquireByCode(context.getAction())) {
case TRYING:
// ......
case CONFIRMING:
// 再次执行RPC调用时,会进入CONFIRMING分支
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.PARTICIPANT.name(), HmilyActionEnum.CONFIRMING.name()});
List<HmilyParticipant> confirmList = HmilyParticipantCacheManager.getInstance().get(context.getParticipantId());
// 执行参与者的confirm方法
return executor.participantConfirm(confirmList, context.getParticipantId());
case CANCELING:
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.PARTICIPANT.name(), HmilyActionEnum.CANCELING.name()});
List<HmilyParticipant> cancelList = HmilyParticipantCacheManager.getInstance().get(context.getParticipantId());
return executor.participantCancel(cancelList, context.getParticipantId());
default:
break;
}
Method method = ((MethodSignature) (point.getSignature())).getMethod();
return DefaultValueUtils.getDefaultValue(method.getReturnType());
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public Object participantConfirm(final List<HmilyParticipant> hmilyParticipantList, final Long selfParticipantId) {
if (CollectionUtils.isEmpty(hmilyParticipantList)) {
return null;
}
List<Object> results = Lists.newArrayListWithCapacity(hmilyParticipantList.size());
for (HmilyParticipant hmilyParticipant : hmilyParticipantList) {
try {
if (hmilyParticipant.getParticipantId().equals(selfParticipantId)) {
// 本地反射执行
final Object result = HmilyReflector.executor(HmilyActionEnum.CONFIRMING, ExecutorTypeEnum.LOCAL, hmilyParticipant);
results.add(result);
// 删除本地参与者对象
HmilyRepositoryStorage.removeHmilyParticipant(hmilyParticipant);
} else {
final Object result = HmilyReflector.executor(HmilyActionEnum.CONFIRMING, ExecutorTypeEnum.RPC, hmilyParticipant);
results.add(result);
}
} catch (Throwable throwable) {
throw new HmilyRuntimeException(" hmilyParticipant execute confirm exception:" + hmilyParticipant.toString());
} finally {
HmilyContextHolder.remove();
}
}
// 清空缓存
HmilyParticipantCacheManager.getInstance().removeByKey(selfParticipantId);
return results.get(0);
}
}
  • 如果没有都提交成功,主事务日志不会删除,如果在 Confirm 阶段没有提交成功,则依赖定时任务进行事务恢复,再次提交

4. Cancel 流程源码解析

  • Hmily 框架中所有的 Cancel 流程都是分布式事务发起方发现在 Try 阶段有异常时调用的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// 3.6
public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable {
@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
// ......
HmilyTransaction hmilyTransaction = executor.preTry(point);
try {
returnValue = point.proceed();
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
executor.updateStartStatus(hmilyTransaction);
} catch (Throwable throwable) {
// Try阶段有异常时,使用disruptor队列执行Cancel方法
final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
disruptor.getProvider().onData(() -> {
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CANCELING.name()});
executor.globalCancel(currentTransaction);
});
throw throwable;
}
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public void globalCancel(final HmilyTransaction currentTransaction) {
LogUtil.debug(LOGGER, () -> "tcc cancel ...........start!");
if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
return;
}
currentTransaction.setStatus(HmilyActionEnum.CANCELING.getCode());
// 更新事务日志状态为cancel
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) {
// 如果是发起者,执行本地调用
HmilyReflector.executor(HmilyActionEnum.CANCELING, ExecutorTypeEnum.LOCAL, hmilyParticipant);
HmilyRepositoryStorage.removeHmilyParticipant(hmilyParticipant);
} else {
// 执行远端RPC调用
HmilyReflector.executor(HmilyActionEnum.CANCELING, ExecutorTypeEnum.RPC, hmilyParticipant);
}
} catch (Throwable e) {
LOGGER.error("HmilyParticipant cancel exception :{}", hmilyParticipant.toString(), e);
} finally {
HmilyContextHolder.remove();
}
}
}
}

/* =============================================== */

public class ParticipantHmilyTccTransactionHandler implements HmilyTransactionHandler {
@Override
public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
HmilyParticipant hmilyParticipant = null;
switch (HmilyActionEnum.acquireByCode(context.getAction())) {
case TRYING:
// ......
case CONFIRMING:
// ......
case CANCELING:
MetricsReporter.counterIncrement(LabelNames.TRANSACTION_STATUS, new String[]{TransTypeEnum.TCC.name(), HmilyRoleEnum.PARTICIPANT.name(), HmilyActionEnum.CANCELING.name()});
List<HmilyParticipant> cancelList = HmilyParticipantCacheManager.getInstance().get(context.getParticipantId());
return executor.participantCancel(cancelList, context.getParticipantId());
default:
break;
}
Method method = ((MethodSignature) (point.getSignature())).getMethod();
return DefaultValueUtils.getDefaultValue(method.getReturnType());
}
}

/* =============================================== */

public final class HmilyTccTransactionExecutor {
public Object participantCancel(final List<HmilyParticipant> hmilyParticipants, final Long selfParticipantId) {
LogUtil.debug(LOGGER, () -> "tcc cancel ...........start!");
if (CollectionUtils.isEmpty(hmilyParticipants)) {
return null;
}
//if cc pattern,can not execute cancel
//update cancel
HmilyParticipant selfHmilyParticipant = filterSelfHmilyParticipant(hmilyParticipants);
if (Objects.nonNull(selfHmilyParticipant)) {
selfHmilyParticipant.setStatus(HmilyActionEnum.CANCELING.getCode());
HmilyRepositoryStorage.updateHmilyParticipantStatus(selfHmilyParticipant);
}
List<Object> results = Lists.newArrayListWithCapacity(hmilyParticipants.size());
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
if (hmilyParticipant.getParticipantId().equals(selfParticipantId)) {
// 发起发射调用
final Object result = HmilyReflector.executor(HmilyActionEnum.CANCELING, ExecutorTypeEnum.LOCAL, hmilyParticipant);
results.add(result);
// 删除参与者
HmilyRepositoryStorage.removeHmilyParticipant(hmilyParticipant);
} else {
final Object result = HmilyReflector.executor(HmilyActionEnum.CANCELING, ExecutorTypeEnum.RPC, hmilyParticipant);
results.add(result);
}
} catch (Throwable throwable) {
throw new HmilyRuntimeException(" hmilyParticipant execute cancel exception:" + hmilyParticipant.toString());
} finally {
HmilyContextHolder.remove();
}
}
HmilyParticipantCacheManager.getInstance().removeByKey(selfParticipantId);
return results.get(0);
}
}
  • 如果没有都执行成功,主事务日志不会删除,如果在 Cancel 阶段没有执行成功,则依赖定时任务进行事务恢复,再次回滚

参考

  • 《深入理解分布式事务:原理与实战》