目录

自定义分布式可重入锁

目录

思路:先获取进程内的独占锁,再通过redis原子操作获取锁。以减少与redis的网络交互。对于锁的可重入,使用ThreadLocal进行标识,每重入一次加一,释放一次减一。

代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
/**
 * 分布式可重入锁
 *
 * @author Eddie
 */
public class RedisReentrantLock extends ReentrantLock {
	/**
	 * lockId,锁id
	 */
	private final ThreadLocal<String> lockId = ThreadLocal.withInitial(() -> UUID.randomUUID().toString());
	/**
	 * 锁键值,由具体的业务功能定义
	 */
	private final String lockKey;
	/**
	 * 当前获取锁的对象的键值
	 */
	private final String ownerLockKey;

	/**
	 * redis操作客户端对象
	 */
	private final JedisCommands jedisCommands;

	/**
	 * 重入次数
	 */
	private final ThreadLocal<Integer> state = ThreadLocal.withInitial(() -> 0);
	/**
	 * 超时时间,默认5秒,超过此时间,将会移除锁标志,重新竞争锁对象
	 */
	private final int timeOutMills;
	private static final int DEFAULT_TIME_OUT_MILLS = 5_000;

	public RedisReentrantLock(JedisCommands jedisCommands, String lockKey) {
		this(jedisCommands, lockKey, DEFAULT_TIME_OUT_MILLS);
	}

	public RedisReentrantLock(JedisCommands jedisCommands, String lockKey, int timeOutMills) {
		super();
		this.jedisCommands = jedisCommands;
		this.lockKey = lockKey;
		this.ownerLockKey = lockKey + ":current";
		this.timeOutMills = timeOutMills;
	}

	@Override
	public void lock() {
		super.lock();
		this.redisLock();
	}

	private void redisLock() {
		Random r = new Random();
		while (!tryRedisLock()) {
			//1000ns
			LockSupport.parkNanos(this, r.nextInt(2000));
		}
	}

	@Override
	public boolean tryLock() {
		return super.tryLock() && tryRedisLock();
	}

	private boolean tryRedisLock() {
		Long incr = jedisCommands.incr(lockKey);
		long expireUnixTime = System.currentTimeMillis() / 1000 + (timeOutMills / 1000);
		jedisCommands.expireAt(lockKey, expireUnixTime);
		String lockId = this.lockId.get();
		String currentLockId = jedisCommands.get(ownerLockKey);
		if (incr == 1) {
			jedisCommands.set(ownerLockKey, lockId);
			jedisCommands.expireAt(ownerLockKey, expireUnixTime);
			state.set(state.get() + 1);
			return true;
		} else if (lockId.equals(currentLockId)) {
			state.set(state.get() + 1);
			return true;
		}
		Thread.yield();
		return false;
	}

	@Override
	public void unlock() {
		this.unRedisLock();
		super.unlock();
	}

	private void unRedisLock() {
		Random r = new Random();
		String currentLockId = jedisCommands.get(ownerLockKey);
		state.set(state.get() - 1);
		if (lockId.get().equals(currentLockId)) {
			if (state.get() == 0) {
				jedisCommands.del(ownerLockKey);
				jedisCommands.del(lockKey);
			}
			//1000ns
			LockSupport.parkNanos(this, r.nextInt(2000));
		} else {
			if (state.get() != 0) {
				state.set(0);
			}
		}
	}
}

测试代码(同时启动多次,来模拟多个进程间锁的争抢):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class RedisReentrantLock extends ReentrantLock {

public static void main(String[] args) throws InterruptedException {
		String host = "192.168.0.103";
		String vmware = "192.168.116.125";
		String localhost = "127.0.0.1";
		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxIdle(100);
		config.setMinIdle(100);
		config.setMaxTotal(1000);
		config.setMaxWaitMillis(-1);
		config.setTestWhileIdle(true);
		config.setTestWhileIdle(true);
		config.setTimeBetweenEvictionRunsMillis(1000);
		JedisPool jedisPool = new JedisPool(config, localhost, 6379);
		Jedis jedis = new Jedis(localhost, 6379);

		RedisReentrantLock lock = new RedisReentrantLock(jedis, "delivery");
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
				50, 50, 10, TimeUnit.SECONDS,
				new ArrayBlockingQueue<>(50000), new ThreadPoolExecutor.DiscardPolicy());

		CountDownLatch countDownLatch = new CountDownLatch(500);
		long t1 = System.currentTimeMillis();
		for (int i = 0; i < 500; i++) {
			threadPool.execute(() -> {
//				Jedis j = jedisPool.getResource();
//				Long incr = j.incr("lock");
//				while (incr != 1) {
//					incr = j.incr("lock");
//				}
				lock.lock();
				try {
					System.out.println(Thread.currentThread().getName() + " start..." + System.currentTimeMillis());
					Thread.sleep(3000);
					System.out.println(Thread.currentThread().getName() + " end...");
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
//					j.del("lock");
//					jedisPool.returnResource(j);
					countDownLatch.countDown();
				}
			});
		}
		countDownLatch.await();
		long t2 = System.currentTimeMillis();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println(sdf.format(t1));
		System.out.println(sdf.format(t2));
		System.out.println((t2 - t1) / 1000f);
	}
}

此实现存在的问题:以上代码默认超时时间5秒,如果不加锁超时时间,如果网络抖动或者抢到锁的线程问题导致未能释放锁,就会造成死锁问题;如果加了超时时间,锁超时与下次竞争锁,这之间会存在数据一致性问题。

其他的优化空间:以上的实现中,抢到进程内独占锁的线程,会一直循环,抢不到等待一段时间再抢,这种机制不是很好。可以通过redis发布订阅机制来完善,抢不到redis锁的线程进行休眠,当抢到锁的线程释放了锁,且重入次数归零后,通过发布消息的机制来唤醒其他进程内在等待的线程。唤醒可以使用LockSupport.unpark(thread);