思路:先获取进程内的独占锁,再通过redis原子操作获取锁。以减少与redis的网络交互。对于锁的可重入,使用ThreadLocal进行标识,每重入一次加一,释放一次减一。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
/**
* 分布式可重入锁
*
* @author Eddie
*/
public class RedisReentrantLock extends ReentrantLock {
/**
* lockId,锁id
*/
private final ThreadLocal<String> lockId = ThreadLocal.withInitial(() -> UUID.randomUUID().toString());
/**
* 锁键值,由具体的业务功能定义
*/
private final String lockKey;
/**
* 当前获取锁的对象的键值
*/
private final String ownerLockKey;
/**
* redis操作客户端对象
*/
private final JedisCommands jedisCommands;
/**
* 重入次数
*/
private final ThreadLocal<Integer> state = ThreadLocal.withInitial(() -> 0);
/**
* 超时时间,默认5秒,超过此时间,将会移除锁标志,重新竞争锁对象
*/
private final int timeOutMills;
private static final int DEFAULT_TIME_OUT_MILLS = 5_000;
public RedisReentrantLock(JedisCommands jedisCommands, String lockKey) {
this(jedisCommands, lockKey, DEFAULT_TIME_OUT_MILLS);
}
public RedisReentrantLock(JedisCommands jedisCommands, String lockKey, int timeOutMills) {
super();
this.jedisCommands = jedisCommands;
this.lockKey = lockKey;
this.ownerLockKey = lockKey + ":current";
this.timeOutMills = timeOutMills;
}
@Override
public void lock() {
super.lock();
this.redisLock();
}
private void redisLock() {
Random r = new Random();
while (!tryRedisLock()) {
//1000ns
LockSupport.parkNanos(this, r.nextInt(2000));
}
}
@Override
public boolean tryLock() {
return super.tryLock() && tryRedisLock();
}
private boolean tryRedisLock() {
Long incr = jedisCommands.incr(lockKey);
long expireUnixTime = System.currentTimeMillis() / 1000 + (timeOutMills / 1000);
jedisCommands.expireAt(lockKey, expireUnixTime);
String lockId = this.lockId.get();
String currentLockId = jedisCommands.get(ownerLockKey);
if (incr == 1) {
jedisCommands.set(ownerLockKey, lockId);
jedisCommands.expireAt(ownerLockKey, expireUnixTime);
state.set(state.get() + 1);
return true;
} else if (lockId.equals(currentLockId)) {
state.set(state.get() + 1);
return true;
}
Thread.yield();
return false;
}
@Override
public void unlock() {
this.unRedisLock();
super.unlock();
}
private void unRedisLock() {
Random r = new Random();
String currentLockId = jedisCommands.get(ownerLockKey);
state.set(state.get() - 1);
if (lockId.get().equals(currentLockId)) {
if (state.get() == 0) {
jedisCommands.del(ownerLockKey);
jedisCommands.del(lockKey);
}
//1000ns
LockSupport.parkNanos(this, r.nextInt(2000));
} else {
if (state.get() != 0) {
state.set(0);
}
}
}
}
|
测试代码(同时启动多次,来模拟多个进程间锁的争抢):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public class RedisReentrantLock extends ReentrantLock {
public static void main(String[] args) throws InterruptedException {
String host = "192.168.0.103";
String vmware = "192.168.116.125";
String localhost = "127.0.0.1";
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(100);
config.setMinIdle(100);
config.setMaxTotal(1000);
config.setMaxWaitMillis(-1);
config.setTestWhileIdle(true);
config.setTestWhileIdle(true);
config.setTimeBetweenEvictionRunsMillis(1000);
JedisPool jedisPool = new JedisPool(config, localhost, 6379);
Jedis jedis = new Jedis(localhost, 6379);
RedisReentrantLock lock = new RedisReentrantLock(jedis, "delivery");
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
50, 50, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50000), new ThreadPoolExecutor.DiscardPolicy());
CountDownLatch countDownLatch = new CountDownLatch(500);
long t1 = System.currentTimeMillis();
for (int i = 0; i < 500; i++) {
threadPool.execute(() -> {
// Jedis j = jedisPool.getResource();
// Long incr = j.incr("lock");
// while (incr != 1) {
// incr = j.incr("lock");
// }
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " start..." + System.currentTimeMillis());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
// j.del("lock");
// jedisPool.returnResource(j);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long t2 = System.currentTimeMillis();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(t1));
System.out.println(sdf.format(t2));
System.out.println((t2 - t1) / 1000f);
}
}
|
此实现存在的问题:以上代码默认超时时间5秒,如果不加锁超时时间,如果网络抖动或者抢到锁的线程问题导致未能释放锁,就会造成死锁问题;如果加了超时时间,锁超时与下次竞争锁,这之间会存在数据一致性问题。
其他的优化空间:以上的实现中,抢到进程内独占锁的线程,会一直循环,抢不到等待一段时间再抢,这种机制不是很好。可以通过redis发布订阅机制来完善,抢不到redis锁的线程进行休眠,当抢到锁的线程释放了锁,且重入次数归零后,通过发布消息的机制来唤醒其他进程内在等待的线程。唤醒可以使用LockSupport.unpark(thread);