English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 1|回复: 0

JUC并发—10.锁优化与锁故障

[复制链接]
查看: 1|回复: 0

JUC并发—10.锁优化与锁故障

[复制链接]
查看: 1|回复: 0

224

主题

0

回帖

682

积分

高级会员

积分
682
SRPE6ob4Q8m

224

主题

0

回帖

682

积分

高级会员

积分
682
2025-2-23 00:40:23 | 显示全部楼层 |阅读模式
大纲
1.标志位修改场景优先使用volatile(服务优雅停机)
2.数值递增场景优先使用Atomic类(心跳计数器)
3.共享变量仅对当前线程可见的场景优先使用ThreadLocal(edits log的处理)
4.读多写少需要加锁的场景优先使用读写锁(服务注册表的并发读写)
5.尽量减少线程对锁占用时间(edits log分段加锁)
6.尽量减少线程对数据加锁的粒度(库存分段扣减)
7.尽量按不同功能进行锁分离(避免循环中加锁)
8.尽量减少高并发下线程对锁的竞争(多级缓存)
9.锁故障之死锁总结
10.无锁化编程
 
1.标志位修改场景优先使用volatile(服务优雅停机)
(1)标志位修改等可见性场景优先使用volatile
(2)通过volatile标志位实现服务优雅停机的案例
 
(1)标志位修改等可见性场景优先使用volatile
分布式系统、中间件系统都需要开启大量的线程进行处理。多线程访问共享变量时,首先需要明确是并发写还是并发读。
 
如果仅仅只有一个线程会来写一个共享变量,比如标志位。另外多个线程会来读取这个标志位的值,那么此时优先使用volatile。
 
所以锁优化的原则是:能不用锁就尽量不用锁。因为可能会导致锁争用和锁冲突,从而导致系统吞吐量、性能大幅度降低。
 
(2)通过volatile标志位实现服务优雅停机的案例
比如下面的shutdown()方法就利用了volatile完成锁优化。服务优雅停机中会使用一个isRunning标志位,该标志位会被心跳线程和批量拉取注册表的线程共享使用。但是只有某个线程执行shutdown()方法时才会修改isRunning标志位。其他情况都是由心跳线程和批量拉取注册表的线程读取isRunning标志位,所以使用volatile。
//服务启动时,负责与register-server进行通信public class RegisterClient {    public static final String SERVICE_NAME = "inventory-service";    public static final String IP = "192.168.31.207";    public static final String HOSTNAME = "inventory01";    public static final int PORT = 9000;    private static final Long HEARTBEAT_INTERVAL = 30 * 1000L;        //服务实例ID    private String serviceInstanceId;    //HTTP通信组件    private HttpSender httpSender;    //心跳线程    private HeartbeatWorker heartbeatWorker;    //服务实例是否在运行    private volatile Boolean isRunning;    //客户端缓存的注册表    private CachedServiceRegistry registry;        public RegisterClient() {        this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");        this.httpSender = new HttpSender();        this.heartbeatWorker = new HeartbeatWorker();        this.isRunning = true;        this.registry = new CachedServiceRegistry(this, httpSender);       }        //启动RegisterClient组件    public void start() {        try {            //开启一个线程去完成注册            //如果注册完成后,就进入while循环,每隔30秒就发送一个心跳请求            RegisterWorker registerWorker = new RegisterWorker();            registerWorker.start();            registerWorker.join();            //启动心跳线程,定时发送心跳信息            heartbeatWorker.start();            //初始化客户端缓存的服务注册表组件            this.registry.initialize();        } catch (Exception e) {            e.printStackTrace();        }    }        //停止RegisterClient组件    public void shutdown() {        this.isRunning = false;        this.heartbeatWorker.interrupt();//通过interrupt中断心跳线程        this.registry.destroy();//通过interrupt打断拉取注册表线程        this.httpSender.cancel(SERVICE_NAME, serviceInstanceId);//通知服务端下线    }        //服务注册线程    private class RegisterWorker extends Thread {        @Override        public void run() {            RegisterRequest registerRequest = new RegisterRequest();            registerRequest.setServiceName(SERVICE_NAME);            registerRequest.setIp(IP);             registerRequest.setHostname(HOSTNAME);            registerRequest.setPort(PORT);              registerRequest.setServiceInstanceId(serviceInstanceId);            RegisterResponse registerResponse = httpSender.register(registerRequest);            System.out.println("服务注册的结果是:" + registerResponse.getStatus() + "......");           }    }        //心跳线程    private class HeartbeatWorker extends Thread {        @Override        public void run() {            //如果注册成功,就进入while循环            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();            heartbeatRequest.setServiceName(SERVICE_NAME);              heartbeatRequest.setServiceInstanceId(serviceInstanceId);            HeartbeatResponse heartbeatResponse = null;            while (isRunning) {                try {                    heartbeatResponse = httpSender.heartbeat(heartbeatRequest);                    System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......");                    Thread.sleep(HEARTBEAT_INTERVAL);                   } catch (Exception e) {                      e.printStackTrace();                }            }        }    }        //返回RegisterClient是否正在运行,给批量拉取注册表的后台线程调用    public Boolean isRunning() {        return isRunning;    }} 
2.数值递增场景优先使用Atomic类(心跳计数器)
(1)数值递增场景优先使用Atomic原子类
(2)服务心跳计数器使用Atomic原子类的案例
 
(1)数值递增场景优先使用Atomic原子类
多线程并发访问共享变量时,首先判断变量是否只需保证可见性即可,如果是则使用volatile优化。然后判断是否要保证原子性,如果是则判断是简单数值累加还是变更操作。如果只需要简单进行数值累加,那么则建议使用Atomic原子类,否则才使用synchronized或者ReentrantLock这种重量级锁。Atomic原子类基于CAS机制实现了无锁化编程,并发性比synchronized好。
 
(2)服务心跳计数器使用Atomic原子类的案例
服务保护机制中有个心跳测量计数器:
//心跳测量计数器public class HeartbeatCounter {    //单例实例    private static HeartbeatCounter instance = new HeartbeatCounter();    //最近一分钟的心跳次数    private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L);     //最近一分钟的时间戳    private long latestMinuteTimestamp = System.currentTimeMillis();        private HeartbeatCounter() {        Daemon daemon = new Daemon();        daemon.setDaemon(true);          daemon.start();    }        //获取单例实例    public static HeartbeatCounter getInstance() {        return instance;    }        //增加一次最近一分钟的心跳次数    //一旦加锁了后,就会导致大量的线程并发的更新心跳次数,从而导致加锁串行化    //进而导致多线程并发性能的降低    public /**synchronized*/ void increment() {        //如果服务实例很多,比如1万个服务实例,那么每秒可能都有很多个请求过来更新心跳        //如果在这里加了synchronized的话,会影响并发的性能        //但是换成了AtomicLong原子类之后,不加锁,无锁化,CAS操作,保证原子性,还可以多线程并发        latestMinuteHeartbeatRate.incrementAndGet();    }        //获取最近一分钟的心跳次数    public /**synchronized*/ long get() {        return latestMinuteHeartbeatRate.get();    }       private class Daemon extends Thread {        @Override        public void run() {            while(true) {                try {                    long currentTime = System.currentTimeMillis();                    if (currentTime - latestMinuteTimestamp > 60 * 1000) {                        while(true) {                            Long expectedValue = latestMinuteHeartbeatRate.get();                            if (latestMinuteHeartbeatRate.compareAndSet(expectedValue, 0L)) {                                break;                            }                        }                        latestMinuteTimestamp = System.currentTimeMillis();                    }                    Thread.sleep(1000);                 } catch (Exception e) {                    e.printStackTrace();                }            }        }    }} 
3.共享变量仅对当前线程可见的场景优先使用ThreadLocal(edits log的处理)
(1)共享变量仅对当前线程可见的场景优先使用ThreadLocal
(2)分布式存储系统处理edits log时使用ThreadLocal
 
(1)共享变量仅对当前线程可见的场景优先使用ThreadLocal
在多线程环境中,当多个线程同时访问某个共享变量时,如果希望对共享变量的操作仅对当前线程可见,那么可使用ThreadLocal。
 
ThreadLocal会为每个线程提供独立的存储空间来存储共享变量的副本,每个线程只会对共享变量的副本进行操作,且该操作对其他线程不可见。
 
volatile、Atomic、ThreadLocal是锁优化的三大利器,应当优先考虑。如果不能使用它们,再考虑synchronized和ReentrantLock等这些重量级锁。
 
(2)分布式存储系统处理edits log时使用ThreadLocal
//负责管理edits log日志的核心组件public class FSEditlog {    ...    //每个线程自己本地的txid副本    private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();       //记录edits log日志    public void logEdit(String content) {        //这里必须直接加锁,有线程执行logSync()方法时这里没有其他线程能进来        synchronized(this) {            //获取全局唯一递增的txid,代表了edits log的序号            txidSeq++;            long txid = txidSeq;            localTxid.set(txid);            //构造一条edits log对象            EditLog log = new EditLog(txid, content);            //将edits log写入内存缓冲中,不是直接刷入磁盘文件            editLogBuffer.write(log);          }        logSync();    }    ...} 
4.读多写少需要加锁的场景优先使用读写锁(服务注册表的并发读写)
(1)读多写少需要加锁的场景优先使用读写锁
(2)服务注册表的并发读写场景使用读写锁的案例
 
(1)读多写少需要加锁的场景优先使用读写锁
如果volatile、Atomic、ThreadLocal都不适用,这时多线程并发访问一块共享数据就需要加锁了。但是此时会优先考虑使用读写锁,而不是使用synchronized重量级锁。
 
比如在读多写少的场景下,可以进行读写锁分离。读锁 -> 大量的线程可以并发的读,
写锁 -> 有线程写数据时其他线程不能同时来写也不能同时来读。
 
(2)服务注册表的并发读写场景使用读写锁的案例
服务注册中心通过如下两种方式处理服务注册表:
一.每隔30秒会拉取增量注册表
二.每隔30秒会处理心跳请求
 
拉取增量注册表属于读操作,发现心跳异常时会更新注册表属于写操作。很明显大部分情况下都不会更新注册表数据,所以这是读多写少的场景。
 
如果读取注册表和更新注册表使用synchronized重量级锁,那么将会导致大量的读取注册表的操作都是串行化的,从而降低服务注册中心的并发性能。
 
所以对服务注册表的读写操作,使用读写锁进行分离。读取服务注册表时使用读锁,更新服务注册表时使用写锁,这样就能保证读操作都能并发执行。
//服务注册表public class ServiceRegistry {    public static final Long RECENTLY_CHANGED_ITEM_CHECK_INTERVAL = 3000L;    public static final Long RECENTLY_CHANGED_ITEM_EXPIRED = 3 * 60 * 1000L;        //注册表是一个单例    private static ServiceRegistry instance = new ServiceRegistry();    //核心的内存数据结构:注册表,Map:key是服务名称,value是这个服务的所有的服务实例    private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();    //最近变更的服务实例的队列    private LinkedList<RecentlyChangedServiceInstance> recentlyChangedQueue = new LinkedList<RecentlyChangedServiceInstance>();        //服务注册表的锁    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();    private ReadLock readLock = lock.readLock();    private WriteLock writeLock = lock.writeLock();        //构造函数    private ServiceRegistry() {        //启动后台线程监控最近变更的队列        RecentlyChangedQueueMonitor recentlyChangedQueueMonitor = new RecentlyChangedQueueMonitor();        recentlyChangedQueueMonitor.setDaemon(true);         recentlyChangedQueueMonitor.start();    }        //获取服务注册表的单例实例    public static ServiceRegistry getInstance() {        return instance;    }    //加读锁    public void readLock() {        this.readLock.lock();   }    //释放读锁    public void readUnlock() {        this.readLock.unlock();    }    //加写锁    public void writeLock() {        this.writeLock.lock();    }    //释放写锁    public void writeUnlock() {        this.writeLock.unlock();    }        //服务注册    public void register(ServiceInstance serviceInstance) {        try {            //加写锁            this.writeLock();            System.out.println("服务注册......【" + serviceInstance + "】");            //将服务实例放入最近变更的队列中            RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REGISTER);            recentlyChangedQueue.offer(recentlyChangedItem);            System.out.println("最近变更队列:" + recentlyChangedQueue);            //将服务实例放入注册表中            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceInstance.getServiceName());            if (serviceInstanceMap == null) {                serviceInstanceMap = new HashMap<String, ServiceInstance>();                registry.put(serviceInstance.getServiceName(), serviceInstanceMap);            }            serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);            System.out.println("注册表:" + registry);         } finally {            this.writeUnlock();        }    }        //从注册表删除一个服务实例    public void remove(String serviceName, String serviceInstanceId) {        try {            //加写锁            this.writeLock();            System.out.println("服务摘除【" + serviceName + ", " + serviceInstanceId + "】");            //获取服务实例            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);            ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);            //将服务实例变更信息放入队列中            RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REMOVE);            recentlyChangedQueue.offer(recentlyChangedItem);            System.out.println("最近变更队列:" + recentlyChangedQueue);            //从服务注册表删除服务实例            serviceInstanceMap.remove(serviceInstanceId);            System.out.println("注册表:" + registry);         } finally {            this.writeUnlock();        }    }        //获取服务实例    public ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {        try {            this.readLock();            Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);            return serviceInstanceMap.get(serviceInstanceId);        } finally {            this.readUnlock();          }    }        //获取整个注册表    public Map<String, Map<String, ServiceInstance>> getRegistry() {        return registry;    }        //获取最近有变化的注册表    public DeltaRegistry getDeltaRegistry() {        Long totalCount = 0L;        for (Map<String, ServiceInstance> serviceInstanceMap : registry.values()) {            totalCount += serviceInstanceMap.size();        }        DeltaRegistry deltaRegistry = new DeltaRegistry(recentlyChangedQueue, totalCount);        return deltaRegistry;    }        //最近变化的服务实例    class RecentlyChangedServiceInstance {        //服务实例        ServiceInstance serviceInstance;        //发生变更的时间戳        Long changedTimestamp;        //变更操作        String serviceInstanceOperation;        public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {            this.serviceInstance = serviceInstance;            this.changedTimestamp = changedTimestamp;            this.serviceInstanceOperation = serviceInstanceOperation;        }        @Override        public String toString() {            return "RecentlyChangedServiceInstance [serviceInstance=" + serviceInstance + ", changedTimestamp=" + changedTimestamp + ", serviceInstanceOperation=" + serviceInstanceOperation + "]";        }    }        //服务实例操作    class ServiceInstanceOperation {        public static final String REGISTER = "register";//注册        public static final String REMOVE = "REMOVE";//删除    }        //最近变更队列的监控线程    class RecentlyChangedQueueMonitor extends Thread {        @Override        public void run() {            while(true) {                try {                    try {                        writeLock.lock();                        RecentlyChangedServiceInstance recentlyChangedItem = null;                        Long currentTimestamp = System.currentTimeMillis();                        while ((recentlyChangedItem = recentlyChangedQueue.peek()) != null) {                            //判断如果一个服务实例变更信息已经在队列里存在超过3分钟了,就从队列中移除                            if (currentTimestamp - recentlyChangedItem.changedTimestamp > RECENTLY_CHANGED_ITEM_EXPIRED) {                                recentlyChangedQueue.poll();                            }                        }                    } finally {                        writeLock.unlock();                    }                    Thread.sleep(RECENTLY_CHANGED_ITEM_CHECK_INTERVAL);                } catch (Exception e) {                    e.printStackTrace();                }             }        }    }}//微服务存活状态监控组件public class ServiceAliveMonitor {    //检查服务实例是否存活的间隔    private static final Long CHECK_ALIVE_INTERVAL = 60 * 1000L;    //负责监控微服务存活状态的后台线程    private Daemon daemon;        public ServiceAliveMonitor() {        this.daemon = new Daemon();        daemon.setDaemon(true);          daemon.setName("ServiceAliveMonitor");      }        //启动后台线程    public void start() {        daemon.start();    }        //负责监控微服务存活状态的后台线程    private class Daemon extends Thread {        private ServiceRegistry registry = ServiceRegistry.getInstance();        @Override        public void run() {            Map<String, Map<String, ServiceInstance>> registryMap = null;            while (true) {                try {                    //可以判断一下是否要开启自我保护机制                    SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();                    if (selfProtectionPolicy.isEnable()) {                        Thread.sleep(CHECK_ALIVE_INTERVAL);                        continue;                    }                    //定义要删除的服务实例的集合                    List<ServiceInstance> removingServiceInstances = new ArrayList<ServiceInstance>();                    //开始读服务注册表的数据,这个过程中,其他线程可以读但不可以写                    try {                        //对整个服务注册表,加读锁                        registry.readLock();                        registryMap = registry.getRegistry();                        for (String serviceName : registryMap.keySet()) {                            Map<String, ServiceInstance> serviceInstanceMap = registryMap.get(serviceName);                            for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {                                //说明服务实例距离上一次发送心跳已经超过90秒了,认为这个服务死了,从注册表中摘除这个服务实例                                if (!serviceInstance.isAlive()) {                                    removingServiceInstances.add(serviceInstance);                                }                              }                        }                    } finally {                        registry.readUnlock();                    }                                     //将所有的要删除的服务实例,从服务注册表删除                    for (ServiceInstance serviceInstance : removingServiceInstances) {                        registry.remove(serviceInstance.getServiceName(), serviceInstance.getServiceInstanceId());                        //更新自我保护机制的阈值                        synchronized(SelfProtectionPolicy.class) {                            selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);                            selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));                        }                    }                    Thread.sleep(CHECK_ALIVE_INTERVAL);                } catch (Exception e) {                    e.printStackTrace();                }            }        }    }}//负责接收register-client发送过来的请求的处理器public class RegisterServerController {    private ServiceRegistry registry = ServiceRegistry.getInstance();    //服务注册    public RegisterResponse register(RegisterRequest registerRequest) {        RegisterResponse registerResponse = new RegisterResponse();        try {            //在注册表中加入这个服务实例            ServiceInstance serviceInstance = new ServiceInstance();            serviceInstance.setHostname(registerRequest.getHostname());             serviceInstance.setIp(registerRequest.getIp());             serviceInstance.setPort(registerRequest.getPort());             serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId());             serviceInstance.setServiceName(registerRequest.getServiceName());            registry.register(serviceInstance);            //更新自我保护机制的阈值            synchronized(SelfProtectionPolicy.class) {                SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();                selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);                  selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));              }            registerResponse.setStatus(RegisterResponse.SUCCESS);         } catch (Exception e) {            e.printStackTrace();             registerResponse.setStatus(RegisterResponse.FAILURE);          }        return registerResponse;    }        //服务下线    public void cancel(String serviceName, String serviceInstanceId) {        //从服务注册中摘除实例        registry.remove(serviceName, serviceInstanceId);        //更新自我保护机制的阈值        synchronized(SelfProtectionPolicy.class) {            SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();            selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);              selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));          }    }        //发送心跳    public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {         HeartbeatResponse heartbeatResponse = new HeartbeatResponse();        try {            ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());            if (serviceInstance != null) {                serviceInstance.renew();            }            //记录一下每分钟的心跳的次数            HeartbeatCounter heartbeatMeasuredRate = HeartbeatCounter.getInstance();            heartbeatMeasuredRate.increment();            heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS);         } catch (Exception e) {            e.printStackTrace();             heartbeatResponse.setStatus(HeartbeatResponse.FAILURE);         }        return heartbeatResponse;    }        //拉取全量注册表    public Applications fetchFullRegistry() {        try {            registry.readLock();            return new Applications(registry.getRegistry());        } finally {            registry.readUnlock();        }    }        //拉取增量注册表    public DeltaRegistry fetchDeltaRegistry() {        try {            registry.readLock();            return registry.getDeltaRegistry();        } finally {            registry.readUnlock();          }    }} 
5.尽量减少线程对锁占用时间(edits log分段加锁)
(1)尽可能减少线程对锁占用的时间
(2)edits log通过分段加锁减少锁占用时间的案例
 
(1)尽可能减少线程对锁占用的时间
无论是synchronized锁还是读写锁,都要尽量保证加锁的时间比较短,不能在加锁后执行耗时的磁盘文件读写、网络IO读写等操作长时间占有锁。一般会在操作内存中的数据时加锁,而不会在操作数据库中的数据时加锁。否则可能会导致长时间占用锁,导致线程并发的性能和吞吐量大幅度下降。
 
(2)edits log通过分段加锁减少锁占用时间的案例
分布式存储系统中的edits log分段加锁机制就是为了尽量减少锁占用的时间,其中加锁的部分都是简单的变量判断,执行时间都是很快的。
//负责管理edits log日志的核心组件public class FSEditlog {    //当前递增到的txid的序号    private long txidSeq = 0L;    //内存双缓冲区    private DoubleBuffer editLogBuffer = new DoubleBuffer();    //当前是否在将内存缓冲刷入磁盘中    private volatile Boolean isSyncRunning = false;    //当前是否有线程在等待刷新下一批edits log到磁盘里去    private volatile Boolean isWaitSync = false;    //在同步到磁盘中的最大的一个txid    private volatile Long syncMaxTxid = 0L;    //每个线程自己本地的txid副本    private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();       //记录edits log日志    public void logEdit(String content) {        //这里直接加锁,有线程执行logSync()方法时这里没有其他线程能进来        synchronized(this) {            //获取全局唯一递增的txid,代表了edits log的序号            txidSeq++;            long txid = txidSeq;            localTxid.set(txid);            //构造一条edits log对象            EditLog log = new EditLog(txid, content);            //将edits log写入内存缓冲中,不是直接刷入磁盘文件            editLogBuffer.write(log);          }        //尝试允许某一个执行logEdit()方法的线程,一次性将内存缓冲中的数据刷入到磁盘文件中        logSync();    }       //将内存缓冲中的数据刷入磁盘文件中    //在这里尝试允许某一个线程一次性将内存缓冲中的数据刷入到磁盘文件中    //相当于批量将内存缓冲的数据刷入磁盘    private void logSync() {        //再次尝试加锁,只有一个线程能进来,这个过程很快,纳秒级别,这里属于第一段加锁        synchronized(this) {            //如果当前正好有线程在刷内存缓冲到磁盘中去            if (isSyncRunning) {                //假如某个线程正在把txid = 6,7,8,9,10,11,12的edits log从syncBuffer刷入磁盘                //此时syncMaxTxid = 12,代表的是正在刷入磁盘的最大txid                //那么刷盘的线程释放锁进行刷盘后,这时来一个线程对应的txid = 10,此时它可以直接返回                //因为它对应的edits log被刷盘的线程正在刷入或者已经刷入磁盘了,这时txid = 12的线程就不需要等待                long txid = localTxid.get();                if (txid <= syncMaxTxid) {                    return;                }                   //此时如果来的是一个txid = 13的线程,那么就会发现已经有线程在等待刷下一批数据到磁盘,此时会直接返回                if (isWaitSync) {                    return;                }                //此时如果来的是一个txid = 14的线程,并且刷盘还没刷完,                //那么就在这里等待或者成为下一个刷盘的线程,只有一个线程在等                isWaitSync = true;                while (isSyncRunning) {                    try {                        wait(2000);//释放锁并自己等2秒或者等别人唤醒                    } catch (Exception e) {                        e.printStackTrace();                      }                }                isWaitSync = false;            }            //交换两块缓冲区            editLogBuffer.setReadyToSync();            //然后保存当前要同步到磁盘中的最大txid,此时editLogBuffer中的syncBuffer在交换完以后可能有多条数据            //而且里面的edits log的txid一定是从小到大的,此时要同步的txid = 6,7,8,9,10,11,12,所以syncMaxTxid = 12            syncMaxTxid = editLogBuffer.getSyncMaxTxid();            //设置当前正在同步到磁盘的标志位            isSyncRunning = true;        }              //释放锁,开始同步内存缓冲的数据到磁盘文件里去        //这个过程其实是比较慢,基本上肯定是毫秒级了,弄不好就要几十毫秒        editLogBuffer.flush();                //这里属于另外一段加锁        synchronized(this) {            //同步完了磁盘之后,就会将标志位复位,再释放锁            isSyncRunning = false;            //唤醒可能正在等待他同步完磁盘的线程            notifyAll();        }    }        //代表了一条edits log(内部类)    private class EditLog {        long txid;        String content;        public EditLog(long txid, String content) {            this.txid = txid;            this.content = content;        }    }        //内存双缓冲(内部类)    private class DoubleBuffer {        //专门用来承载线程写入edits log        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();        //专门用来将数据同步到磁盘中去的一块缓冲        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();                //将edits log写到内存缓冲里去        public void write(EditLog log) {            currentBuffer.add(log);        }                //交换两块缓冲区,为了同步内存数据到磁盘做准备        public void setReadyToSync() {            LinkedList<EditLog> tmp = currentBuffer;            currentBuffer = syncBuffer;            syncBuffer = tmp;        }              //获取sync buffer缓冲区里的最大的一个txid        public Long getSyncMaxTxid() {            return syncBuffer.getLast().txid;        }              //将syncBuffer缓冲区中的数据刷入磁盘中        public void flush() {            for (EditLog log : syncBuffer) {                System.out.println("将edit log写入磁盘文件中:" + log);                 //正常来说,就是用文件输出流将数据写入磁盘文件中            }            syncBuffer.clear();          }    }} 
6.尽量减少线程对数据加锁的粒度(库存分段扣减)
(1)尽可能减少线程对数据加锁的粒度
(2)每秒上千订单下库存分段扣减的案例
 
(1)尽可能减少线程对数据加锁的粒度
如果有一份共享数据,里面包含了多个子数据。可以对完整的这份数据加锁,线程只要访问这份数据,就会竞争锁。也可以只对数据里的部分子数据进行加锁,从而降低加锁的粒度,减少竞争。比如库存扣减时可以仅仅对部分库存数据加锁,而不是对完整的库存数据加锁。
 
一.edits log的分段加锁机制,尽可能减少一个线程占用锁的时间
二.库存的分段扣减机制,尽可能减少一个线程对数据加锁的粒度
 
(2)每秒上千订单下库存分段扣减的案例
一.不加锁时的库存超卖问题
假设订单系统部署在两台机器上,某一时刻不同用户都要买10个商品,于是这两台机器上的订单系统实例分别收到了不同用户的购买请求。然后每个订单系统实例都去数据库里查询检查,发现当前商品库存是12个。于是每个订单系统实例都发送SQL到数据库里进行下单,即扣减10个库存。其中一个实例将库存从12扣减为2,另外一个实例将库存从2扣减为-8,于是出现了库存超卖的问题。
 
二.用分布式锁解决库存超卖问题
分布式锁就是同一个锁key,同一时间只能有一个客户端拿到锁,这样只有获取到锁的客户端才能执行接下来的库存扣减逻辑。
 
当然对于库存超卖的问题,除了使用分布式锁来解决之外,也可以使用:悲观锁、乐观锁、队列串行化、异步队列分散、Redis原子操作等来解决。
 
三.分布式锁的方案在高并发场景下的问题
一旦某个客户端的某个线程获取了某个商品的分布式锁,那么该客户端的其他线程或其他客户端的所有线程,对这个商品的下单请求就要进行串行化等待。
 
假设获取锁后到释放锁前,查库存 -> 创建订单 -> 扣减库存,只需20毫秒,那么1秒(1000毫秒)只能容纳50个客户端线程对同一个商品的下单请求。
 
所以如果应对低并发、无秒杀场景的电商系统,分布式锁还是可以解决的。因为并发量很低,每秒不到10个请求,不存在高并发秒杀单个商品的情况。但对于高并发秒杀商品的情况,明显分布式锁只支持50个线程是不行的。
 
四.如何对分布式锁进行高并发优化
把库存数据分成很多个段,每个段对应一个单独的锁。当多个线程并发修改库存数据时,就可以并发修改不同段的库存数据,从而避免同一时间只能有一个线程独占修改某商品的库存数据。
 
Java中的LongAdder类,也采用了类似的思路进行分段CAS操作。当某段CAS操作失败时会自动迁移到下一个分段进行CAS操作,从而减少在高并发场景下,CAS操作共享数据出现线程长时间自旋的情况。
 
因此库存问题下的分布式锁优化思路如下:
将一个库存拆分为N个库存段,比如10个库存段,每个库存段有100个库存。每个线程处理下单请求时会从10个库存段中挑选一个来进行加锁并扣减库存。这样就将竞争一个库存锁转变为竞争10个库存锁,从而并发性能提升10倍。
 
如果一个分段库存是0,那么就先释放锁,然后遍历下一个分段直到找出分段库存不是0的再尝试加锁处理。如果一个分段库存大于购买数量,那么就直接加锁扣减。如果一个分段库存小于购买数量,那么就要对多个分段库存加锁合并扣减。
 
该分段加锁方案的不足就是实现比较复杂:
首先要对一个库存数据分段存储,比如本来一个库存字段要分为10个字段。其次在每次处理库存时,需要写随机算法,随机挑选一个分段来扣减。最后如果某个分段中的库存不足,需要自动切换到下一个分段去处理。
public class RedisLockOptimizeDemo {    public static void main(String[] args) throws Exception {        Long goodsSkuId = 1L;//ID        Long purchaseCount = 50L;//购买数量        int stockSegmentSeq = new Random().nextInt(10) + 1;//随机的库存段        InventoryDAO inventoryDAO = new InventoryDAO();        RLock lock = new RLock("stock_" + goodsSkuId + "_" + stockSegmentSeq);        //分段加锁后查分段的库存        lock.lock();        Long stock = inventoryDAO.getStock(goodsSkuId, stockSegmentSeq);        //如果查出来库存是0,那么查下一个分段        if (stock == 0L) {            lock.unlock();            boolean foundOtherStockSegment = false;            for (int i = 1; i <= 10; i++) {                if (i == stockSegmentSeq) {                    continue;                }                lock = new RLock("stock_" + goodsSkuId + "_" + i);                lock.lock();                stock = inventoryDAO.getStock(goodsSkuId, i);                if (stock != 0) {                    stockSegmentSeq = i;                    foundOtherStockSegment = true;                    //该分段库存不为0那么走完后面的逻辑才unlock                    break;                } else {                    //该分段库存为0那么就要unlock了继续遍历下一个分段                    lock.unlock();                }            }            if (!foundOtherStockSegment) {                System.out.println("商品库存不足");                return;            }        }        //如果库存分段正好大于要购买的数量        if (stock >= purchaseCount) {            inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, stock - purchaseCount);            lock.unlock();            return;        }        //代码执行到这里,就说明,当前这个分段的库存小于要购买的数量,合并分段加锁        Long totalStock = stock;        Map<RLock, Long> otherLocks = new HashMap<RLock, Long>();        for (int i = 1; i <= 10; i++) {            if (i == stockSegmentSeq) {                continue;            }            RLock otherLock = new RLock("stock_" + goodsSkuId + "_" + i);            otherLock.lock();            Long otherStock = inventoryDAO.getStock(goodsSkuId, i);            if (otherStock == 0) {                otherLock.unlock();                continue;            }            totalStock += otherStock;            otherLocks.put(otherLock, otherStock);            if (totalStock >= purchaseCount) {                break;            }        }        //尝试所有的其他分段之后还是无法满足购买数量,需要对那些分段释放锁        if (totalStock < purchaseCount) {            System.out.println("商品库存不足");            for (Map.Entry<RLock, Long> otherLockEntry : otherLocks.entrySet()) {                otherLockEntry.getKey().unlock();            }            return;        }        //先将最初加的那个分段库存扣减掉        Long remainReducingStock = purchaseCount - stock;        inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, 0L);        lock.unlock();        for (Map.Entry<RLock, Long> otherLockEntry : otherLocks.entrySet()) {            RLock otherLock = otherLockEntry.getKey();            int otherStockSegmentSeq = Integer.valueOf(otherLock.name.substring(otherLock.name.length() - 1));            Long otherStock = otherLockEntry.getValue();            if (otherStock <= remainReducingStock) {                remainReducingStock -= otherStock;                inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, 0L);            } else {                remainReducingStock = 0L;                inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, otherStock - remainReducingStock);            }            otherLock.unlock();        }    }    static class RLock {        String name;        public RLock(String name) { this.name = name; }        public void lock() { System.out.println("加分布式锁:" + name); }        public void unlock() { System.out.println("释放分布式锁:" + name); }              @Override        public int hashCode() {            final int prime = 31;            int result = 1;            result = prime * result + ((name == null) ? 0 : name.hashCode());            return result;        }              @Override        public boolean equals(Object obj) {            if (this == obj) { return true; }            if (obj == null) { return false; }            if (getClass() != obj.getClass()) { return false; }            RLock other = (RLock) obj;            if (name == null) {                if (other.name != null) {                    return false;                }            } else if (!name.equals(other.name)) {                return false;            }            return true;        }    }    static class InventoryDAO {        public Long getStock(Long goodsSkuId, Integer stockSegmentSeq) { return 1000L; }        public void updateStock(Long goodsSkuId, Integer stockSegmentSeq, Long stock) { }    }} 
7.尽量按不同功能进行锁分离(避免循环中加锁)
(1)尽可能按不同功能对锁进行分离
(2)避免在循环中频繁加锁以及释放锁
 
(1)尽可能按不同功能对锁进行分离
如果可以按照使用功能的不同,将一把锁拆分为多把锁。那么在使用不同的功能时,就可以使用不同的锁了,从而减少线程竞争同一把锁时的冲突。
 
比如LinkedBlockingQueue有界阻塞队列,就使用了两把锁。队列头部使用一把take锁,队列尾部使用一把put锁。执行put()方法时先获取put锁,然后再从队列尾部入队元素。执行take()方法时先获取take锁,然后再从队列头部出队元素。这样同一时刻对阻塞队列的入队和出队操作,就不会产生锁冲突。
 
(2)避免在循环中频繁加锁以及释放锁
尽量避免在for循环里频繁地加锁和释放锁。
 
8.尽量减少高并发下线程对锁的竞争(多级缓存)
(1)高并发场景下如何减少线程对锁的竞争
(2)服务注册表通过多级缓存来降低锁竞争频率
 
(1)高并发场景下如何减少线程对锁的竞争
一.使用读写锁
读写锁主要用来解决读请求和读请求不会冲突可以并行执行,但写请求和读请求还是会冲突,写请求和写请求也是会冲突的。
 
二.按不同功能进行锁分离
比如按使用功能不同,将一把锁拆分为多把锁。
 
三.减少锁占用的时间
比如分布式存储系统中的edits log分段锁。
 
四.数据分段加锁
比如分布式锁的分段库存扣减。
 
五.减少线程对锁的竞争频率
比如通过多级缓存来降低锁竞争的频率。
 
(2)服务注册表通过多级缓存来降低锁竞争频率
一.读写锁依然存在并发读写冲突
虽然服务注册中心已经使用了读写锁来处理读写服务注册表,但是当需要对服务注册表进行写操作时,可能会出现大量的读操作被阻塞。现在假设每分钟会出现并发读写冲突的次数是10次。
 
二.多级缓存的运行逻辑
因此可以使用多级缓存机制来优化服务注册表的读写并发冲突问题。服务注册表的多级缓存是:一级缓存(只读缓存),二级缓存(读写缓存)。
 
所有的读请求都先从一级缓存(只读缓存)里读取数据。如果一级缓存(只读缓存)里没找到,则从二级缓存(读写缓存)里查找。如果二级缓存(读写缓存)也没有找到,则再从服务注册表中进行加载。
 
三.多级缓存的过期策略
一级缓存(读写缓存)中的数据的过期策略是:通过一个定时任务,每隔30s和二级缓存的数据保持同步(一致)。
 
二级缓存(读写缓存)中的数据的过期策略是:当有服务实例发生注册、下线时,就会主动过期(更新)其对应的二级缓存。此外构建二级缓存时会指定默认的过期时间为180秒,180秒后自动过期。
 
四.多级缓存如何降低并发读写频率
一级缓存使用ConcurrentHashMap实现,二级缓存使用Guava Cache实现。一级缓存主要用于降低每分钟的写频率,二级缓存主要用于冷热数据分离。
 
当服务注册表的某个key对应的数据更新时,二级缓存的数据会立即更新,30秒后一级缓存的数据也一定会更新。
 
当服务注册表的某个key数据没有更新时,二级缓存的数据180秒后就会过期,过期的30秒后一级缓存的数据变为null。后续访问一级缓存时,再从服务注册表中获取对应的数据,写入二级缓存。
 
通过这两级缓存,就可以大大降低每分钟并发读写缓存注册表的冲突次数,比如一级缓存就可以将每分钟并发读写冲突的次数从10次降为2次,因为一级缓存的数据最多每30秒更新一次。而二级缓存则实现冷数据180秒没访问则不缓存,减少需要缓存的热数据数量。
 
五.多级缓存的本质
一级缓存是只读缓存,二级缓存是读写缓存。在一级缓存(只读缓存)中存在很少的并发读写情况,每分钟两次。在二级缓存(读写缓存)中会进行冷热数据分离,冷数据不缓存。
 
服务注册表的数据更新之后,最多30秒就会更新到一级缓存,多级缓存就是牺牲读数据和写数据的实时一致性来降低读写竞争的频率。
public class MutilCache {    //一级缓存(只读缓存)    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();    //二级缓存(读写缓存)    private final LoadingCache<Key, Value> readWriteCacheMap;    public MultiCache() {        this.readWriteCacheMap = CacheBuilder.newBuilder()            .expireAfterWrite(180, TimeUnit.SECONDS)            .build(new CacheLoader<Key, Value>() {                @Override                public Value load(Key key) {                    //从注册表中获取数据                    return getFromRegistry(key);                }            });        ...    }        //读取服务注册表中的某个key的值    public Value get(final Key key) {        return getValue(key);    }    public Value getValue(final Key key) {        Value payload = null;        try {            //首先读取一级缓存(只读缓存)的数据            final Value currentPayload = readOnlyCacheMap.get(key);            if (currentPayload != null) {                payload = currentPayload;            } else {                //然后再读二级缓存(读写缓存)的数据                payload = readWriteCacheMap.get(key);                readOnlyCacheMap.put(key, payload);            }        } catch (Throwable t) {            logger.error("Cannot get value for key : {}", key, t);        }        return payload;    }        private TimerTask getCacheUpdateTask() {        return new TimerTask() {            @Override            public void run() {                //遍历一级缓存(只读缓存)                for (Key key : readOnlyCacheMap.keySet()) {                    Value cacheValue = readWriteCacheMap.get(key);                    Value currentCacheValue = readOnlyCacheMap.get(key);                    //判断二级缓存的数据是否和一级缓存的数据相同,如果不相同,则更新一级缓存的数据                    if (cacheValue != currentCacheValue) {                        readOnlyCacheMap.put(key, cacheValue);                    }                }            }        }    }    ...} 
9.锁故障之死锁总结
(1)产生死锁的例子
(2)死锁产生的必要条件
(3)如何解决死锁问题
(4)避免死锁的常见方式
 
(1)产生死锁的案例
案例一:
如果在一个类的<clint>()方法中有耗时的操作,那么可能会造成多个线程同时初始化该类时产生阻塞。
 
如果两个类在各自的静态语句块中都尝试通过反射初始化对方,那么并发调用这两个类的初始化方法就可能会产生死锁。
 
案例二:
简单的锁顺序死锁,类不一样。
synchronized(left)    synchronized(right)------------------------synchronized(right)    synchronized(left)案例三:
动态的锁顺序死锁,类一样,但对象实例不一样,比如相互转账场景。
synchronized(fromAccount)    synchronized(toAccount)(2)死锁产生的必要条件
不管是线程级别的死锁,还是数据库级别的死锁,只能通过人工干预解决。导致死锁的条件有如下四个,这四个条件同时满足就会产生死锁。
 
条件一:互斥条件
共享资源X和Y只能被一个线程占用。
 
条件二:占有且等待条件
线程1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X。
 
条件三:不可抢占条件
其他线程不能强行抢占线程1占有的资源。
 
条件四:循环等待条件
线程1在等待线程2占有的共享资源,线程2在等待线程1占有的共享资源。
 
(3)如何解决死锁问题
只要破坏产生死锁的四个条件中的任意一个,就可以避免产生死锁。其中互斥条件不可破坏,因为这是互斥锁的基本约束,其他三个条件都可破坏。
 
一.破坏占有且等待条件
要破坏在占有资源X的同时还在等待资源Y,可以一次性申请资源X和资源Y。每个线程都一次性申请所有的资源,这样就不存在占有一部分等待一部分了。
 
二.破坏不可抢占条件
当占用部分资源的线程进一步申请其他资源时,如果申请不到,则可以主动释放其占有的资源,这样不可抢占条件就被破坏了。破坏不可抢占条件的核心是当前线程能够主动释放尝试占有的资源。
 
synchronized在申请不到资源的时候会直接进入阻塞状态,一旦线程被阻塞之后就无法再释放已经占有的资源。Lock接口中的tryLock()方法可以尝试抢占锁,抢占失败不会阻塞线程。所以synchronized不能实现破坏该条件,但Lock.tryLock()可以实现。
 
三.破坏循环等待条件
把资源按照某种顺序编号,所有锁资源的申请都按照某种顺序来获取。比如所有线程在申请资源时,都是先申请资源编号小的,再申请资源编号大的。
 
(4)避免死锁的常见方式
一.避免一个线程同时获得多个锁
二.避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
三.尝试使用定时锁,使用Lock.tryLock(timeout)来替代使用内部锁
四.对于数据库锁,加锁和解锁必须要在同一个数据库连接里
 
10.无锁化编程
(1)一写一读的无锁队列(内存屏障)
(2)一写多读的无锁队列(volatile关键字)
(3)多写多读的无锁队列(CAS机制)
(3)多写多读的无锁队列(CAS机制)
(4)多写多读的无锁栈(CAS机制)
(5)多写多读的无锁链表
 
锁在Java中主要是指synchronized关键字和Lock接口,锁在Linux中主要是指pthread和mutex。
 
(1)一写一读的无锁队列(内存屏障)
一写一读的无锁队列即Linux内核的kfifo队列。一写一读两个线程,不需要锁,只需要内存屏障。
 
(2)一写多读的无锁队列(volatile关键字)
Disruptor是一个开源的并发框架,能在无锁的情况下实现队列并发操作。Disruptor的RingBuffer之所以可以做到完全无锁,是因为单线程写。RingBuffer有一个头指针,对应一个生产者线程。RingBuffer有多个尾指针,对应多个消费者线程。指针类型都是volatile的。
 
(3)多写多读的无锁队列(CAS机制)
基于CAS和链表,就可以实现一个多写多读的无锁队列。链表有一个头指针head和一个尾指针tail。入队时对tail指针进行CAS操作,出队时对head指针进行CAS操作。
 
(4)多写多读的无锁栈(CAS机制)
无锁栈比无锁队列的实现更简单,只需要对head指针进行CAS操作即可。入栈时对head指针进行CAS操作,出栈时也是对head指针进行CAS操作。
 
(5)多写多读的无锁链表
ConcurrentSkipListMap并发跳表查询就是基于无锁链表的。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

224

主题

0

回帖

682

积分

高级会员

积分
682

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-3-12 02:06 , Processed in 0.777661 second(s), 26 queries .

Powered by 智能设备

©2025

|网站地图