小小程序猿
这个人很懒,什么都没写
Toggle navigation
小小程序猿
主页
关于
友链
归档
标签
分布式锁-redis
2025-03-14 09:23:27
6
0
0
terry
##一、概念 在分布式系统中,多个进程或服务可能会同时访问和修改共享资源,为了保证数据的一致性和完整性,避免多个进程同时操作共享资源而产生的数据冲突问题,就需要使用分布式锁。分布式锁是一种在分布式环境下实现互斥访问共享资源的机制,它确保在同一时刻只有一个进程或线程能够获得锁并访问共享资源。 ##二、应用场景 ###1. 库存管理 在电商系统中,多个订单处理服务可能会同时对商品库存进行扣减操作。使用分布式锁可以保证同一时刻只有一个服务能够对库存进行修改,避免超卖现象的发生。 ###2. 定时任务调度 在分布式系统中,可能会有多个节点同时部署了定时任务。使用分布式锁可以确保同一个定时任务在同一时刻只在一个节点上执行,避免任务的重复执行。 ###3. 数据处理 在数据处理过程中,多个服务可能会同时对同一份数据进行读写操作。使用分布式锁可以保证数据的一致性,避免数据冲突和错误。 ##三、实现方式 ###1. 基于数据库实现 原理:利用数据库的唯一索引或行级锁来实现分布式锁。例如,在数据库中创建一个锁表,通过插入或更新记录来获取和释放锁。 优点:实现简单,易于理解和维护。 缺点:性能较差,数据库的读写操作会成为瓶颈;锁的释放依赖于数据库连接的正常关闭,可能会出现死锁问题。 ###2. 基于 Redis 实现 原理:利用 Redis 的原子操作(如 SETNX、SET 等)来实现分布式锁。通过设置一个唯一的键值对来表示锁,获取锁时尝试设置该键值对,释放锁时删除该键值对。 优点:性能高,Redis 是内存数据库,读写速度快;支持锁的自动续期,避免锁提前过期。 缺点:Redis 是单节点或主从复制架构时,可能会出现锁丢失的问题;需要处理 Redis 节点故障和网络分区等问题。 ###3. 基于 ZooKeeper 实现 原理:利用 ZooKeeper 的临时顺序节点来实现分布式锁。每个客户端在 ZooKeeper 上创建一个临时顺序节点,节点编号最小的客户端获得锁,其他客户端监听比自己编号小的节点的删除事件,当该节点删除时,自己尝试获取锁。 优点:可靠性高,ZooKeeper 是分布式协调服务,具有高可用性和数据一致性;支持公平锁和可重入锁。 缺点:性能相对较低,ZooKeeper 的读写操作需要经过网络传输;实现复杂度较高。 ##四、分布式锁的特性 ###1. 互斥性 同一时刻只能有一个进程或线程获得锁,其他进程或线程需要等待锁的释放。 ###2. 可重入性 同一个进程或线程在持有锁的情况下,可以多次获取同一把锁,避免死锁。 ###3. 锁的超时机制 为了避免锁被长时间占用,导致其他进程或线程无法获取锁,需要设置锁的超时时间,当锁的持有时间超过超时时间时,自动释放锁。 ###4. 高可用性 分布式锁应该具备高可用性,当某个节点出现故障时,不会影响锁的正常使用。 ###5. 公平性 在某些场景下,需要保证锁的公平性,即按照请求锁的顺序依次获得锁。 ##六、使用注意事项 . 锁的粒度:锁的粒度应该尽可能小,避免长时间持有锁,影响系统的并发性能。 . 异常处理:在获取和释放锁的过程中,需要处理各种异常情况,确保锁的正确释放,避免死锁。 . 锁的超时时间:需要根据业务需求合理设置锁的超时时间,避免锁提前过期或长时间占用。 . 分布式环境的一致性:在分布式环境中,需要考虑网络延迟、节点故障等因素,确保锁的一致性和可靠性。 ``` package com.geokon.receive.util; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @Component public class BaseDistributedLock implements DisposableBean { private RedisTemplate<String, String> redisTemplate; private static final ThreadLocal<String> lockValue = new ThreadLocal<>(); private static final String defaultLockName = "receive-lock"; private static final Map<String, ScheduledFuture<?>> lockRenewalFutures = new ConcurrentHashMap<>(); private static final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor( Constant.CPU_CORE, new ThreadPoolExecutor.CallerRunsPolicy() ); @Autowired public BaseDistributedLock(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } public boolean tryLock(long waitTime) { return tryLock(defaultLockName, 3600, 300, waitTime, TimeUnit.SECONDS); } public boolean unLock(String lockKey) { return releaseLock(lockKey, lockValue.get()); } public boolean tryLock(String lockKey, long expireTime, long maxLockTime, long waitTime, TimeUnit timeUnit) { long startTime = System.currentTimeMillis(); long waitTimeMillis = TimeUnit.MILLISECONDS.convert(waitTime, timeUnit); long retryInterval = 100; int retryCount = 0; while (true) { try { if (System.currentTimeMillis() - startTime > waitTimeMillis) { break; } if (acquireLock(lockKey, expireTime, maxLockTime, timeUnit)) { return true; } // Exponential backoff retryInterval = (long) (retryInterval * Math.pow(2, retryCount)); if (retryInterval > 1000) { retryInterval = 1000; } retryCount++; TimeUnit.MILLISECONDS.sleep(retryInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } return false; } private boolean acquireLock(String lockKey, long expireTime, long maxLockTime, TimeUnit timeUnit) { String value = UUID.randomUUID().toString(); int retryTimes = 3; while (retryTimes > 0) { try { Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, timeUnit); if (result != null && result) { long startTime = System.currentTimeMillis(); ScheduledFuture<?> future = scheduleRenewal(lockKey, value, expireTime, maxLockTime, startTime, timeUnit); lockValue.set(value); lockRenewalFutures.put(lockKey, future); return true; } break; } catch (Exception e) { log.error("get lock exception, retry left: {}, thread ID={} , e => {}", retryTimes, Thread.currentThread().getId(), e); retryTimes--; if (retryTimes == 0) { return false; } try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return false; } } } return false; } private boolean releaseLock(String lockKey, String value) { try { String storedValue = String.valueOf(redisTemplate.opsForValue().get(lockKey)); if (value != null && value.equals(storedValue)) { cancelRenewal(lockKey); lockValue.remove(); return redisTemplate.delete(lockKey); } } catch (Exception e) { log.error("Release lock exception, thread ID={} , e => {}", Thread.currentThread().getId(), e); } return false; } private ScheduledFuture<?> scheduleRenewal(String lockKey, String value, long expireTime, long maxLockTime, long startTime, TimeUnit timeUnit) { long renewalTime = expireTime * 2 / 3; return scheduler.schedule(() -> { long elapsedTime; long maxLockTimeMillis = TimeUnit.MILLISECONDS.convert(maxLockTime, timeUnit); while (true) { String storedValue = String.valueOf(redisTemplate.opsForValue().get(lockKey)); if (value != null && value.equals(storedValue)) { elapsedTime = System.currentTimeMillis() - startTime; if (elapsedTime < maxLockTimeMillis) { try { redisTemplate.expire(lockKey, expireTime, timeUnit); } catch (Exception e) { log.error("Lock renewal exception, thread ID={} , e => {}", Thread.currentThread().getId(), e); break; } try { TimeUnit.MILLISECONDS.sleep(renewalTime); } catch (InterruptedException e) { log.error("scheduleRenewal exception, thread ID={} , e => {}", Thread.currentThread().getId(), e); break; } } else { redisTemplate.delete(lockKey); cancelRenewal(lockKey); break; } } else { break; } } }, renewalTime, timeUnit); } private void cancelRenewal(String lockKey) { ScheduledFuture<?> future = lockRenewalFutures.get(lockKey); if (future != null) { future.cancel(true); lockRenewalFutures.remove(lockKey); } } @Override public void destroy() throws Exception { scheduler.shutdown(); if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { scheduler.shutdownNow(); } } } ```
上一篇: 无
下一篇:
安装java zeromq 在mac
0
赞
6 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网
提交评论
立即登录
, 发表评论.
没有帐号?
立即注册
0
条评论
More...
文档导航
没有帐号? 立即注册