Skip to content

Commit e515132

Browse files
Meteorkorartembilan
authored andcommitted
GH-3655: Add automatically delete for Redis Locks
Fixes #3655 * support automatically clean up cache * RedisLockRegistry.capacity desc
1 parent 25100d1 commit e515132

File tree

3 files changed

+217
-11
lines changed

3 files changed

+217
-11
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,9 +19,10 @@
1919
import java.text.SimpleDateFormat;
2020
import java.util.Collections;
2121
import java.util.Date;
22+
import java.util.LinkedHashMap;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325
import java.util.UUID;
24-
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.Executor;
2627
import java.util.concurrent.ExecutorService;
2728
import java.util.concurrent.Executors;
@@ -68,6 +69,7 @@
6869
* @author Konstantin Yakimov
6970
* @author Artem Bilan
7071
* @author Vedran Pavic
72+
* @author Unseok Kim
7173
*
7274
* @since 4.0
7375
*
@@ -78,6 +80,8 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
7880

7981
private static final long DEFAULT_EXPIRE_AFTER = 60000L;
8082

83+
private static final int DEFAULT_CAPACITY = 1_000_000;
84+
8185
private static final String OBTAIN_LOCK_SCRIPT =
8286
"local lockClientId = redis.call('GET', KEYS[1])\n" +
8387
"if lockClientId == ARGV[1] then\n" +
@@ -90,7 +94,15 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
9094
"return false";
9195

9296

93-
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
97+
private final Map<String, RedisLock> locks =
98+
new LinkedHashMap<String, RedisLock>(16, 0.75F, true) {
99+
100+
@Override
101+
protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
102+
return size() > RedisLockRegistry.this.capacity;
103+
}
104+
105+
};
94106

95107
private final String clientId = UUID.randomUUID().toString();
96108

@@ -102,6 +114,8 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
102114

103115
private final long expireAfter;
104116

117+
private int capacity = DEFAULT_CAPACITY;
118+
105119
/**
106120
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete} in
107121
* the separate thread when the current one is interrupted.
@@ -152,21 +166,34 @@ public void setExecutor(Executor executor) {
152166
this.executorExplicitlySet = true;
153167
}
154168

169+
/**
170+
* Set the capacity of cached locks.
171+
* @param capacity The capacity of cached lock, (default 1_000_000).
172+
* @since 5.5.6
173+
*/
174+
public void setCapacity(int capacity) {
175+
this.capacity = capacity;
176+
}
177+
155178
@Override
156179
public Lock obtain(Object lockKey) {
157180
Assert.isInstanceOf(String.class, lockKey);
158181
String path = (String) lockKey;
159-
return this.locks.computeIfAbsent(path, RedisLock::new);
182+
synchronized (this.locks) {
183+
return this.locks.computeIfAbsent(path, RedisLock::new);
184+
}
160185
}
161186

162187
@Override
163188
public void expireUnusedOlderThan(long age) {
164189
long now = System.currentTimeMillis();
165-
this.locks.entrySet()
166-
.removeIf((entry) -> {
167-
RedisLock lock = entry.getValue();
168-
return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
169-
});
190+
synchronized (this.locks) {
191+
this.locks.entrySet()
192+
.removeIf((entry) -> {
193+
RedisLock lock = entry.getValue();
194+
return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
195+
});
196+
}
170197
}
171198

172199
@Override

spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java

Lines changed: 178 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,10 +24,13 @@
2424

2525
import java.util.Map;
2626
import java.util.Properties;
27+
import java.util.Queue;
2728
import java.util.UUID;
2829
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.ExecutorService;
2931
import java.util.concurrent.Executors;
3032
import java.util.concurrent.Future;
33+
import java.util.concurrent.LinkedBlockingQueue;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336
import java.util.concurrent.locks.Lock;
@@ -51,6 +54,7 @@
5154
* @author Konstantin Yakimov
5255
* @author Artem Bilan
5356
* @author Vedran Pavic
57+
* @author Unseok Kim
5458
*
5559
* @since 4.0
5660
*
@@ -435,9 +439,176 @@ public void testExpireNotChanged() throws Exception {
435439
lock.unlock();
436440
}
437441

442+
@Test
443+
@RedisAvailable
444+
public void concurrentObtainCapacityTest() throws InterruptedException {
445+
final int KEY_CNT = 500;
446+
final int CAPACITY_CNT = 179;
447+
final int THREAD_CNT = 4;
448+
449+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
450+
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
451+
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
452+
registry.setCapacity(CAPACITY_CNT);
453+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
454+
455+
for (int i = 0; i < KEY_CNT; i++) {
456+
int finalI = i;
457+
executorService.submit(() -> {
458+
countDownLatch.countDown();
459+
try {
460+
countDownLatch.await();
461+
}
462+
catch (InterruptedException e) {
463+
Thread.currentThread().interrupt();
464+
}
465+
String keyId = "foo:" + finalI;
466+
Lock obtain = registry.obtain(keyId);
467+
obtain.lock();
468+
obtain.unlock();
469+
});
470+
}
471+
executorService.shutdown();
472+
executorService.awaitTermination(5, TimeUnit.SECONDS);
473+
474+
//capacity limit test
475+
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(CAPACITY_CNT);
476+
477+
478+
registry.expireUnusedOlderThan(-1000);
479+
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(0);
480+
}
481+
482+
@Test
483+
@RedisAvailable
484+
public void concurrentObtainRemoveOrderTest() throws InterruptedException {
485+
final int THREAD_CNT = 2;
486+
final int DUMMY_LOCK_CNT = 3;
487+
488+
final int CAPACITY_CNT = THREAD_CNT;
489+
490+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
491+
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
492+
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
493+
registry.setCapacity(CAPACITY_CNT);
494+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
495+
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();
496+
497+
//Removed due to capcity limit
498+
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
499+
Lock obtainLock0 = registry.obtain("foo:" + i);
500+
obtainLock0.lock();
501+
obtainLock0.unlock();
502+
}
503+
504+
for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
505+
int finalI = i;
506+
executorService.submit(() -> {
507+
countDownLatch.countDown();
508+
try {
509+
countDownLatch.await();
510+
}
511+
catch (InterruptedException e) {
512+
Thread.currentThread().interrupt();
513+
}
514+
String keyId = "foo:" + finalI;
515+
remainLockCheckQueue.offer(keyId);
516+
Lock obtain = registry.obtain(keyId);
517+
obtain.lock();
518+
obtain.unlock();
519+
});
520+
}
521+
522+
executorService.shutdown();
523+
executorService.awaitTermination(5, TimeUnit.SECONDS);
524+
525+
assertThat(getRedisLockRegistryLocks(registry)).containsKeys(
526+
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
527+
}
528+
529+
@Test
530+
@RedisAvailable
531+
public void concurrentObtainAccessRemoveOrderTest() throws InterruptedException {
532+
final int THREAD_CNT = 2;
533+
final int DUMMY_LOCK_CNT = 3;
534+
535+
final int CAPACITY_CNT = THREAD_CNT + 1;
536+
final String REMAIN_DUMMY_LOCK_KEY = "foo:1";
537+
538+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
539+
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
540+
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
541+
registry.setCapacity(CAPACITY_CNT);
542+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
543+
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();
544+
545+
//Removed due to capcity limit
546+
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
547+
Lock obtainLock0 = registry.obtain("foo:" + i);
548+
obtainLock0.lock();
549+
obtainLock0.unlock();
550+
}
551+
552+
Lock obtainLock0 = registry.obtain(REMAIN_DUMMY_LOCK_KEY);
553+
obtainLock0.lock();
554+
obtainLock0.unlock();
555+
remainLockCheckQueue.offer(REMAIN_DUMMY_LOCK_KEY);
556+
557+
for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
558+
int finalI = i;
559+
executorService.submit(() -> {
560+
countDownLatch.countDown();
561+
try {
562+
countDownLatch.await();
563+
}
564+
catch (InterruptedException e) {
565+
Thread.currentThread().interrupt();
566+
}
567+
String keyId = "foo:" + finalI;
568+
remainLockCheckQueue.offer(keyId);
569+
Lock obtain = registry.obtain(keyId);
570+
obtain.lock();
571+
obtain.unlock();
572+
});
573+
}
574+
575+
executorService.shutdown();
576+
executorService.awaitTermination(5, TimeUnit.SECONDS);
577+
578+
assertThat(getRedisLockRegistryLocks(registry)).containsKeys(
579+
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
580+
}
581+
582+
@Test
583+
@RedisAvailable
584+
public void setCapacityTest() {
585+
final int CAPACITY_CNT = 4;
586+
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
587+
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
588+
registry.setCapacity(CAPACITY_CNT);
589+
590+
registry.obtain("foo:1");
591+
registry.obtain("foo:2");
592+
registry.obtain("foo:3");
593+
594+
//capacity 4->3
595+
registry.setCapacity(CAPACITY_CNT - 1);
596+
597+
registry.obtain("foo:4");
598+
599+
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(3);
600+
assertThat(getRedisLockRegistryLocks(registry)).containsKeys("foo:2", "foo:3", "foo:4");
601+
602+
//capacity 3->4
603+
registry.setCapacity(CAPACITY_CNT);
604+
registry.obtain("foo:5");
605+
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(4);
606+
assertThat(getRedisLockRegistryLocks(registry)).containsKeys("foo:3", "foo:4", "foo:5");
607+
}
608+
438609
@SuppressWarnings({ "unchecked", "rawtypes" })
439610
@Test
440-
public void ntestUlink() {
611+
public void testUlink() {
441612
RedisOperations ops = mock(RedisOperations.class);
442613
Properties props = new Properties();
443614
willReturn(props).given(ops).execute(any(RedisCallback.class));
@@ -464,4 +635,9 @@ private void waitForExpire(String key) throws Exception {
464635
assertThat(n < 100).as(key + " key did not expire").isTrue();
465636
}
466637

638+
@SuppressWarnings("unchecked")
639+
private Map<String, Lock> getRedisLockRegistryLocks(RedisLockRegistry registry) {
640+
return TestUtils.getPropertyValue(registry, "locks", Map.class);
641+
}
642+
467643
}

src/reference/asciidoc/redis.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,3 +883,6 @@ However, the resources protected by such a lock may have been compromised, so su
883883
You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time.
884884

885885
Starting with version 5.0, the `RedisLockRegistry` implements `ExpirableLockRegistry`, which removes locks last acquired more than `age` ago and that are not currently locked.
886+
887+
String with version 5.5.6, the `RedisLockRegistry` is support automatically clean up cache for redisLocks in `RedisLockRegistry.locks` via `RedisLockRegistry.setCapacity()`.
888+
See its JavaDocs for more information.

0 commit comments

Comments
 (0)