事务中无法切换数据源?DataSourceSwitchInvoker:轻松实现多数据源切换执行工具类
背景:在有标注为@Transactional的类或公共方法中(传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW)执行数据源切换可能不成功(比如:主从数据源切换,多数据源切换等,均会发现切换不成功,或“偶尔又切换成功”),导致本应该需要查主库却查了从库,本应该查B库却仍查了A库导致表不存在等各种查询问题。
原因是什么呢?
本质原因是:因为只要添加了@Transactional (传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW),在事务同步上下文类型为:SYNCHRONIZATION_ALWAYS时 ,那么会在事务切面中进行初始化事务同步上下文状态【prepareTransactionStatus】(具体可分析代码位置:org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction),此时org.springframework.transaction.support.TransactionSynchronizationManager#isSynchronizationActive 是true,若需要事务时(EQUIRED【默认值】、REQUIRES_NEW)则还会org.springframework.transaction.support.AbstractPlatformTransactionManager#doBegin获取connection并开启事务且构建ConnectionHolder注册保存于事务同步上下文中,当mybatis 的SqlSessionTemplate.SqlSessionInterceptor.invoke执行时,第一次会将获取的SqlSession通过SqlSessionUtils.registerSessionHolder注册保存于事务同步上下文中,后续只要是同一个SqlSession,那么间接的就是持有同一个SpringManagedTransaction,SpringManagedTransaction是优先从ConnectionHolder获取已有connection对象,若不存在才会创建新的connection对象,并构建ConnectionHolder注册保存于事务同步上下文中,后续只要是在同一个事务同步上下文中,那么都是复用相同的SqlSession、SpringManagedTransaction、ConnectionHolder,所以单纯的改DataSource(ThreadLocal的线程变量)没有用,因为此时ConnectionHolder中保存的是Connection,而不是DataSource
Spring声明式事务源代码分析流程图
为何偶尔切换数据源成功?
当为事务传播特性为NOT_SUPPORTED、SUPPORTS时,由于此时事务管理器并不会提前打开Conneciton并开启事务(即:也不会保存到ConnectionHolder)【从上图中就可以看出】,而是在执行一条SQL语句时,触发了MyBatis的第一次获取SqlSession,间接的执行了DataSourceUtils.doGetConnection(会保存到ConnectionHolder中),如果在方法中的执行第一条SQL语句前进行数据源切换,那么就可以生效,若在执行第一条SQL语句后再尝试切换,那么由于SqlSession已不是最新的(ConnectionHolder中已有Connection),则只会复用。
解决方案:
新增数据源切换执行器工具类:DataSourceSwitchInvoker,作用:在执行前会检查要切换的数据源与当前已持有的数据源(ConnectionHolder.Connection)是否一致,一致则直接执行回调方法(即:不存在切换数据源),不一致则挂起当前事务(挂事务与资源后,会清空事务同步上下文,就像从来没有执行过事务方法一样,默认状态),然后执行回调方法,最后恢复被挂起的事务与资源,并恢复回执行前的数据源设置。即:相当于在事务执行过程中,撕开一个口子(无任何状态),执行完成后,再恢复回事务的原状态,不影响后续的执行。
DataSourceSwitchInvoker.invokeOn 代码逻辑流程图:
(注:图片部份位置有屏蔽删减是因为我实现了多个版本,本次是简化实用版,无需复杂的设置,直接方法入参传入即可)
DataSourceSwitchInvoker 实现CODE:
/** * @author: zuowenjun * @description:数据源切换后执行器,解决在多数据源项目中,无法在事务方法中进行数据源切换问题 */@Componentpublic class DataSourceSwitchInvoker { private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceSwitchInvoker.class); private static final Map<String, String> DATA_SOURCE_NAME_WITH_URL_MAP = new HashMap<>(); private static final String SET_BEFORE = "BEFORE"; private static final String SET_AFTER = "AFTER"; @Value("${dataSourceSwitchInvoker.settings.datasourceJdbcUrlPattern:}") private String datasourceJdbcUrlPattern; /** * 初始化必要条件:数据源配置集合(数据源名称与jdbcUrl对应关系) */ @PostConstruct public void initializeRequirement() { if (StringUtils.isBlank(datasourceJdbcUrlPattern)) { LOGGER.warn("datasourceJdbcUrlPattern is null"); return; } DATA_SOURCE_NAME_WITH_URL_MAP.clear(); Map<String, String> configMap = getPropertiesByPattern(datasourceJdbcUrlPattern, value -> ObjectUtils.defaultIfNull(value, "").toString().trim(), (k, v) -> StringUtils.isNotEmpty(v)); if (MapUtils.isEmpty(configMap)) { LOGGER.error("DataSourceSwitchInvoker.initializeRequirement configMap is empty ,datasourceJdbcUrlPattern: {}", datasourceJdbcUrlPattern); return; } DATA_SOURCE_NAME_WITH_URL_MAP.putAll(configMap); LOGGER.info("DataSourceSwitchInvoker.initializeRequirement ok"); } /** * 在指定的数据源下执行回调方法 * * @param getCurrentDsNameFunc * @param setCurrentDsNameFunc * @param invokeCallback * @return */ public static <T> T invokeOn(String newDataSourceName, Supplier<String> getCurrentDsNameFunc, Consumer<String> setCurrentDsNameFunc, BiFunction<String, String, Boolean> checkSameDsNameFunc, Supplier<T> invokeCallback) { Assert.notNull(getCurrentDsNameFunc, "执行前获取数据源配置回调方法不能为空"); Assert.notNull(setCurrentDsNameFunc, "执行前要设置的数据源配置回调方法不能为空"); Assert.notNull(invokeCallback, "具体执行回调方法不能为空"); String invokeId = "DSI" + System.currentTimeMillis(); String oldDataSourceName = getCurrentDsNameFunc.get(); setCurrentDsNameFunc.accept(newDataSourceName); LOGGER.info("DataSourceSwitchInvoker.invokeOn setCurrentDsName {} --> {} ,invokeId: {}", oldDataSourceName, newDataSourceName, invokeId); Object currentTransaction = null; Object suspendedResourcesHolder = null; PlatformTransactionManagerDelegateInner platformTransactionManagerDelegate = null; try { String currentDbConnectionUrl = TransactionManagerUtils.getCurrentDbConnectionUrl(null); if (StringUtils.isEmpty(currentDbConnectionUrl) || currentDbConnectionUrl.equalsIgnoreCase(DATA_SOURCE_NAME_WITH_URL_MAP.get(newDataSourceName))) { //若当前没有持有DB连接 或持有的DB连接与当前要设置的DB数据源相同,则表明无需额外处理,只需正常执行即可 return invokeCallback.get(); } else if (StringUtils.isNotEmpty(currentDbConnectionUrl) && checkSameDsNameFunc != null) { String currentUsedDataSourceName = DATA_SOURCE_NAME_WITH_URL_MAP.entrySet().stream().filter(kv -> currentDbConnectionUrl.equalsIgnoreCase(kv.getValue())).map(Map.Entry::getKey).findFirst().orElse(null); if (Boolean.TRUE.equals(checkSameDsNameFunc.apply(currentUsedDataSourceName, newDataSourceName))) { //若当前事务连接对应的已实际使用的数据源与要设置的数据源一致,则表明无需额外处理,只需正常执行即可 return invokeCallback.get(); } } //若持有DB连接,则需要先挂起当前事务或资源 AbstractPlatformTransactionManager platformTransactionManager = SpringUtils.getBean(AbstractPlatformTransactionManager.class); Assert.notNull(platformTransactionManager, "not found AbstractPlatformTransactionManager bean"); platformTransactionManagerDelegate = new PlatformTransactionManagerDelegateInner(platformTransactionManager); currentTransaction = TransactionManagerUtils.getCurrentTransaction(platformTransactionManager); if (!platformTransactionManagerDelegate.isExistingTransaction(currentTransaction)) { currentTransaction = null; } suspendedResourcesHolder = platformTransactionManagerDelegate.suspend(currentTransaction); LOGGER.debug("DataSourceSwitchInvoker.invokeOn suspend result is {} ,invokeId: {}", suspendedResourcesHolder != null, invokeId); return invokeCallback.get(); } finally { String resumeSuspendedResources = null; //前面若有挂起事务或资源,则需在执行完方法后需恢复到当前事务状态 if (currentTransaction != null || suspendedResourcesHolder != null) { platformTransactionManagerDelegate.resume(currentTransaction, suspendedResourcesHolder); resumeSuspendedResources = "resume suspendedResources ok"; } setCurrentDsNameFunc.accept(oldDataSourceName); LOGGER.info("DataSourceSwitchInvoker.invokeOn end {} , recover setCurrentDsName {} --> {} ,invokeId: {}", resumeSuspendedResources, newDataSourceName, oldDataSourceName, invokeId); } } /** * 在指定的数据源下执行回调方法 * * @param setCurrentDsNameFunc * @param invokeCallback * @param <T> * @return */ public static <T> T invokeOn(Consumer<String> setCurrentDsNameFunc, Supplier<T> invokeCallback) { return invokeOn(SET_BEFORE, () -> SET_AFTER, setCurrentDsNameFunc, null, invokeCallback); } private static <T> Map<String, T> getPropertiesByPattern(String configPath, Function<Object, T> convertValueFunc, BiFunction<String, T, Boolean> filterFunc) { Assert.notNull(configPath, "param configPath not be null"); Assert.notNull(convertValueFunc, "param convertValueFunc not be null"); Map<String, T> resultMap = new HashMap<>(); if (!(SpringUtils.getApplicationContext().getEnvironment() instanceof ConfigurableEnvironment)) { return resultMap; } ConfigurableEnvironment environment = (ConfigurableEnvironment) SpringUtils.getApplicationContext().getEnvironment(); AntPathMatcher antPathMatcher = new AntPathMatcher("."); String configKey = "{configKey}"; // 遍历所有的属性源 for (PropertySource<?> propertySource : environment.getPropertySources()) { if (propertySource instanceof EnumerablePropertySource) { EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource; // 遍历当前属性源中的所有属性 for (String propertyName : enumerablePropertySource.getPropertyNames()) { if (antPathMatcher.match(configPath, propertyName)) { String key = propertyName; if (configPath.contains(configKey)) { key = antPathMatcher.extractUriTemplateVariables(configPath, propertyName).getOrDefault(configKey.replaceAll("[{}]", ""), "<null>"); } T value = convertValueFunc.apply(enumerablePropertySource.getProperty(propertyName)); if (filterFunc == null || filterFunc.apply(key, value)) { resultMap.put(key, convertValueFunc.apply(value)); } } } } } return resultMap; } /** * 通过内部类在不破坏封装性、访问性的前提下,提供当前类内部的protected方法的访问能力 */ private static class PlatformTransactionManagerDelegateInner extends PlatformTransactionManagerDelegate { public PlatformTransactionManagerDelegateInner(AbstractPlatformTransactionManager transactionManager) { super(transactionManager); } @Override protected Object suspend(Object transaction) throws TransactionException { return super.suspend(transaction); } @Override protected void resume(Object transaction, Object resourcesHolderObj) { super.resume(transaction, resourcesHolderObj); } @Override protected boolean isExistingTransaction(Object transaction) { return super.isExistingTransaction(transaction); } }}依赖CODE(注意包名路径需与AbstractPlatformTransactionManager、DataSourceTransactionManager一致):
//author: zuowenjun//注意包名必需是如下,因为要访问protected方法package org.springframework.jdbc.datasource;public class PlatformTransactionManagerDelegate { private final AbstractPlatformTransactionManager delegate; public PlatformTransactionManagerDelegate(AbstractPlatformTransactionManager transactionManager) { this.delegate = transactionManager; } protected Object suspend(Object transaction) throws TransactionException { return delegate.suspend(transaction); } protected void resume(Object transaction, Object resourcesHolderObj) { AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder = (AbstractPlatformTransactionManager.SuspendedResourcesHolder) resourcesHolderObj; delegate.resume(transaction, resourcesHolder); } protected boolean isExistingTransaction(Object transaction) { return delegate.isExistingTransaction(transaction); }}//author: zuowenjun//注意包名必需是如下,因为要访问protected方法package org.springframework.transaction.support;public class TransactionManagerUtils { public static String getCurrentDbConnectionUrl(String threadLocalDbNameIfNoSet) { DataSource dataSource = SpringUtils.getBean(DataSource.class); if (dataSource == null) { return threadLocalDbNameIfNoSet; } ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder == null || !conHolder.hasConnection()) { return threadLocalDbNameIfNoSet; } try { return conHolder.getConnection().getMetaData().getURL(); } catch (Throwable e) { LOGGER.warn("TransactionManagerUtils.getCurrentDbConnectionUrl error", e); } return threadLocalDbNameIfNoSet; } public static Object getCurrentTransaction(AbstractPlatformTransactionManager transactionManager) { if (!(transactionManager instanceof DataSourceTransactionManager)) { throw new RuntimeException("only support DataSourceTransactionManager doGetTransaction"); } DataSourceTransactionManager dsTransactionManager = (DataSourceTransactionManager) transactionManager; return dsTransactionManager.doGetTransaction(); } }其中:SpringUtils工具类是一个简单的实现了Spring上下文织入的接口然后赋值给静态字段,最终实现可以直接使用applicationContext.getBean(type)
使用示例CODE:
//假设这里是数据源的设置,tips:多数据源一般都是自定义实现了AbstractRoutingDataSource,然后使用ThreadLocal来保存设置当前要使用的数据源配置名称private ThreadLocal<String> dataSourceHolder = new ThreadLocal<>();@Transactionalpublic doWithTx(){ //第一种方法:【推荐第一种】 //假设之前是read_db 数据源,现在需要切换成master_db DataSourceSwitchInvoker.invokeOn("master_db", () -> dataSourceHolder.get(), (dsName) -> dataSourceHolder.set(dsName), null, () -> { Object demo = null; //模拟 demoMapper.get(123L); return demo; }); //第二种方法:(重载方法,一个设置数据源方法处理执行前、执行后的数据源设置) //假设之前是read_db 数据源,现在需要切换成master_db AtomicReference<String> dsName = new AtomicReference<>(); DataSourceSwitchInvoker.invokeOn(eventName -> { if (SET_BEFORE.equals(eventName)) { //执行前,自行记录之前的数据源 dsName.set(dataSourceHolder.get()); //设置新数据源 dataSourceHolder.set("master_db"); } else if (SET_AFTER.equals(eventName)) { //执行后,还原设置数据源 dataSourceHolder.set(dsName.get()); } }, () -> { Object demo = null; //模拟 demoMapper.get(123L); return demo; });} 编码建议:
切换虽好用,但建议不要在切换的方法中进行写数据的操作,更适合仅用于临时需要查询其他数据源的数据时使用,以免破坏spring事务的完整性,因为invokeOn方法本身就是先挂起一个事务,然后开新连接执行新的操作DB的方法,最后还原恢复事务,若在其中又进行了其他的操作,可能存在未知风险,虽然理论做什么都可以但非常不建议。
经多种测试,无论是普通方法 OR 在事务中的方法,均能正常执行,简直就是YYDS!原创不易,如有帮助关注+点个赞吧v
页:
[1]