QDGEla0Lf 发表于 2025-2-28 18:48:21

数据并发安全校验处理工具类

一、项目现存问题描述

  当前系统项目中,存在一些并发安全风险问题(虽然并发量较小)。特别是在处理审批状态修改和涉及金额数量的操作,由于缺乏有效的并发控制,可能会导致业务逻辑重复执行和数据不一致。例如 并发场景下,多个线程同时尝试更新同一笔交易状态或金额,这不仅会导致数据不一致,还可能引发更严重的相关业务逻辑错误。
二、一般处理方案概述

乐观锁:
通过在表中添加一个版本号字段来实现,当更新记录时,检查版本号是否与读取时相同,否则表示数据已被其他事务修改,需要重试。PS:需要现行表增加字段并修改代码支持,改动稍大
悲观锁:
使用数据库提供的锁机制,在查询时即锁定记录。PS:应避免表级锁,查询条件应使用到索引字段。
分布式锁:
对于跨服调用的场景,可以采用redis等缓存技术实现分布式锁,确保在同一时刻只有一个服务实例能够对共享资源进行操作。PS:我们的项目开发规则不支持服务层使用redis组件,固开发了这个工具类
事务管理:
合理配置事务隔离级别,确保事务间的可见性服务预期,避免脏读、不可重复读等问题。PS:不便于后期维护,容易造成事务的未知风险
三、基于现行项目的工具类设计方案

结合项目实际情况,设计了一个专用于解决此类并发问题的工具类。该工具类采用了悲观锁方案,使用便捷,以下是不分测试样例与工具类源码。
1、使用测试样例:

    //修改金额    @Transactional(rollbackFor = Exception.class)    public void addUserAccountAmount() {      ConcurrentDataUtils.updateAmount(UserInfo::getAccountAmount, 600, this,Pair.of(UserAccount::getId, 10));    }  //判断是否与预期值一致 eg:判断审批状态 是否为待审批,否则应拦截    @Transactional(rollbackFor = Exception.class)    public void isEqual() {      ConcurrentDataUtils.isEqual(Approve::getStatus, 0, this,Pair.of(Approve::getForeignId, 1122334));    }    //判断数据是否已存在 eg:同步、保存等场景    @Transactional(rollbackFor = Exception.class)    public void isExist() {      ConcurrentDataUtils.isExist(userInfoService,Pair.of(UserInfo::getIdCard, 1122334),Pair.of(UserInfo::getIsDelete,0));    }
2、工具类源码

package com.example.dlock_demo.utils;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import com.baomidou.mybatisplus.core.toolkit.support.SFunction;import com.baomidou.mybatisplus.extension.service.IService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.tuple.Pair;import java.io.Serializable;import java.lang.invoke.SerializedLambda;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.math.BigDecimal;import java.util.Map;import java.util.Objects;import java.util.Optional;import java.util.concurrent.ConcurrentHashMap;/** * 并发校验、处理数据工具类 * * @author: shf * @date: 2025年02月14日 11:13 */@Slf4jpublic class ConcurrentDataUtils {    private static final String DEFAULT_LAST_JOIN_SQL = "ORDER BY id DESC Limit 1 FOR UPDATE";    private static Map<Class<?>, SerializedLambda> CLASS_LAMDBA_CACHE = new ConcurrentHashMap<>();    /**   * 校验数据是否已存在   * <p>   * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;   *   * @param <T>            实体类型   * @param <C>            查询条件列值的类型   * @param service      数据库操作Service   * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)   */    @SafeVarargs    public static <T, C> boolean isExist(IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {      if (service == null || conditionPairs == null || conditionPairs.length == 0) {            throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-isExist查询条件为空");      }      return lockAndGet(service, conditionPairs) != null;    }    /**   * 校验入参与数据库字段值是否一致   * <p>   * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;   * 期望值入参类型需要与实体字段数据类型一致;   *   * @param <T>            实体类型   * @param <R>            待验证值的类型   * @param <C>            查询条件列值的类型   * @param targetColumn   实体类查询字段   * @param expectVal      期望值 需要与实体字段数据类型一致   * @param service      数据库操作Service   * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)   */    @SafeVarargs    public static <T, R, C> boolean isEqual(SFunction<T, R> targetColumn, R expectVal, IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {      if (targetColumn == null || service == null || conditionPairs == null || conditionPairs.length == 0) {            throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-isEqual查询条件为空");      }      T t = lockAndGet(service, conditionPairs);      if (t == null) {            log.warn("并发校验处理数据ConcurrentDataUtils-expectEqual查询为空");            return false;      }      R columnVal = targetColumn.apply(t);      return Objects.equals(expectVal, columnVal);    }    /**   * 处理增加或减少金额、数量   * <p>   * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;   * 新增值入参类型需要与实体字段数据类型一致;   * 支持BigDecimal、long、int   *      * @param <T>            实体类型   * @param <R>            待验证值的类型   * @param <C>            查询条件列值的类型   * @param targetColumn   实体类查询字段   * @param thisVal      本次新增值 需要与实体字段数据类型一致   * @param service      数据库操作Service   * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)   */    @SafeVarargs    public static <T, R, C> boolean updateAmount(SFunction<T, R> targetColumn, R thisVal, IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {      if (targetColumn == null || thisVal == null || service == null || conditionPairs == null || conditionPairs.length == 0) {            throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-updateAmount查询条件为空");      }      try {            T t = lockAndGet(service, conditionPairs);            if (t == null) {                log.warn("并发校验处理数据ConcurrentDataUtils-update未查询到有效数据");                return false;            }            R columnVal = targetColumn.apply(t);            R compute = compute(columnVal, thisVal);            //获取字段            String fieldName = getFieldName(targetColumn);            Field field = t.getClass().getDeclaredField(fieldName);            field.setAccessible(true);            field.set(t, compute);            return service.updateById(t);      } catch (Throwable e) {            log.error("并发校验处理数据ConcurrentDataUtils-update异常:", e);            throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-update异常");      }    }    /**   * 根据查询条件默认查询的是满足条件的最后一条数据做判断或更新,查询条件入参优先传ID(行锁)、业务编号等表中唯一标识字段;   *   * @param <T>            实体类型   * @param <R>            查询字段值的类型   * @param <C>            查询条件列值的类型   * @param service      数据库操作Service   * @param conditionPairs 数据库查询条件(key:条件查询列字段 eg:删除状态;value:对应值)   * @return 实体数据   */    @SafeVarargs    private static <T, R, C> T lockAndGet(IService<T> service, Pair<SFunction<T, C>, C>... conditionPairs) {      if (service == null || conditionPairs == null) {            return null;      }      //加锁数据查询      LambdaQueryWrapper<T> wrapper = new LambdaQueryWrapper<>();      //动态拼接参数对      for (Pair<SFunction<T, C>, C> pair : conditionPairs) {            if (Objects.nonNull(pair.getValue())) {                wrapper.eq(pair.getKey(), pair.getValue());            }      }      wrapper.last(DEFAULT_LAST_JOIN_SQL);      return service.getOne(wrapper);    }    /**   * 计算增减结果   *   * @param columnVal 字段原值   * @param thisVal   本次变动值   * @param <R>       计算结果类型   * @return 增减计算结果   */    private static <R> R compute(R columnVal, R thisVal) {      if (columnVal instanceof BigDecimal) {            BigDecimal original = Optional.of((BigDecimal) columnVal).orElse(BigDecimal.ZERO);            BigDecimal addVal = (BigDecimal) thisVal;            return (R) original.add(addVal);      } else if (columnVal instanceof Integer) {            Integer original = Optional.of((Integer) columnVal).orElse(0);            Integer addVal = (Integer) thisVal;            Integer i = original + addVal;            return (R) i;      } else if (columnVal instanceof Long) {            Long original = Optional.of((Long) columnVal).orElse(0L);            Long addVal = (Long) thisVal;            Long l = original + addVal;            return (R) l;      } else {            throw new RuntimeException("并发校验处理数据ConcurrentDataUtils-compute不支持的数据类型");      }    }    /***   * 转换方法引用为属性名   * @param fn   * @return   */    public static <T, R> String getFieldName(SFunction<T, R> fn) {      SerializedLambda lambda = getSerializedLambda(fn);      String methodName = lambda.getImplMethodName();      String prefix = null;      if (methodName.startsWith("get")) {            prefix = "get";      }      // 截取get之后的字符串并转换首字母为小写      return toLowerCaseFirstOne(methodName.replace(prefix, ""));    }    /**   * 首字母转小写   *   * @param s   */    private static String toLowerCaseFirstOne(String s) {      if (Character.isLowerCase(s.charAt(0))) {            return s;      } else {            return (new StringBuilder()).append(Character.toLowerCase(s.charAt(0))).append(s.substring(1)).toString();      }    }    private static SerializedLambda getSerializedLambda(Serializable fn) {      SerializedLambda lambda = CLASS_LAMDBA_CACHE.get(fn.getClass());      if (lambda == null) {            try {                Method method = fn.getClass().getDeclaredMethod("writeReplace");                method.setAccessible(Boolean.TRUE);                lambda = (SerializedLambda) method.invoke(fn);                CLASS_LAMDBA_CACHE.put(fn.getClass(), lambda);            } catch (Exception e) {                e.printStackTrace();            }      }      return lambda;    }}
ConcurrentDataUtils工具类
3、使用注意事项


[*]使用该工具类需要放在事务中,如果没有加事务, 则只是做了一层基本的查询判断,不能彻底解决并发问题;
[*]传参查询条件,优先传ID主键或业务唯一标识编号(建议设计表时将业务编号字段设置为普通索引,由于有逻辑删除字段固不能设置为唯一索引),走行级锁;
[*]实际使用时,查询条件需要传入删除状态字段(如果表设计规则该字段一致,则可优化省略,在工具类源码中加入非删除查询即可);
[*]仅支持单表获取单条数据的校验或更新操作且查询条件为等量查询(wrapper.eq(...)),查询条件必传否则抛出异常;
[*]避免大事务问题,如果除了使用该工具类,还有大量其他耗时业务处理,可将该工具类使用部分以及涉及到该表的数据处理独立成一个方法, 单独放在一个小事务中(需兼顾数据一致性,尽量将所有数据表操作放在一起)。
页: [1]
查看完整版本: 数据并发安全校验处理工具类