Improvement - Virtual Threads compatibility #5499

pull/5520/head
Nikita Koksharov 1 year ago
parent 7b89326c7a
commit 103e16eee4

@ -15,6 +15,8 @@
*/ */
package org.redisson.cache; package org.redisson.cache;
import org.redisson.misc.WrappedLock;
import java.util.*; import java.util.*;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -486,16 +488,16 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
@Override @Override
public boolean remove(Object key, Object value) { public boolean remove(Object key, Object value) {
CachedValue<K, V> e = null; CachedValue<K, V> e = lock.execute(() -> {
synchronized (map) {
CachedValue<K, V> entry = map.get(key); CachedValue<K, V> entry = map.get(key);
if (entry != null if (entry != null
&& entry.getValue().equals(value) && entry.getValue().equals(value)
&& !isValueExpired(entry)) { && !isValueExpired(entry)) {
map.remove(key); map.remove(key);
e = entry; return entry;
} }
} return null;
});
if (e != null) { if (e != null) {
onValueRemove(e); onValueRemove(e);
return true; return true;
@ -503,19 +505,21 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
return false; return false;
} }
private final WrappedLock lock = new WrappedLock();
@Override @Override
public boolean replace(K key, V oldValue, V newValue) { public boolean replace(K key, V oldValue, V newValue) {
CachedValue<K, V> e = null; CachedValue<K, V> e = lock.execute(() -> {
synchronized (map) {
CachedValue<K, V> entry = map.get(key); CachedValue<K, V> entry = map.get(key);
if (entry != null if (entry != null
&& entry.getValue().equals(oldValue) && entry.getValue().equals(oldValue)
&& !isValueExpired(entry)) { && !isValueExpired(entry)) {
CachedValue<K, V> newEntry = create(key, newValue, timeToLiveInMillis, maxIdleInMillis); CachedValue<K, V> newEntry = create(key, newValue, timeToLiveInMillis, maxIdleInMillis);
map.put(key, newEntry); map.put(key, newEntry);
e = entry; return entry;
} }
} return null;
});
if (e != null) { if (e != null) {
onValueRemove(e); onValueRemove(e);
return true; return true;
@ -525,16 +529,16 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
@Override @Override
public V replace(K key, V value) { public V replace(K key, V value) {
CachedValue<K, V> e = null; CachedValue<K, V> e = lock.execute(() -> {
synchronized (map) {
CachedValue<K, V> entry = map.get(key); CachedValue<K, V> entry = map.get(key);
if (entry != null if (entry != null
&& !isValueExpired(entry)) { && !isValueExpired(entry)) {
CachedValue<K, V> newEntry = create(key, value, timeToLiveInMillis, maxIdleInMillis); CachedValue<K, V> newEntry = create(key, value, timeToLiveInMillis, maxIdleInMillis);
map.put(key, newEntry); map.put(key, newEntry);
e = entry; return entry;
} }
} return null;
});
if (e != null) { if (e != null) {
onValueRemove(e); onValueRemove(e);
return e.getValue(); return e.getValue();

@ -15,6 +15,8 @@
*/ */
package org.redisson.cache; package org.redisson.cache;
import org.redisson.misc.WrappedLock;
/** /**
* Created by jribble on 2/20/17. * Created by jribble on 2/20/17.
*/ */
@ -23,4 +25,6 @@ public interface CachedValue<K, V> extends ExpirableValue {
K getKey(); K getKey();
V getValue(); V getValue();
WrappedLock getLock();
} }

@ -57,12 +57,12 @@ public class LFUCacheMap<K, V> extends AbstractCacheMap<K, V> {
} }
public static class LFUCachedValue extends StdCachedValue<Object, Object> { public static class LFUCachedValue<K, V> extends StdCachedValue<K, V> {
private final Long id; private final Long id;
private long accessCount; private long accessCount;
public LFUCachedValue(long id, Object key, Object value, long ttl, long maxIdleTime) { public LFUCachedValue(long id, K key, V value, long ttl, long maxIdleTime) {
super(key, value, ttl, maxIdleTime); super(key, value, ttl, maxIdleTime);
this.id = id; this.id = id;
} }
@ -74,46 +74,47 @@ public class LFUCacheMap<K, V> extends AbstractCacheMap<K, V> {
} }
private final AtomicLong idGenerator = new AtomicLong(); private final AtomicLong idGenerator = new AtomicLong();
private final ConcurrentNavigableMap<MapKey, LFUCachedValue> accessMap = new ConcurrentSkipListMap<MapKey, LFUCachedValue>(); private final ConcurrentNavigableMap<MapKey, LFUCachedValue<K, V>> accessMap = new ConcurrentSkipListMap<>();
public LFUCacheMap(int size, long timeToLiveInMillis, long maxIdleInMillis) { public LFUCacheMap(int size, long timeToLiveInMillis, long maxIdleInMillis) {
super(size, timeToLiveInMillis, maxIdleInMillis); super(size, timeToLiveInMillis, maxIdleInMillis);
} }
@Override @Override
protected CachedValue create(K key, V value, long ttl, long maxIdleTime) { protected CachedValue<K, V> create(K key, V value, long ttl, long maxIdleTime) {
return new LFUCachedValue(idGenerator.incrementAndGet(), key, value, ttl, maxIdleTime); return new LFUCachedValue<K, V>(idGenerator.incrementAndGet(), key, value, ttl, maxIdleTime);
} }
@Override @Override
protected void onValueCreate(CachedValue value) { protected void onValueCreate(CachedValue<K, V> value) {
MapKey key = toKey((LFUCachedValue) value); MapKey key = toKey((LFUCachedValue<K, V>) value);
accessMap.put(key, (LFUCachedValue) value); accessMap.put(key, (LFUCachedValue<K, V>) value);
} }
@Override @Override
protected void onValueRead(CachedValue value) { protected void onValueRead(CachedValue<K, V> value) {
addAccessCount((LFUCachedValue) value, 1); addAccessCount((LFUCachedValue<K, V>) value, 1);
} }
private MapKey toKey(LFUCachedValue value) { private MapKey toKey(LFUCachedValue<K, V> value) {
return new MapKey(value.accessCount, value); return new MapKey(value.accessCount, value);
} }
@Override @Override
protected void onValueRemove(CachedValue value) { protected void onValueRemove(CachedValue<K, V> value) {
synchronized (value) { value.getLock().execute(() -> {
MapKey key = toKey((LFUCachedValue) value); MapKey key = toKey((LFUCachedValue<K, V>) value);
accessMap.remove(key); accessMap.remove(key);
} });
} }
private void addAccessCount(LFUCachedValue value, long count) { private void addAccessCount(LFUCachedValue<K, V> value, long c) {
synchronized (value) { value.getLock().execute(() -> {
long count = c;
if (count < 0 && value.accessCount == 0) { if (count < 0 && value.accessCount == 0) {
return; return;
} }
MapKey key = toKey(value); MapKey key = toKey(value);
if (accessMap.remove(key) == null) { if (accessMap.remove(key) == null) {
return; return;
@ -123,15 +124,15 @@ public class LFUCacheMap<K, V> extends AbstractCacheMap<K, V> {
count = -Math.min(value.accessCount, -count); count = -Math.min(value.accessCount, -count);
} }
value.addAccessCount(count); value.addAccessCount(count);
key = toKey(value); key = toKey(value);
accessMap.put(key, value); accessMap.put(key, value);
} });
} }
@Override @Override
protected void onMapFull() { protected void onMapFull() {
Map.Entry<MapKey, LFUCachedValue> entry = accessMap.pollFirstEntry(); Map.Entry<MapKey, LFUCachedValue<K, V>> entry = accessMap.pollFirstEntry();
if (entry == null) { if (entry == null) {
return; return;
} }
@ -143,7 +144,7 @@ public class LFUCacheMap<K, V> extends AbstractCacheMap<K, V> {
// TODO optimize // TODO optimize
// decrease all values // decrease all values
for (LFUCachedValue value : accessMap.values()) { for (LFUCachedValue<K, V> value : accessMap.values()) {
addAccessCount(value, -entry.getValue().accessCount); addAccessCount(value, -entry.getValue().accessCount);
} }
} }

@ -17,6 +17,8 @@ package org.redisson.cache;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* LRU (least recently used) cache. * LRU (least recently used) cache.
@ -29,15 +31,16 @@ import java.util.concurrent.atomic.AtomicLong;
public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> { public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> {
private final AtomicLong index = new AtomicLong(); private final AtomicLong index = new AtomicLong();
private final List<Collection<CachedValue<K, V>>> queues = private final List<Collection<CachedValue<K, V>>> queues = new ArrayList<>();
new ArrayList<Collection<CachedValue<K, V>>>(); private final Map<Collection<CachedValue<K, V>>, Lock> queueLocks = new IdentityHashMap<>();
public LRUCacheMap(int size, long timeToLiveInMillis, long maxIdleInMillis) { public LRUCacheMap(int size, long timeToLiveInMillis, long maxIdleInMillis) {
super(size, timeToLiveInMillis, maxIdleInMillis); super(size, timeToLiveInMillis, maxIdleInMillis);
for (int i = 0; i < Runtime.getRuntime().availableProcessors()*2; i++) { for (int i = 0; i < Runtime.getRuntime().availableProcessors()*2; i++) {
Set<CachedValue<K, V>> instance = Collections.synchronizedSet(new LinkedHashSet<CachedValue<K, V>>()); Set<CachedValue<K, V>> instance = Collections.synchronizedSet(new LinkedHashSet<>());
queues.add(instance); queues.add(instance);
queueLocks.put(instance, new ReentrantLock());
} }
} }
@ -60,7 +63,7 @@ public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> {
@Override @Override
protected void onValueRead(CachedValue<K, V> value) { protected void onValueRead(CachedValue<K, V> value) {
Collection<CachedValue<K, V>> queue = getQueue(value); Collection<CachedValue<K, V>> queue = getQueue(value);
// move value to tail of queue // move value to the tail of the queue
if (queue.remove(value)) { if (queue.remove(value)) {
queue.add(value); queue.add(value);
} }
@ -80,12 +83,16 @@ public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> {
Collection<CachedValue<K, V>> queue = queues.get(queueIndex); Collection<CachedValue<K, V>> queue = queues.get(queueIndex);
CachedValue<K, V> removedValue = null; CachedValue<K, V> removedValue = null;
synchronized (queue) { Lock lock = queueLocks.get(queue);
lock.lock();
try {
Iterator<CachedValue<K, V>> iter = queue.iterator(); Iterator<CachedValue<K, V>> iter = queue.iterator();
if (iter.hasNext()) { if (iter.hasNext()) {
removedValue = iter.next(); removedValue = iter.next();
iter.remove(); iter.remove();
} }
} finally {
lock.unlock();
} }
if (removedValue != null) { if (removedValue != null) {

@ -15,6 +15,8 @@
*/ */
package org.redisson.cache; package org.redisson.cache;
import org.redisson.misc.WrappedLock;
/** /**
* Created by jribble on 2/20/17. * Created by jribble on 2/20/17.
*/ */
@ -30,6 +32,8 @@ public class StdCachedValue<K, V> implements CachedValue<K, V> {
private long creationTime; private long creationTime;
private long lastAccess; private long lastAccess;
private final WrappedLock lock = new WrappedLock();
public StdCachedValue(K key, V value, long ttl, long maxIdleTime) { public StdCachedValue(K key, V value, long ttl, long maxIdleTime) {
this.value = value; this.value = value;
this.ttl = ttl; this.ttl = ttl;
@ -73,4 +77,8 @@ public class StdCachedValue<K, V> implements CachedValue<K, V> {
return "CachedValue [key=" + key + ", value=" + value + "]"; return "CachedValue [key=" + key + ", value=" + value + "]";
} }
@Override
public WrappedLock getLock() {
return lock;
}
} }

Loading…
Cancel
Save