【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理
Sharding-JDBC系列
1、Sharding-JDBC分库分表的基本使用
2、Sharding-JDBC分库分表之SpringBoot分片策略
3、Sharding-JDBC分库分表之SpringBoot主从配置
4、SpringBoot集成Sharding-JDBC-5.3.0分库分表
5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表
6、【源码】Sharding-JDBC源码分析之JDBC
7、【源码】Sharding-JDBC源码分析之SPI机制
8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理
9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)
10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)
11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理
12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理
13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理
14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理
15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理
16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理
17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理
前言
在【源码】Sharding-JDBC源码分析之JDBC-CSDN博客中介绍,在ShardingSphere框架中,提供 了JDBC的接口。前面用了很多篇介绍了分片配置文件的配置、解析,及ShardingSphereDataSource的创建。ShardingSphereDataSource实现了javax.sql.DataSource接口,该接口提供了getConnection()接口,用于获取一个数据库Connection连接。本篇分享一下ShardingSphereConnection的创建。
ShardingSphereDataSource回顾
在ShardingSphereDriver中提供了connect()方法,用于获取一个Connection连接。该方法从缓存的DataSource中,执行getConnection()返回一个Connection连接。此处的DataSource即为ShardingSphereDataSource。
ShardingSphereDataSource继承AbstractDataSourceAdapter,AbstractDataSourceAdapter为抽象类,实现了javax.sql.DataSource接口。ShardingSphereDataSource中,提供了getConnection()方法,用于获取一个数据库连接,相关的源码如下:
package org.apache.shardingsphere.driver.jdbc.core.datasource;public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {/*** 默认返回一个ShardingSphereConnection对象* @return*/@Overridepublic Connection getConnection() {return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);}@Overridepublic Connection getConnection(final String username, final String password) {return getConnection();}}
在getConnection()方法中,通过DriverStateContext的getConnection()获取一个Connection连接。当系统正常运行时,此处获取的Connection为ShardingSphereConnection对象。
详见:【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理-CSDN博客
ShardingSphereConnection
ShardingSphereConnection的源码如下:
package org.apache.shardingsphere.driver.jdbc.core.connection;/*** ShardingSphere的连接*/
public final class ShardingSphereConnection extends AbstractConnectionAdapter {// 默认为logic_db@Getterprivate final String databaseName;// 上下文管理,包括元数据上下文、实例上下文、执行引擎@Getterprivate final ContextManager contextManager;// JDBC上下文。从DataSource中获取一个Connection,获取数据库信息封装成CachedDatabaseMetaData。如url、username、最大连接数、是否支持分组等@Getterprivate final JDBCContext jdbcContext;// 连接管理器。保存数据源、连接事务、连接缓存等@Getterprivate final ConnectionManager connectionManager;private boolean autoCommit = true;// 事务隔离级别private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;private boolean readOnly;private volatile boolean closed;// 连接上下文@Getterprivate final ConnectionContext connectionContext;public ShardingSphereConnection(final String databaseName, final ContextManager contextManager, final JDBCContext jdbcContext) {this.databaseName = databaseName;this.contextManager = contextManager;this.jdbcContext = jdbcContext;// 创建连接管理connectionManager = new ConnectionManager(databaseName, contextManager);// 创建连接上下文connectionContext = new ConnectionContext(connectionManager::getDataSourceNamesOfCachedConnections);}/*** 是否持有事务* @return*/public boolean isHoldTransaction() {return connectionManager.getConnectionTransaction().isHoldTransaction(autoCommit);}/*** 获取数据库的元数据*/@Overridepublic DatabaseMetaData getMetaData() {return new ShardingSphereDatabaseMetaData(this);}/*** 创建ShardingSpherePreparedStatement对象*/@Overridepublic PreparedStatement prepareStatement(final String sql) throws SQLException {return new ShardingSpherePreparedStatement(this, sql);}@Overridepublic Statement createStatement() {return new ShardingSphereStatement(this);}/*** 设置是否自动提交*/@Overridepublic void setAutoCommit(final boolean autoCommit) throws SQLException {this.autoCommit = autoCommit;if (connectionManager.getConnectionTransaction().isLocalTransaction()) {processLocalTransaction();} else {processDistributeTransaction();}}/*** 本地事务处理*/private void processLocalTransaction() throws SQLException {connectionManager.setAutoCommit(autoCommit);if (!autoCommit) {connectionContext.getTransactionConnectionContext().setInTransaction(true);}}/*** 分布式事务处理*/private void processDistributeTransaction() throws SQLException {switch (connectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit)) {case BEGIN:// 关闭、开启一个新事务connectionManager.close();connectionManager.getConnectionTransaction().begin();// 设置当前处于事务状态getConnectionContext().getTransactionConnectionContext().setInTransaction(true);break;case COMMIT:// 提交事务connectionManager.getConnectionTransaction().commit();break;default:break;}}/*** 事务提交*/@Overridepublic void commit() throws SQLException {try {connectionManager.commit();} finally {connectionManager.getConnectionTransaction().setRollbackOnly(false);connectionContext.clearTransactionConnectionContext();connectionContext.clearTrafficInstance();connectionContext.clearCursorConnectionContext();}}// 省略其他
}
3.1 构造方法
ShardingSphereConnection的构造方法执行如下:
1)记录databaseName、contextManager和jdbcContext;
2)创建ConnectionManager对象;
ConnectionManager连接管理器,主要功能如下:
2.1)从ContextManager中获取配置的数据源对象并保存;
2.2)创建连接事务对象,保存事务状态及管理事务管理器;
2.3)维护当前ShardingSphereConnection对应的真实数据库的Connection连接。包括创建、维护事务保存点、开启事务、提交事务、回滚事务、设置事务隔离级别等;
3)创建ConnectionContext对象;
ConnectionContext连接上下文,保存游标连接上下文、事务连接上下文、当前真实Connection连接的数据源名称等。
3.2 prepareStatement()方法
所有的同名prepareStatement()方法,都是创建一个ShardingSpherePreparedStatement实例并返回。
3.3 createStatement()方法
所有同名的createStatement()方法,都是创建一个ShardingSphereStatement实例,并返回。
3.4 其他方法
ShardingSphereConnection的其他大部分方法都是用于事务相关操作的方法。方法中,通过调用ConnectionManager、ConnectionContext,执行事务等相关操作。
ConnectionManager
ConnectionManager的源码如下:
package org.apache.shardingsphere.driver.jdbc.core.connection;/*** 连接管理器。保存数据源、连接事务、连接缓存等,提供开启、提交、回滚事务。* 一个数据库请求对应一个ShardingSphereConnection,* 一个ShardingSphereConnection对应一个ConnectionManager,* 一个ShardingSphereConnection可能对应多个真实的数据库操作,需要对应多个数据源,多个Connection,* 所以对ShardingSphereConnection的一个setAutoCommit()操作,需要对多个真实的Connection执行setAutoCommit()操作,* 其他操作也一样*/
public final class ConnectionManager implements ExecutorJDBCConnectionManager, AutoCloseable {// 数据源。如order_ds:HikariDataSourceprivate final Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();// 物理数据源,默认(非集群部署、或没有配置流量控制时)同dataSourceMap。private final Map<String, DataSource> physicalDataSourceMap = new LinkedHashMap<>();// 连接事务@Getterprivate final ConnectionTransaction connectionTransaction;// 连接缓存,在getConnection()方法中,先从缓存获取,没有再创建。key为数据源名称,value为对应数据源的连接对象private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();// 方法回调记录器,用于批量执行方法private final MethodInvocationRecorder<Connection> methodInvocationRecorder = new MethodInvocationRecorder<>();// 强制执行模板。此处强制执行Connection对象的方法private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();private final Random random = new SecureRandom();public ConnectionManager(final String databaseName, final ContextManager contextManager) {// 从metaDataContexts中获取对应数据库的数据源Map集合dataSourceMap.putAll(contextManager.getDataSourceMap(databaseName));// 获取流量控制的数据源。针对集群部署。在最新的5.5版本中已删除流量控制配置dataSourceMap.putAll(getTrafficDataSourceMap(databaseName, contextManager));// 物理数据源physicalDataSourceMap.putAll(contextManager.getDataSourceMap(databaseName));// 创建连接事务connectionTransaction = createConnectionTransaction(databaseName, contextManager);}/*** 获取流量控制的数据源*/private Map<String, DataSource> getTrafficDataSourceMap(final String databaseName, final ContextManager contextManager) {// 获取流量控制规则TrafficRule trafficRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);// 获取元数据持久化ServiceMetaDataPersistService persistService = contextManager.getMetaDataContexts().getPersistService();if (trafficRule.getStrategyRules().isEmpty()) {return Collections.emptyMap();}String actualDatabaseName = contextManager.getMetaDataContexts().getMetaData().getActualDatabaseName(databaseName);// 从持久化Service中获取对应数据库的数据源信息Map<String, DataSourceProperties> dataSourcePropsMap = persistService.getDataSourceService().load(actualDatabaseName);Preconditions.checkState(!dataSourcePropsMap.isEmpty(), "Can not get data source properties from meta data.");// 获取第一个数据源配置信息DataSourceProperties dataSourcePropsSample = dataSourcePropsMap.values().iterator().next();// 从持久化Service中获取全局的审计规则人员信息Collection<ShardingSphereUser> users = persistService.getGlobalRuleService().loadUsers();// 获取集群实例Collection<InstanceMetaData> instances = contextManager.getInstanceContext().getAllClusterInstances(InstanceType.PROXY, trafficRule.getLabels());// 创建集群中的DataSourcereturn DataSourcePoolCreator.create(createDataSourcePropertiesMap(instances, users, dataSourcePropsSample, actualDatabaseName));}/*** 创建集群代理实例(代理远程数据库)的数据源属性对象*/private Map<String, DataSourceProperties> createDataSourcePropertiesMap(final Collection<InstanceMetaData> instances, final Collection<ShardingSphereUser> users,final DataSourceProperties dataSourcePropsSample, final String schema) {Map<String, DataSourceProperties> result = new LinkedHashMap<>();for (InstanceMetaData each : instances) {result.put(each.getId(), createDataSourceProperties((ProxyInstanceMetaData) each, users, dataSourcePropsSample, schema));}return result;}/*** 数据源的属性。获取代理实例(代理远程数据库)的数据库属性信息*/private DataSourceProperties createDataSourceProperties(final ProxyInstanceMetaData instanceMetaData, final Collection<ShardingSphereUser> users,final DataSourceProperties dataSourcePropsSample, final String schema) {Map<String, Object> props = dataSourcePropsSample.getAllLocalProperties();props.put("jdbcUrl", createJdbcUrl(instanceMetaData, schema, props));// ShardingSphere的使用者。代理实例(代理远程数据库)对应的用户信息ShardingSphereUser user = users.iterator().next();props.put("username", user.getGrantee().getUsername());props.put("password", user.getPassword());return new DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props);}/*** 创建Jdbc的url。拼接代理实例(代理远程数据库)的访问url*/private String createJdbcUrl(final ProxyInstanceMetaData instanceMetaData, final String schema, final Map<String, Object> props) {// 获取配置的urlString jdbcUrl = String.valueOf(props.get("jdbcUrl"));// 获取url的协议头String jdbcUrlPrefix = jdbcUrl.substring(0, jdbcUrl.indexOf("//"));// 获取url的参数String jdbcUrlSuffix = jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?")) : "";// 拼接代理实例,生成代理实例对应的urlreturn String.format("%s//%s:%s/%s%s", jdbcUrlPrefix, instanceMetaData.getIp(), instanceMetaData.getPort(), schema, jdbcUrlSuffix);}/*** 创建ConnectionTransaction*/private ConnectionTransaction createConnectionTransaction(final String databaseName, final ContextManager contextManager) {// 在本地线程变量中获取事务类型TransactionType type = TransactionTypeHolder.get();// 获取事务规则(按类查找单个规则)TransactionRule transactionRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class);// 创建ConnectionTransactionreturn null == type ? new ConnectionTransaction(databaseName, transactionRule) : new ConnectionTransaction(databaseName, type, transactionRule);}public void setAutoCommit(final boolean autoCommit) throws SQLException {// 记录自动提交的设置methodInvocationRecorder.record("setAutoCommit", target -> target.setAutoCommit(autoCommit));// 所有缓存的连接执行setAutoCommit()forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(autoCommit));}/*** 提交或回滚一次逻辑SQL解析出来的多条物理SQL操作*/public void commit() throws SQLException {// 如果是本地事务,且设置了回滚,则强制回滚缓存的连接的事务if (connectionTransaction.isLocalTransaction() && connectionTransaction.isRollbackOnly()) {forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);} else if (connectionTransaction.isLocalTransaction() && !connectionTransaction.isRollbackOnly()) {// 如果是本地事务,且没有设置回滚,则强制提交缓存的连接的事务forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);} else {// 非本地事务,只是连接事务的提交connectionTransaction.commit();}}/*** 回滚*/public void rollback() throws SQLException {// 回滚所有的缓存的Connection连接if (connectionTransaction.isLocalTransaction()) {forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);} else {// 事务回滚connectionTransaction.rollback();}}/*** 回滚到保存点*/public void rollback(final Savepoint savepoint) throws SQLException {for (Connection each : cachedConnections.values()) {ConnectionSavepointManager.getInstance().rollbackToSavepoint(each, savepoint.getSavepointName());}}/*** 设置保存点* @param savepointName*/public Savepoint setSavepoint(final String savepointName) throws SQLException {ShardingSphereSavepoint result = new ShardingSphereSavepoint(savepointName);// 添加连接保存点for (Connection each : cachedConnections.values()) {ConnectionSavepointManager.getInstance().setSavepoint(each, savepointName);}// 记录操作methodInvocationRecorder.record("setSavepoint", target -> ConnectionSavepointManager.getInstance().setSavepoint(target, savepointName));return result;}public Savepoint setSavepoint() throws SQLException {ShardingSphereSavepoint result = new ShardingSphereSavepoint();for (Connection each : cachedConnections.values()) {ConnectionSavepointManager.getInstance().setSavepoint(each, result.getSavepointName());}methodInvocationRecorder.record("setSavepoint", target -> ConnectionSavepointManager.getInstance().setSavepoint(target, result.getSavepointName()));return result;}/*** 释放保存点*/public void releaseSavepoint(final Savepoint savepoint) throws SQLException {for (Connection each : cachedConnections.values()) {ConnectionSavepointManager.getInstance().releaseSavepoint(each, savepoint.getSavepointName());}}/*** 获取事务隔离级别*/public Optional<Integer> getTransactionIsolation() throws SQLException {// 返回缓存连接中第一个Connection的事务隔离级别return cachedConnections.values().isEmpty() ? Optional.empty() : Optional.of(cachedConnections.values().iterator().next().getTransactionIsolation());}/*** 设置事务隔离级别*/public void setTransactionIsolation(final int level) throws SQLException {methodInvocationRecorder.record("setTransactionIsolation", connection -> connection.setTransactionIsolation(level));forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setTransactionIsolation(level));}public void setReadOnly(final boolean readOnly) throws SQLException {methodInvocationRecorder.record("setReadOnly", connection -> connection.setReadOnly(readOnly));forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));}public boolean isValid(final int timeout) throws SQLException {for (Connection each : cachedConnections.values()) {if (!each.isValid(timeout)) {return false;}}return true;}/*** 随机获取一个数据源*/public String getRandomPhysicalDataSourceName() {Collection<String> cachedPhysicalDataSourceNames = Sets.intersection(physicalDataSourceMap.keySet(), cachedConnections.keySet());Collection<String> datasourceNames = cachedPhysicalDataSourceNames.isEmpty() ? physicalDataSourceMap.keySet() : cachedPhysicalDataSourceNames;return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));}public Connection getRandomConnection() throws SQLException {return getConnections(getRandomPhysicalDataSourceName(), 1, ConnectionMode.MEMORY_STRICTLY).get(0);}/*** 获取连接* @param dataSourceName 数据源名称* @param connectionSize 连接大小*/@Overridepublic List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {// 获取对应数据源的DataSourceDataSource dataSource = dataSourceMap.get(dataSourceName);Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);Collection<Connection> connections;// 从缓存连接池中获取连接synchronized (cachedConnections) {connections = cachedConnections.get(dataSourceName);}List<Connection> result;// 如果当前缓存连接数量大于请求的个数,直接全部从缓存获取if (connections.size() >= connectionSize) {result = new ArrayList<>(connections).subList(0, connectionSize);} else if (!connections.isEmpty()) { // 如果缓存的连接数不足,则新创建result = new ArrayList<>(connectionSize);result.addAll(connections);// 创建其余的新连接List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);result.addAll(newConnections);// 加入缓存synchronized (cachedConnections) {cachedConnections.putAll(dataSourceName, newConnections);}} else {// 创建新连接result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));synchronized (cachedConnections) {cachedConnections.putAll(dataSourceName, result);}}return result;}/*** 创建connectionSize个Connection连接*/@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {if (1 == connectionSize) {// 创建连接。如果是分布式事务,则从连接事务中获取Connection;否则从dataSource中获取Connection connection = createConnection(dataSourceName, dataSource);// 回放方法调用,批量执行方法,保证connection状态的一致性methodInvocationRecorder.replay(connection);return Collections.singletonList(connection);}// 如果是严格连接,即必现重新创建if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {return createConnections(dataSourceName, dataSource, connectionSize);}synchronized (dataSource) {return createConnections(dataSourceName, dataSource, connectionSize);}}/*** 循环创建Connection*/private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {List<Connection> result = new ArrayList<>(connectionSize);for (int i = 0; i < connectionSize; i++) {try {Connection connection = createConnection(dataSourceName, dataSource);methodInvocationRecorder.replay(connection);result.add(connection);} catch (final SQLException ex) {for (Connection each : result) {each.close();}throw new OverallConnectionNotEnoughException(connectionSize, result.size()).toSQLException();}}return result;}/*** 如果存在分布式事务,则从事务中获取连接,否则从DataSource中获取Connection*/private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {// 如果是配置的DataSource,从连接事务中获取Connection(通常是代理的Connection,如果Seata的ConnectionProxy)。如果是LOCAL类型事务,此处返回空Optional<Connection> connectionInTransaction = isRawJdbcDataSource(dataSourceName) ? connectionTransaction.getConnection(dataSourceName) : Optional.empty();// 如果存在分布式事务,则从事务中获取连接,否则从DataSource中获取return connectionInTransaction.isPresent() ? connectionInTransaction.get() : dataSource.getConnection();}/*** 如果是配置的DataSource,返回true*/private boolean isRawJdbcDataSource(final String dataSourceName) {return physicalDataSourceMap.containsKey(dataSourceName);}/*** 获取Connection缓存的数据源名称* @return*/@Overridepublic Collection<String> getDataSourceNamesOfCachedConnections() {return cachedConnections.keySet();}@Overridepublic void close() throws SQLException {try {forceExecuteTemplate.execute(cachedConnections.values(), Connection::close);} finally {cachedConnections.clear();}}
}
4.1 构造方法
ConnectionManager的构造方法执行如下:
1)从ContextManager的metaDataContexts中获取对应数据库的数据源Map集合;
2)获取流量控制的数据源。针对集群部署。在最新的5.5版本中已删除流量控制配置;
3)执行createConnectionTransaction(),创建连接事务;
3.1)从本地线程变量中获取事务的类型,类型有LOCAL、AT、BASE;
3.2)从ContextManager中获取事务规则。如果在Yaml中没有配置全局的事务规则,在MetaDataContextsFactory.create() -> GlobalRulesBuilder.getRuleBuilderMap() -> getMissedDefaultRuleBuilderMap(),在该方法中,添加系统默认的全局规则,默认的全局规则包括SQL联合、事务、SQL解析器、审计、SQL转换器、流量,通过实现DefaultGlobalRuleConfigurationBuilder.build()方法创建默认规则,默认的事务为LOCAL类型;
3.3)根据事务类型,创建一个ConnectionTransaction对象,默认为LOCAL类型;在ConnectionTransaction中,如果是LOCAL类型,其transactionManager对象为null。因为对于LOCAL事务,借助Spring的事务即可。
4.2 createConnection()方法
createConnection()方法用于创建一个Connection对象。执行如下:
1)执行isRawJdbcDataSource(),判断传入的dataSourceName是否为配置的DataSource,此处返回true;
2)如果 1)为true,则执行connectionTransaction.getConnection(),获取一个Connection;
如果是分布式事务,如XA、SeataAT,则返回代理的Connection,如seata的ProxyConnection;
如果是LOCAL,由于ConnectionTransaction的transactionManager为null,所以返回的Connection对象为null;
3)如果 2)有返回值,则返回 2)中的Connection,否则通过DataSource的getConnection()方法获取一个Connection;
如果是分布式事务,则返回对应代理Connection;
如果是Local,则通过DataSource的getConnection()返回一个Connection;
4.3 createConnections()方法
createConnections()用于根据ConnectionMode连接类型,创建指定个数的Connection对象。执行如下:
1)根据指定个数,循环调用createConnection()方法,创建Connection对象;
2)执行针对Connection操作的命令,如setAutoCommit、setSavepoint等;
由于在同一个ShardingSphereConnection中的真实Connection连接的状态需要一致,所以创建之后,需要根据之前Connection的状态,重新执行一次所有的状态;
4.4 getConnections()方法
获取指定个数的Connection连接集合。执行如下:
1)从Connection缓存中找出当前dataSourceName的Connection;
2)如果当前缓存的Connection个数大于请求获取的个数,则直接从缓存中获取指定个数的Connection;
3)如果缓存的Connection个数小于请求获取的个数,则调用createConnections(),创建相差的个数的Connection。并将创建的Connection添加到缓存中;
4)返回指定个数的Connection集合;
4.5 其他方法
其他方法主要为对Connection操作的方法,如setAutoCommit、setSavepoint、setTransactionIsolation、commit、rollback等。
1)对于修改Connection属性的方法,需要遍历当前的Connection,执行对应的修改。且需要记录修改动作,当后续执行createConnection()创建新的Connection时,需要执行记录的动作,保证缓存中所有的Connection的状态都是一致的;
2)对于commit等方法,遍历当前的Connection,执行相应的方法;
小结
限于篇幅,本篇就分享到这里。以下做一个小结:
1)在系统正常运行时,通过ShardingSphereDataSource中获取的Connection为ShardingSphereConnection对象;
系统的另外两种状态
a)熔断降级,此时的Connection对象为CircuitBreakerConnection。该对象的所有方法都是空方法,即当前处于熔断降级状态,不允许执行数据库操作;
b)锁,此时调用获取Connection对象时,抛UnsupportedSQLOperationException异常;
2)在ShardingSphereConnection中,维护了ConnectionManager及ConnectionContext对象。其createStatement()和prepareStatement()方法,返回ShardingSphere框架中定义的ShardingSphereStatement和ShardingSpherePreparedStatement对象;
对于Connection的setReadyOnly、commit、setSavePoint等方法,通过调用ConnectionManager相应方法实现。
3)在ConnectionManager中,主要功能如下:
3.1)记录系统的数据源信息;
3.2)维护并缓存真实的Connection连接;
3.3)维护Connection的属性修改、事务的提交、回滚等;
4)关系小结
业务中的一个数据库连接操作,经过分片设置,对应多个真实数据库的操作;
ShardingSphereConnection对应代码业务中的一个数据库连接操作;
ShardingSphereConnection中记录一个ConnectionManager对象;
ConnectionManager维护真实的多个Connection;
关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。