|
|
|
@ -109,7 +109,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
HashValue toKeyHash(Object key) {
|
|
|
|
|
ByteBuf keyState = ((RedissonObject)map).encodeMapKey(key);
|
|
|
|
|
ByteBuf keyState = ((RedissonObject) map).encodeMapKey(key);
|
|
|
|
|
try {
|
|
|
|
|
return new HashValue(Hash.hash128(keyState));
|
|
|
|
|
} finally {
|
|
|
|
@ -189,7 +189,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
|
|
|
|
|
protected MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
|
|
|
|
|
long startPos, String pattern, int count) {
|
|
|
|
|
MapScanResult<Object, Object> res = ((RedissonMap<?, ?>)map).scanIterator(name, client, startPos, pattern, count);
|
|
|
|
|
MapScanResult<Object, Object> res = ((RedissonMap<?, ?>) map).scanIterator(name, client, startPos, pattern, count);
|
|
|
|
|
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
|
|
|
|
|
for (Iterator<Object> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
|
|
|
|
|
Object entry = iterator.next();
|
|
|
|
@ -519,7 +519,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
if (entry == MapEntry.NULL) {
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
} else {
|
|
|
|
|
ByteBuf valueState = ((RedissonObject)map).encodeMapValue(entry.getValue());
|
|
|
|
|
ByteBuf valueState = ((RedissonObject) map).encodeMapValue(entry.getValue());
|
|
|
|
|
try {
|
|
|
|
|
return RedissonPromise.newSucceededFuture(valueState.readableBytes());
|
|
|
|
|
} finally {
|
|
|
|
@ -538,10 +538,10 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
if (entry == MapEntry.NULL) {
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
} else {
|
|
|
|
|
return RedissonPromise.newSucceededFuture((V)entry.getValue());
|
|
|
|
|
return RedissonPromise.newSucceededFuture((V) entry.getValue());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return ((RedissonMap<K, V>)map).getOperationAsync(key);
|
|
|
|
|
return ((RedissonMap<K, V>) map).getOperationAsync(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Set<K>> readAllKeySetAsync() {
|
|
|
|
@ -607,7 +607,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Map<K, V>> readAllMapAsync() {
|
|
|
|
|
RPromise<Map<K, V>> result = new RedissonPromise<Map<K, V>>();
|
|
|
|
|
RPromise<Map<K, V>> result = new RedissonPromise<>();
|
|
|
|
|
RFuture<Map<K, V>> future = map.readAllMapAsync();
|
|
|
|
|
future.onComplete((map, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
@ -615,7 +615,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
|
|
|
|
|
Map<HashValue, MapEntry> newstate = new HashMap<>(state);
|
|
|
|
|
for (Iterator<K> iterator = map.keySet().iterator(); iterator.hasNext();) {
|
|
|
|
|
K key = iterator.next();
|
|
|
|
|
MapEntry entry = newstate.remove(toKeyHash(key));
|
|
|
|
@ -630,7 +630,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
if (entry == MapEntry.NULL) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
map.put((K)entry.getKey(), (V)entry.getValue());
|
|
|
|
|
map.put((K) entry.getKey(), (V) entry.getValue());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.trySuccess(map);
|
|
|
|
@ -640,7 +640,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Map<K, V>> getAllOperationAsync(Set<K> keys) {
|
|
|
|
|
RPromise<Map<K, V>> result = new RedissonPromise<Map<K, V>>();
|
|
|
|
|
RPromise<Map<K, V>> result = new RedissonPromise<>();
|
|
|
|
|
Set<K> keysToLoad = new HashSet<K>(keys);
|
|
|
|
|
Map<K, V> map = new HashMap<K, V>();
|
|
|
|
|
for (K key : keys) {
|
|
|
|
@ -649,14 +649,14 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
MapEntry entry = state.get(keyHash);
|
|
|
|
|
if (entry != null) {
|
|
|
|
|
if (entry != MapEntry.NULL) {
|
|
|
|
|
map.put(key, (V)entry.getValue());
|
|
|
|
|
map.put(key, (V) entry.getValue());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
keysToLoad.add(key);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<Map<K, V>> future = ((RedissonMap<K, V>)this.map).getAllOperationAsync(keysToLoad);
|
|
|
|
|
RFuture<Map<K, V>> future = ((RedissonMap<K, V>) this.map).getAllOperationAsync(keysToLoad);
|
|
|
|
|
future.onComplete((res, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
result.tryFailure(e);
|
|
|
|
@ -671,7 +671,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<V> removeOperationAsync(K key) {
|
|
|
|
|
RPromise<V> result = new RedissonPromise<V>();
|
|
|
|
|
RPromise<V> result = new RedissonPromise<>();
|
|
|
|
|
executeLocked(result, key, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -706,8 +706,8 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
|
|
|
|
|
RPromise<Boolean> result = new RedissonPromise<Boolean>();
|
|
|
|
|
executeLocked(result, (K)key, new Runnable() {
|
|
|
|
|
RPromise<Boolean> result = new RedissonPromise<>();
|
|
|
|
|
executeLocked(result, (K) key, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
HashValue keyHash = toKeyHash(key);
|
|
|
|
@ -729,7 +729,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
map.getAsync((K)key).onComplete((r, e) -> {
|
|
|
|
|
map.getAsync((K) key).onComplete((r, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
result.tryFailure(e);
|
|
|
|
|
return;
|
|
|
|
@ -747,8 +747,8 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean isEqual(Object value, Object oldValue) {
|
|
|
|
|
ByteBuf valueBuf = ((RedissonObject)map).encodeMapValue(value);
|
|
|
|
|
ByteBuf oldValueBuf = ((RedissonObject)map).encodeMapValue(oldValue);
|
|
|
|
|
ByteBuf valueBuf = ((RedissonObject) map).encodeMapValue(value);
|
|
|
|
|
ByteBuf oldValueBuf = ((RedissonObject) map).encodeMapValue(oldValue);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
return valueBuf.equals(oldValueBuf);
|
|
|
|
@ -759,7 +759,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> entries) {
|
|
|
|
|
RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
RPromise<Void> result = new RedissonPromise<>();
|
|
|
|
|
executeLocked(result, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -775,12 +775,12 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
|
|
|
|
|
result.trySuccess(null);
|
|
|
|
|
}
|
|
|
|
|
}, (Collection<K>)entries.keySet());
|
|
|
|
|
}, (Collection<K>) entries.keySet());
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
|
|
|
|
|
RPromise<Boolean> result = new RedissonPromise<Boolean>();
|
|
|
|
|
RPromise<Boolean> result = new RedissonPromise<>();
|
|
|
|
|
executeLocked(result, key, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -822,7 +822,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<V> replaceOperationAsync(K key, V value) {
|
|
|
|
|
RPromise<V> result = new RedissonPromise<V>();
|
|
|
|
|
RPromise<V> result = new RedissonPromise<>();
|
|
|
|
|
executeLocked(result, key, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -878,7 +878,7 @@ public class BaseTransactionalMap<K, V> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <R> void executeLocked(RPromise<R> promise, Runnable runnable, Collection<K> keys) {
|
|
|
|
|
List<RLock> locks = new ArrayList<RLock>(keys.size());
|
|
|
|
|
List<RLock> locks = new ArrayList<>(keys.size());
|
|
|
|
|
for (K key : keys) {
|
|
|
|
|
RLock lock = getLock(key);
|
|
|
|
|
locks.add(lock);
|
|
|
|
|