当前位置: 首页 > news >正文

sharding-jdbc metadata load优化(4.1.1版本)

背景

系统启动时,会注意sharding-jdbc提示加载metadata

于是想看看里面做了什么事情

问题追踪

debug后可以观察走到了该类

org.apache.shardingsphere.shardingjdbc.jdbc.core.context.ShardingRuntimeContext#loadSchemaMetaData

先看这个shardingRuntimeContext相关的类关系,对于多数据源的场景,会定义一个加载元数据的方法供子类实现,主要关注loadSchemaMetaData方法

重点关注ShardingRuntimeContext里的逻辑

对于分库的数据库加载,可以看到有两个问题

1.对于分库的表元数据加载,是单线程执行的,即使把max.connections.size.per.query调大,也不会有效率提升

2.某些规则的元数据与分片表的元数据是一致的,存在重复加载情况

问题解决

分表配置中,某些规则的acutal_data_nodes是一样的,根据该字段分组,对于分表规则相同的配置,可以加载一份即可

在本项目中复写对应的类增强下对应的load方法,新增了groupBy加载的配置支持

//根据配置选择原生加载还是groupBy加载
public SchemaMetaData load(final DatabaseType databaseType) throws SQLException {SchemaMetaData result = useGroupMetaLoad?loadShardingSchemaMetaDataGroupByActualDataNodes(databaseType):loadShardingSchemaMetaData(databaseType);result.merge(loadDefaultSchemaMetaData(databaseType));return result;}//原有逻辑private SchemaMetaData loadShardingSchemaMetaData(final DatabaseType databaseType) throws SQLException {LOGGER.info("Loading {} logic tables' meta data.", shardingRule.getTableRules().size());Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(shardingRule.getTableRules().size(), 1);for (TableRule each : shardingRule.getTableRules()) {tableMetaDataMap.put(each.getLogicTable(), load(each.getLogicTable(), databaseType));}return new SchemaMetaData(tableMetaDataMap);}
//新逻辑private SchemaMetaData loadShardingSchemaMetaDataGroupByActualDataNodes(final DatabaseType databaseType) throws SQLException {LOGGER.info("Loading {} logic tables' meta data.", shardingRule.getTableRules().size());
//根据actualDataNodes分组,key为要查询的逻辑表名,value为同个acutalDataNodes的表名Map<String, Set<String>> sameNodeMap = groupByActualDataNodes(shardingRule.getShardingDataSourceNames().getShardingRuleConfig().getTableRuleConfigs());int maxQuery=Math.min(CORES*2,maxConnectionsSizePerQuery);List<List<String>> tableGroups = Lists.partition(new ArrayList<>(sameNodeMap.keySet()), Math.max(sameNodeMap.size() / maxQuery, 1));Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(shardingRule.getTableRules().size(), 1);if (tableGroups.size() == 1 || isCheckingMetaData) {for (String sameLogicTable : tableGroups.get(0)) {TableMetaData load = load(sameLogicTable, databaseType);Set<String> value = sameNodeMap.get(sameLogicTable);for (String s : value) {tableMetaDataMap.put(s, load);}}} else {//async,模仿已有写法ExecutorService executorService = Executors.newFixedThreadPool(Math.min(tableGroups.size(), maxQuery));try {Collection<Future<Map<String, TableMetaData>>> futures = new LinkedList<>();for (List<String> each : tableGroups) {futures.add(executorService.submit(() -> {Map<String, TableMetaData> tableMetaData = new HashMap<>();for (String s : each) {tableMetaData.put(s, load(s, databaseType));}return tableMetaData;}));}for (Future<Map<String, TableMetaData>> each : futures) {try {Map<String, TableMetaData> m = each.get();for (String s : m.keySet()) {Set<String> sameTable = sameNodeMap.get(s);for (String string : sameTable) {tableMetaDataMap.put(string, m.get(s));}}} catch (final InterruptedException | ExecutionException ex) {if (ex.getCause() instanceof SQLException) {throw (SQLException) ex.getCause();}Thread.currentThread().interrupt();}}} finally {executorService.shutdown();}}LOGGER.info("Actual {} logic tables' meta data loaded.", sameNodeMap.size());return new SchemaMetaData(tableMetaDataMap);}private Map<String, Set<String>> groupByActualDataNodes(Collection<TableRuleConfiguration> configurations) {Map<String, List<TableRuleConfiguration>> collect = configurations.stream().collect(Collectors.groupingBy(TableRuleConfiguration::getActualDataNodes));Map<String, Set<String>> containSet = new HashMap<>();for (Entry<String, List<TableRuleConfiguration>> entry : collect.entrySet()) {//同一个datasource分为一组Map<String,Set<String>> sameDatabaseMap=new HashMap<>();entry.getValue().stream().map(TableRuleConfiguration::getLogicTable).forEach(e->{String currentDatabase = getFirstDataSourceNameByLogicTableName(e);Set<String> tableNames = sameDatabaseMap.getOrDefault(currentDatabase, new HashSet<>());tableNames.add(e);sameDatabaseMap.put(currentDatabase,tableNames);});for (Entry<String, Set<String>> sameDatabaseSet : sameDatabaseMap.entrySet()) {Set<String> value = sameDatabaseSet.getValue();containSet.put(value.iterator().next(),value);}}return containSet;}private String getFirstDataSourceNameByLogicTableName(String logicTableName){TableRule tableRule = shardingRule.getTableRule(logicTableName);DataNode dataNode = tableRule.getActualDataNodes().iterator().next();return dataNode.getDataSourceName();}

这样就达到了优化的效果

并添加配置

spring.shardingsphere.props.max.connections.size.per.query=10
spring.shardingsphere.props.group.metadata.load.enabled=true

运行结果如下,减少重复的表元数据加载,并采用异步加载,启动速度快了一些

配置修改后的关注问题

可以关注到对于max.connections.size.per.query,我们从1→10,有什么需要注意的呢?

1.createConnections

我们可以先看一下一次查询的执行过程

1.org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare

这里主要解析sql,做路由&生成需要执行的单元信息,生成执行上下文

2.接下来是重点关注的逻辑

根据执行上下文初始化执行器

我们需要关注的是生成执行单元中获取链接的逻辑

具体在 org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#obtainExecuteGroups

一路debug会走到

我们需要关注的是生成执行单元中获取链接的逻辑

具体在 org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#obtainExecuteGroups

一路debug会走到

在这里可以看到该配置的影响,他会根据实际要执行的sql单元,确认链接模式,关于链接模式的进一步说明,可以参考官网说明

链接模式说明
MEMORY_STRICTLY
内存限制模式,当对单个物理库查询时,链接数足够时会采用此模式,单个库有多个链接
CONNECTION_STRICTLY
链接限制模式,对单个物理库查询时,只有0或1个链接可用

这两种限制模式在创建链接时有差异

链接限制模式直接创建,对于内存限制模式,需要一次性获取执行链接

主要影响是单个逻辑sql有多个分片sql执行场景时所需要的连接数,由于我们使用的druid链接池,获取链接的方法为com.alibaba.druid.pool.DruidDataSource#getConnectionDirect

对于配了超时(druid.maxWait),如果maxActive<max.connections.size.per.query,会报无法获取足够链接的错误

对于没配超时,会出现一直等待的场景com.alibaba.druid.pool.DruidDataSource#takeLast,造成假死

重点保证链接池的maxActive>=max.connections.size.per.query即可

2.executeQuery

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult

内存限制模式,会把链接hold住,相当于游标方式获取结果,对于链接限制模式,会把数据加载到内存,链接可以释放

其他

1.本地启动时也可以配置druid链接池进行异步初始化,加快启动速度

spring.datasource.druid.async-init=true

2.在sharding-jdbc执行过程中,会发现如下逻辑

会抽取第一个任务给主线程执行,其他交由线程池异步处理

减少主线程的空等待时间,我们一些异步多任务的执行也可以参考一下该做法 


http://www.mrgr.cn/news/26442.html

相关文章:

  • 工具方法 - 高我法
  • 在国内版Office 365中通过PowerShell命令查询指定主题的邮件详解
  • Java 每日一刊(第6期):整数运算
  • AI+RPA:开启智能自动化新时代
  • Flask中的蓝图如何进行模块化
  • Git 中的refs
  • CSS基本布局理解——WEB开发系列38
  • LLM - 理解 多模态大语言模型 (MLLM) 的指令微调与相关技术 (四)
  • Java微服务架构最佳实践:如何设计高可用的分布式系统
  • C++——unordered_map
  • SciPy 插值
  • C++ | Leetcode C++题解之第404题左叶子之和
  • 一次开发,多端部署--实例介绍
  • Java应用的数据库连接池连接池故障恢复
  • Java 行为型设计模式一口气讲完!*^____^*
  • django-admin自定义功能按钮样式
  • 大模型→世界模型下的「认知流形」本质·下
  • 【隐私保护】如何找出CLS方案的创新之处?
  • IDEA测试类启动报 “java: 常量字符串过长” 解决办法
  • 【JUC并发编程系列】深入理解Java并发机制:Synchronized机制深度剖析、HotSpot下的并发奥秘(四、synchronized 原理分析)