Feature - getReadWriteLock, getCountDownLatch, getSemaphore, getPermitExpirableSemaphore, getFairLock methods added to RSet and RSetCache objects #1786

pull/1792/head
Nikita Koksharov 6 years ago
parent 9a5f2cfcd2
commit 711586151e

@ -23,8 +23,12 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
@ -602,20 +606,50 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray()); return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray());
} }
public String getLockName(Object value) { public String getLockName(Object value, String suffix) {
ByteBuf state = encode(value); ByteBuf state = encode(value);
try { try {
return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix);
} finally { } finally {
state.release(); state.release();
} }
} }
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
String lockName = getLockName(value, "permitexpirablesemaphore");
return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RSemaphore getSemaphore(V value) {
String lockName = getLockName(value, "semaphore");
return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
String lockName = getLockName(value, "countdownlatch");
return new RedissonCountDownLatch(commandExecutor, lockName);
}
@Override
public RLock getFairLock(V value) {
String lockName = getLockName(value, "fairlock");
return new RedissonFairLock(commandExecutor, lockName);
}
@Override @Override
public RLock getLock(V value) { public RLock getLock(V value) {
String lockName = getLockName(value); String lockName = getLockName(value, "lock");
return new RedissonLock(commandExecutor, lockName); return new RedissonLock(commandExecutor, lockName);
} }
@Override
public RReadWriteLock getReadWriteLock(V value) {
String lockName = getLockName(value, "rw_lock");
return new RedissonReadWriteLock(commandExecutor, lockName);
}
@Override @Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,

@ -24,8 +24,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSetCache; import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -366,20 +370,50 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
delete(); delete();
} }
public String getLockName(Object value) { public String getLockName(Object value, String suffix) {
ByteBuf state = encode(value); ByteBuf state = encode(value);
try { try {
return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix);
} finally { } finally {
state.release(); state.release();
} }
} }
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
String lockName = getLockName(value, "permitexpirablesemaphore");
return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RSemaphore getSemaphore(V value) {
String lockName = getLockName(value, "semaphore");
return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
String lockName = getLockName(value, "countdownlatch");
return new RedissonCountDownLatch(commandExecutor, lockName);
}
@Override
public RLock getFairLock(V value) {
String lockName = getLockName(value, "fairlock");
return new RedissonFairLock(commandExecutor, lockName);
}
@Override @Override
public RLock getLock(V value) { public RLock getLock(V value) {
String lockName = getLockName(value); String lockName = getLockName(value, "lock");
return new RedissonLock(commandExecutor, lockName); return new RedissonLock(commandExecutor, lockName);
} }
@Override
public RReadWriteLock getReadWriteLock(V value) {
String lockName = getLockName(value, "rw_lock");
return new RedissonReadWriteLock(commandExecutor, lockName);
}
@Override @Override
public void destroy() { public void destroy() {

@ -23,8 +23,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -49,7 +53,8 @@ import org.redisson.command.CommandAsyncExecutor;
*/ */
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> { public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE);
private final RSet<V> set; private final RSet<V> set;
private final Object key; private final Object key;
@ -468,6 +473,31 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray()); Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
} }
@Override
public RCountDownLatch getCountDownLatch(V value) {
return set.getCountDownLatch(value);
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
return set.getPermitExpirableSemaphore(value);
}
@Override
public RSemaphore getSemaphore(V value) {
return set.getSemaphore(value);
}
@Override
public RLock getFairLock(V value) {
return set.getFairLock(value);
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
return set.getReadWriteLock(value);
}
@Override @Override
public RLock getLock(V value) { public RLock getLock(V value) {
return set.getLock(value); return set.getLock(value);

@ -29,11 +29,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
*/ */
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> { public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> {
/**
* Returns <code>RCountDownLatch</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RCountDownLatch object
*/
RCountDownLatch getCountDownLatch(V value);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphore getSemaphore(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLock getReadWriteLock(V value);
/** /**
* Returns lock instance associated with <code>value</code> * Returns lock instance associated with <code>value</code>
* *
* @param value - set value * @param value - set value
* @return lock * @return RLock object
*/ */
RLock getLock(V value); RLock getLock(V value);

@ -40,11 +40,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
*/ */
public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDestroyable { public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDestroyable {
/**
* Returns <code>RCountDownLatch</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RCountDownLatch object
*/
RCountDownLatch getCountDownLatch(V value);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphore getSemaphore(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLock getReadWriteLock(V value);
/** /**
* Returns lock instance associated with <code>value</code> * Returns lock instance associated with <code>value</code>
* *
* @param value - set value * @param value - set value
* @return lock * @return RLock object
*/ */
RLock getLock(V value); RLock getLock(V value);

@ -23,7 +23,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonSet; import org.redisson.RedissonSet;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
@ -99,6 +104,36 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
checkState(); checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern, count); return transactionalSet.scanIterator(name, client, startPos, pattern, count);
} }
@Override
public RLock getFairLock(V value) {
throw new UnsupportedOperationException("getFairLock method is not supported in transaction");
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction");
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction");
}
@Override
public RSemaphore getSemaphore(V value) {
throw new UnsupportedOperationException("getSemaphore method is not supported in transaction");
}
@Override
public RLock getLock(V value) {
throw new UnsupportedOperationException("getLock method is not supported in transaction");
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
throw new UnsupportedOperationException("getReadWriteLock method is not supported in transaction");
}
@Override @Override
public RFuture<Boolean> containsAsync(Object o) { public RFuture<Boolean> containsAsync(Object o) {

@ -77,7 +77,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override @Override
protected RLock getLock(RCollectionAsync<V> set, V value) { protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSet<V>)set).getLockName(value); String lockName = ((RedissonSet<V>)set).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
} }

@ -82,7 +82,7 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
@Override @Override
protected RLock getLock(RCollectionAsync<V> set, V value) { protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSetCache<V>)set).getLockName(value); String lockName = ((RedissonSetCache<V>)set).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
} }

@ -40,12 +40,12 @@ public abstract class SetOperation extends TransactionalOperation {
} }
protected RLock getLock(RSetCache<?> setCache, CommandAsyncExecutor commandExecutor, Object value) { protected RLock getLock(RSetCache<?> setCache, CommandAsyncExecutor commandExecutor, Object value) {
String lockName = ((RedissonSetCache<?>)setCache).getLockName(value); String lockName = ((RedissonSetCache<?>)setCache).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
} }
protected RLock getLock(RSet<?> setCache, CommandAsyncExecutor commandExecutor, Object value) { protected RLock getLock(RSet<?> setCache, CommandAsyncExecutor commandExecutor, Object value) {
String lockName = ((RedissonSet<?>)setCache).getLockName(value); String lockName = ((RedissonSet<?>)setCache).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
} }

Loading…
Cancel
Save