Hmily 框架初始流程源码解析

1. 框架入口

1
2
3
4
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-springcloud</artifactId>
</dependency>
  • Hmily 框架会随着应用程序的启动而启动,通过 spring-boot-starter 可以找到它的初始化类 HmilyAutoConfiguration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class HmilyAutoConfiguration {

// 处理添加了@HmlyTCC注解的切面入口
@Bean
public SpringHmilyTransactionAspect hmilyTransactionAspect() {
return new SpringHmilyTransactionAspect();
}

// 支持使用注解调用的RPC框架
@Bean
@ConditionalOnProperty(value = "hmily.support.rpc.annotation", havingValue = "true")
public BeanPostProcessor refererAnnotationBeanPostProcessor() {
return new RefererAnnotationBeanPostProcessor();
}

// 框架启动初始化类
@Bean
@Qualifier("hmilyTransactionBootstrap")
@Primary
public HmilyApplicationContextAware hmilyTransactionBootstrap() {
return new HmilyApplicationContextAware();
}
}

/* =============================================== */

public class HmilyApplicationContextAware implements ApplicationContextAware, BeanFactoryPostProcessor {

@Override
public void setApplicationContext(@NonNull final ApplicationContext applicationContext) throws BeansException {
SpringBeanUtils.INSTANCE.setCfgContext((ConfigurableApplicationContext) applicationContext);
SingletonHolder.INST.register(ObjectProvide.class, new SpringBeanProvide());
}

@Override
public void postProcessBeanFactory(@NonNull final ConfigurableListableBeanFactory beanFactory) throws BeansException {
// 整个框架的初始化入口
HmilyBootstrap.getInstance().start();
}
}

/* =============================================== */

public final class HmilyBootstrap {
public void start() {
try {
// 1. 加载框架的配置
ConfigLoaderServer.load();
HmilyConfig hmilyConfig = ConfigEnv.getInstance().getConfig(HmilyConfig.class);
check(hmilyConfig);
// 2. 注册对象的提供者。默认使用反射方式获取,如果是Spring环境,则通过Spring Bean的代理方式获取
registerProvide();
// 3. 初始化事务日志资源
loadHmilyRepository(hmilyConfig);
// 4. 初始化事务恢复调度和事件分发器,注册关闭资源接口
registerAutoCloseable(new HmilyTransactionSelfRecoveryScheduled(), HmilyRepositoryEventPublisher.getInstance());
// 5. 初始化metrics监控信息
initMetrics();
} catch (Exception e) {
LOGGER.error(" hmily init exception:", e);
System.exit(0);
}
new HmilyLogo().logo();
}
}

2. 加载框架的配置

  • Hmily 提供了多种加载配置的方式,包括本地模式、ZooKeeper、Nacos、Apollo、ETCD、Consul 等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
public class ConfigLoaderServer {
public static void load() {
// 1. 扫描所有的配置bean
ConfigScan.scan();
ServerConfigLoader loader = new ServerConfigLoader();
// 2. 加载配置
loader.load(ConfigLoader.Context::new, (context, config) -> {
if (config != null) {
if (StringUtils.isNotBlank(config.getConfigMode())) {
String configMode = config.getConfigMode();
ConfigLoader<?> configLoader = ExtensionLoaderFactory.load(ConfigLoader.class, configMode);
log.info("Load the configuration【{}】information...", configMode);
// 3. 进行配置属性的加载
configLoader.load(context, (contextAfter, configAfter) -> {
log.info("Configuration information: {}", configAfter);
});
}
}
});
}
}

/* =============================================== */

public final class ConfigScan {
public static void scan() {
// 1.1 使用SPI的方式加载所有实现了Config接口的配置类(如NacosConfig、HmilyDatabaseConfig、EtcdConfig等)
List<Config> configs = ExtensionLoaderFactory.loadAll(Config.class);
for (Config conf : configs) {
// 1.2 将配置类进行缓存
ConfigEnv.getInstance().registerConfig(conf);
}
}
}

/* =============================================== */

public class ServerConfigLoader implements ConfigLoader<HmilyServer> {
private final YamlPropertyLoader propertyLoader = new YamlPropertyLoader();

@Override
public void load(final Supplier<Context> context, final LoaderHandler<HmilyServer> handler) {
// 2.1 加载hmily.yml配置文件
String filePath = System.getProperty("hmily.conf"); // -Dhmily.conf(第一优先级)
File configFile;
if (StringUtils.isBlank(filePath)) {
String dirPath = getDirGlobal(); // user.dir(第二优先级)
configFile = new File(dirPath);
if (configFile.exists()) {
filePath = dirPath;
} else {
ClassLoader loader = ConfigLoader.class.getClassLoader(); // resource(第三优先级)
URL url = loader.getResource("hmily.yml");
if (url != null) {
filePath = url.getFile();
configFile = new File(filePath);
} else {
throw new ConfigException("ConfigLoader:loader config error,error file path:" + filePath);
}
}
} else {
configFile = new File(filePath);
if (!configFile.exists()) {
throw new ConfigException("ConfigLoader:loader config error,error file path:" + filePath);
}
}
try (FileInputStream inputStream = new FileInputStream(configFile)) {
List<PropertyKeySource<?>> propertyKeySources = propertyLoader.load(filePath, inputStream);
OriginalConfigLoader original = new OriginalConfigLoader();
// 2.2 将配置文件转为HmilyServer配置类
againLoad(() -> context.get().with(propertyKeySources, original), handler, HmilyServer.class);
} catch (IOException e) {
throw new ConfigException("ConfigLoader:loader config error,file path:" + filePath);
}
}

private String getDirGlobal() {
String userDir = System.getProperty("user.dir");
String fileName = "hmily.yml";
return String.join(String.valueOf(File.separatorChar), userDir, fileName);
}
}

/* =============================================== */

public interface ConfigLoader<T extends Config> {
default void againLoad(final Supplier<Context> context, final LoaderHandler<T> handler, final Class<T> tClass) {
T config = ConfigEnv.getInstance().getConfig(tClass);
for (PropertyKeySource<?> propertyKeySource : context.get().getSource()) {
ConfigPropertySource configPropertySource = new DefaultConfigPropertySource<>(propertyKeySource, PropertyKeyParse.INSTANCE);
Binder binder = Binder.of(configPropertySource);
// 2.2.1 获取当前配置模式(这里是本地模式)
T newConfig = binder.bind(config.prefix(), BindData.of(DataType.of(tClass), () -> config));
// 2.2.2 回调,完成配置的加载
handler.finish(context, newConfig);
}
}
}

/* =============================================== */

public class OriginalConfigLoader implements ConfigLoader<Config> {
@Override
public void load(final Supplier<Context> context, final LoaderHandler<Config> handler) {
for (PropertyKeySource<?> propertyKeySource : context.get().getSource()) {
ConfigPropertySource configPropertySource = new DefaultConfigPropertySource<>(propertyKeySource, PropertyKeyParse.INSTANCE);
ConfigEnv.getInstance().stream()
.filter(e -> !e.isLoad())
.map(e -> {
// 3.1 进行属性匹配前缀绑定(hmily.server)
Config config = getBind(e, configPropertySource);
if (config != null) {
@SuppressWarnings("unchecked")
Map<String, Object> source = (Map<String, Object>) propertyKeySource.getSource();
config.setSource(source);
}
return config;
}).filter(Objects::nonNull).peek(Config::flagLoad)
.forEach(e -> handler.finish(context, e));
}
}

private Config getBind(final Config config, final ConfigPropertySource configPropertySource) {
Binder binder = Binder.of(configPropertySource);
return binder.bind(config.prefix(), BindData.of(DataType.of(config.getClass()), () -> config));
}
}

3. 初始化事务日志存储

  • 分布式事务的日志至关重要,Hmily 支持同步和异步 2 种方式来存储日志,并且支持 File、Redis、ZooKeeper、MySQL、Oracle、PostgreSQL、SQLServer、ETCD、MongoDB 等多种模式来存储日志,对应的模块为 hmily-repository。Hmily 事务日志存储的架构图如下

  • TCC 事务日志的结构主要由 HmilyTransaction、HmilyParticipant 和 HmilyInvocation 类组成
    • HmilyTransaction 是主事务实体类,包含多个 HmilyParticipant
    • HmilyParticipant 是分支事务的实体类,包含多个 HmilyInvocation
    • HmilyInvocation 是事务方法的参数列表实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class HmilyTransaction implements Serializable {
// transaction id.
private Long transId;
// app name.
private String appName;
// transaction status.
private int status;
// trans type.
private String transType;
// retry.
private Integer retry = 0;
// version number mysql optimistic lock control.
private Integer version = 1;
// createTime.
private Date createTime;
// updateTime.
private Date updateTime;
// A collection of methods that participate in coordination.
private List<HmilyParticipant> hmilyParticipants;
}

/* =============================================== */

public class HmilyParticipant implements Serializable {
// participant id.
private Long participantId;
// participant ref id.
private Long participantRefId;
// transaction id.
private Long transId;
// trans type.
private String transType;
// transaction status.
private Integer status;
// app name.
private String appName;
// transaction role .
private int role;
// retry.
private int retry;
// Call interface name.
private String targetClass;
// Call interface method name.
private String targetMethod;
// confirm Method.
private String confirmMethod;
// cancel Method.
private String cancelMethod;
// version.
private Integer version = 1;
// createTime.
private Date createTime;
// updateTime.
private Date updateTime;
// confirm hmilyInvocation.
private HmilyInvocation confirmHmilyInvocation;
// cancel hmilyInvocation.
private HmilyInvocation cancelHmilyInvocation;
}

/* =============================================== */

public class HmilyInvocation implements Serializable {
@Getter
private Class<?> targetClass;
@Getter
private String methodName;
@Getter
private Class<?>[] parameterTypes;
@Getter
private Object[] args;
}
  • 事务日志初始化方法 loadHmilyRepository() 的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public final class HmilyBootstrap {
private void loadHmilyRepository(final HmilyConfig hmilyConfig) {
// 1. SPI获取配置的序列化方式(hmily.config.serializer)
HmilySerializer hmilySerializer = ExtensionLoaderFactory.load(HmilySerializer.class, hmilyConfig.getSerializer());
// 2. SPI获取配置的事务日志存储(hmily.config.repository=mysql)
HmilyRepository hmilyRepository = ExtensionLoaderFactory.load(HmilyRepository.class, hmilyConfig.getRepository());
hmilyRepository.setSerializer(hmilySerializer);
// 3. 初始化(这里用的是MySQL数据库来存储事务日志)
hmilyRepository.init(buildAppName(hmilyConfig));
HmilyRepositoryFacade.getInstance().setHmilyRepository(hmilyRepository);
}
}

/* =============================================== */

public abstract class AbstractHmilyDatabase implements HmilyRepository {
public void init(final String appName) {
this.appName = appName;
try {
// 3.1 获取数据库配置
HmilyDatabaseConfig hmilyDatabaseConfig = ConfigEnv.getInstance().getConfig(HmilyDatabaseConfig.class);
// 3.2 初始化数据库连接池
HikariDataSource hikariDataSource = new HikariDataSource();
hikariDataSource.setJdbcUrl(hmilyDatabaseConfig.getUrl());
hikariDataSource.setDriverClassName(hmilyDatabaseConfig.getDriverClassName());
hikariDataSource.setUsername(hmilyDatabaseConfig.getUsername());
hikariDataSource.setPassword(hmilyDatabaseConfig.getPassword());
hikariDataSource.setMaximumPoolSize(hmilyDatabaseConfig.getMaxActive());
hikariDataSource.setMinimumIdle(hmilyDatabaseConfig.getMinIdle());
hikariDataSource.setConnectionTimeout(hmilyDatabaseConfig.getConnectionTimeout());
hikariDataSource.setIdleTimeout(hmilyDatabaseConfig.getIdleTimeout());
hikariDataSource.setMaxLifetime(hmilyDatabaseConfig.getMaxLifetime());
hikariDataSource.setConnectionTestQuery(hmilyDatabaseConfig.getConnectionTestQuery());
if (hmilyDatabaseConfig.getPropertyMap() != null && !hmilyDatabaseConfig.getPropertyMap().isEmpty()) {
hmilyDatabaseConfig.getPropertyMap().forEach(hikariDataSource::addDataSourceProperty);
}
HmilyConfig hmilyConfig = ConfigEnv.getInstance().getConfig(HmilyConfig.class);
this.dataSource = hikariDataSource;
if (hmilyConfig.isAutoSql()) { // 默认true
// 3.3 执行初始化脚本
this.initScript(hmilyDatabaseConfig);
}
} catch (Exception e) {
log.error("hmily jdbc log init exception please check config:{}", e.getMessage());
throw new HmilyRuntimeException(e.getMessage());
}
}
}

/* =============================================== */

public class MysqlRepository extends AbstractHmilyDatabase {
protected void initScript(final HmilyDatabaseConfig config) throws Exception {
String jdbcUrl = StringUtils.replace(config.getUrl(), "/hmily", "/");
// 3.3.1 获取连接
Connection conn = DriverManager.getConnection(jdbcUrl, config.getUsername(), config.getPassword());
ScriptRunner runner = new ScriptRunner(conn);
runner.setLogWriter(null);
runner.setAutoCommit(false);
Resources.setCharset(StandardCharsets.UTF_8);
// 3.3.2 加载resource/mysql下的schema.sql脚本
Reader read = Resources.getResourceAsReader(SQL_FILE_PATH);
// 3.3.3 执行脚本
runner.runScript(read);
conn.commit();
runner.closeConnection();
conn.close();
}
}

4. 初始化事务恢复调度器

  • 事务恢复是框架的核心功能之一,Hmily 框架采用定时任务的方式进行事务恢复。相关源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HmilyTransactionSelfRecoveryScheduled implements AutoCloseable {
public HmilyTransactionSelfRecoveryScheduled() {
hmilyRepository = ExtensionLoaderFactory.load(HmilyRepository.class, hmilyConfig.getRepository());
// tcc事务恢复单线程池
this.selfTccRecoveryExecutor = new ScheduledThreadPoolExecutor(1, HmilyThreadFactory.create("hmily-tcc-self-recovery", true));
this.selfTacRecoveryExecutor = new ScheduledThreadPoolExecutor(1, HmilyThreadFactory.create("hmily-tac-self-recovery", true));
// 事务日志清理线程池
this.cleanHmilyTransactionExecutor = new ScheduledThreadPoolExecutor(1, HmilyThreadFactory.create("hmily-transaction-clean", true));
hmilyTransactionRecoveryService = new HmilyTransactionRecoveryService();
// 进行TCC事务恢复
selfTccRecovery();
selfTacRecovery();
// 清理无用的事务日志
cleanHmilyTransaction();
// 删除过期的日志
phyDeleted();
}
}

5. 初始化事件分发器

  • Hmily 采用高性能队列 disruptor 进行事务日志的异步存储。相关源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final class HmilyRepositoryEventPublisher implements AutoCloseable {
private static final HmilyRepositoryEventPublisher INSTANCE = new HmilyRepositoryEventPublisher();
private HmilyDisruptor<HmilyRepositoryEvent> disruptor;
private final HmilyConfig hmilyConfig = ConfigEnv.getInstance().getConfig(HmilyConfig.class);

private HmilyRepositoryEventPublisher() {
start();
}

public static HmilyRepositoryEventPublisher getInstance() {
return INSTANCE;
}

private void start() {
List<SingletonExecutor> selects = new ArrayList<>();
for (int i = 0; i < hmilyConfig.getConsumerThreads(); i++) {
selects.add(new SingletonExecutor("hmily-log-disruptor" + i));
}
// 根据事务id一致性哈希的线程选择器
ConsistentHashSelector selector = new ConsistentHashSelector(selects);
// 创建并启动disruptor
disruptor = new HmilyDisruptor<>(new HmilyRepositoryEventConsumer(selector), 1, hmilyConfig.getBufferSize());
disruptor.startup();
}

@Override
public void close() {
disruptor.getProvider().shutdown();
}
}

/* =============================================== */

public class HmilyDisruptor<T> {
public void startup() {
// 创建多生产者的消费队列
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(new DisruptorEventFactory<>(),
size,
HmilyThreadFactory.create("disruptor_consumer_" + consumer.fixName(), false),
ProducerType.MULTI,
new BlockingWaitStrategy());
// 创建工作线程池
HmilyDisruptorWorkHandler<T>[] workerPool = new HmilyDisruptorWorkHandler[consumerSize];
for (int i = 0; i < consumerSize; i++) {
workerPool[i] = new HmilyDisruptorWorkHandler<>(consumer);
}
disruptor.handleEventsWithWorkerPool(workerPool);
// 设置异常策略
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
// 启动
disruptor.start();
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
provider = new DisruptorProvider<>(ringBuffer, disruptor);
}
}

6. 初始化 Metrics 监控信息

  • Metrics 信息主要是 Hmily 框架用来监控事务执行状态的指标,对应的模块为 hmily-metrics。Metrics 主要分为两个部分:
    • 应用的 JVM 信息:内存、CPU、线程使用等
    • 事务信息:包括事务的总数、事务的迟延时间、事务的状态、事务执行成功的数量、事务执行失败的数量等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public final class HmilyBootstrap {
private void initMetrics() {
HmilyMetricsConfig metricsConfig = ConfigEnv.getInstance().getConfig(HmilyMetricsConfig.class);
// 配置了才初始化
if (Objects.nonNull(metricsConfig) && StringUtils.isNoneBlank(metricsConfig.getMetricsName())) {
MetricsTrackerFacade facade = new MetricsTrackerFacade();
facade.start(metricsConfig);
registerAutoCloseable(facade);
}
}
}

/* =============================================== */

public final class MetricsTrackerFacade implements AutoCloseable {
public void start(final HmilyMetricsConfig metricsConfig) {
if (this.isStarted.compareAndSet(false, true)) {
// SPI获取启动类(目前只支持Promethues)
metricsBootService = ExtensionLoaderFactory.load(MetricsBootService.class, metricsConfig.getMetricsName());
Preconditions.checkNotNull(metricsBootService,
"Can not find metrics tracker manager with metrics name : %s in metrics configuration.", metricsConfig.getMetricsName());
// 启动
metricsBootService.start(metricsConfig, ExtensionLoaderFactory.load(MetricsRegister.class, metricsConfig.getMetricsName()));
} else {
log.info("metrics tracker has started !");
}
}
}

/* =============================================== */

public final class PrometheusBootService implements MetricsBootService {
private HTTPServer server;
private volatile AtomicBoolean registered = new AtomicBoolean(false);

@Override
public void start(final HmilyMetricsConfig metricsConfig, final MetricsRegister register) {
startServer(metricsConfig);
MetricsReporter.register(register);
}

@Override
public void stop() {
if (server != null) {
// 关闭资源
server.stop();
registered.set(false);
// 清空注册器
CollectorRegistry.defaultRegistry.clear();
}
}

private void startServer(final HmilyMetricsConfig metricsConfig) {
// 注册metrics指标
register(metricsConfig.getJmxConfig());
int port = metricsConfig.getPort();
String host = metricsConfig.getHost();
InetSocketAddress inetSocketAddress;
if (null == host || "".equalsIgnoreCase(host)) {
inetSocketAddress = new InetSocketAddress(port);
} else {
inetSocketAddress = new InetSocketAddress(host, port);
}
try {
// 启动httpServer,提供metrics信息
server = new HTTPServer(inetSocketAddress, CollectorRegistry.defaultRegistry, true);
log.info(String.format("Prometheus metrics HTTP server `%s:%s` start success.", inetSocketAddress.getHostString(), inetSocketAddress.getPort()));
} catch (final IOException ex) {
log.error("Prometheus metrics HTTP server start fail", ex);
}
}

private void register(final String jmxConfig) {
if (!registered.compareAndSet(false, true)) {
return;
}
// 注册JDK版本的metrics指标
new BuildInfoCollector().register();
// 注册JVM参数的metrics指标
DefaultExports.initialize();
try {
if (StringUtils.isNotEmpty(jmxConfig)) {
// 注册jmx metrics指标
new JmxCollector(jmxConfig).register();
}
} catch (MalformedObjectNameException e) {
log.error("init jmx collector error", e);
}
}
}

参考

  • 《深入理解分布式事务:原理与实战》