Feature - RTransferQueue object added. #322

pull/2563/head
Nikita Koksharov 5 years ago
parent b4b1e3a052
commit 8b382013f2

@ -19,68 +19,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBatch;
import org.redisson.api.RBinaryStream;
import org.redisson.api.RBitSet;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RDeque;
import org.redisson.api.RDoubleAdder;
import org.redisson.api.RGeo;
import org.redisson.api.RHyperLogLog;
import org.redisson.api.RKeys;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RLock;
import org.redisson.api.RLongAdder;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RPriorityBlockingDeque;
import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RPriorityDeque;
import org.redisson.api.RPriorityQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RRemoteService;
import org.redisson.api.RRingBuffer;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScript;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetMultimapCache;
import org.redisson.api.RSortedSet;
import org.redisson.api.RStream;
import org.redisson.api.RTopic;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.TransactionOptions;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.config.Config;
@ -545,6 +484,20 @@ public class Redisson implements RedissonClient {
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RTransferQueue<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
return new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
}
@Override
public <V> RTransferQueue<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
return new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
}
@Override
public <V> RRingBuffer<V> getRingBuffer(String name) {
return new RedissonRingBuffer<V>(connectionManager.getCommandExecutor(), name, this);

@ -26,24 +26,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.reactive.CommandReactiveService;
import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBlockingDequeReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListMultimapReactive;
import org.redisson.reactive.RedissonListReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
import org.redisson.reactive.*;
import org.redisson.remote.ResponseEntry;
/**
@ -523,6 +506,24 @@ public class RedissonReactive implements RedissonReactiveClient {
new RedissonBlockingDequeReactive<V>(deque), RBlockingDequeReactive.class);
}
@Override
public <V> RTransferQueueReactive<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueReactive<V>(queue), RTransferQueueReactive.class);
}
@Override
public <V> RTransferQueueReactive<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueReactive<V>(queue), RTransferQueueReactive.class);
}
@Override
public String getId() {
return commandExecutor.getConnectionManager().getId();

@ -24,26 +24,10 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonTransferQueueReactive;
import org.redisson.remote.ResponseEntry;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonBatchRx;
import org.redisson.rx.RedissonBlockingDequeRx;
import org.redisson.rx.RedissonBlockingQueueRx;
import org.redisson.rx.RedissonKeysRx;
import org.redisson.rx.RedissonLexSortedSetRx;
import org.redisson.rx.RedissonListMultimapRx;
import org.redisson.rx.RedissonListRx;
import org.redisson.rx.RedissonMapCacheRx;
import org.redisson.rx.RedissonMapRx;
import org.redisson.rx.RedissonReadWriteLockRx;
import org.redisson.rx.RedissonScoredSortedSetRx;
import org.redisson.rx.RedissonSetCacheRx;
import org.redisson.rx.RedissonSetMultimapRx;
import org.redisson.rx.RedissonSetRx;
import org.redisson.rx.RedissonTopicRx;
import org.redisson.rx.RedissonTransactionRx;
import org.redisson.rx.RxProxyBuilder;
import org.redisson.rx.*;
/**
* Main infrastructure class allows to get access
@ -502,6 +486,24 @@ public class RedissonRx implements RedissonRxClient {
new RedissonBlockingDequeRx<V>(deque), RBlockingDequeRx.class);
}
@Override
public <V> RTransferQueueRx<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);
}
@Override
public <V> RTransferQueueRx<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);
}
@Override
public String getId() {
return commandExecutor.getConnectionManager().getId();

@ -0,0 +1,756 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RTransferQueue;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.executor.RemotePromise;
import org.redisson.iterator.RedissonListIterator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceRequest;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTransferQueue<V> extends RedissonExpirable implements RTransferQueue<V> {
public interface TransferQueueService {
<V> void invoke(V value);
}
@RRemoteAsync(TransferQueueService.class)
public interface TransferQueueServiceAsync {
<V> RFuture<Void> invoke(V value);
}
public static class TransferQueueServiceImpl implements TransferQueueService {
private Object result;
@Override
public <V> void invoke(V value) {
result = value;
}
public Object getResult() {
return result;
}
}
private static final Convertor<Object> CONVERTER = obj -> {
if (obj != null) {
RemoteServiceRequest request = (RemoteServiceRequest) obj;
return request.getArgs()[0];
}
return null;
};
private static final RedisStrictCommand<Object> EVAL_REQUEST = new RedisStrictCommand<>("EVAL", CONVERTER);
private static final RedisCommand EVAL_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<>(), CONVERTER);
private final String queueName;
private final String mapName;
private final TransferQueueServiceAsync service;
private final RRemoteService remoteService;
public RedissonTransferQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RRemoteService remoteService) {
super(codec, commandExecutor, name);
service = remoteService.get(TransferQueueServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
this.remoteService = remoteService;
queueName = ((RedissonRemoteService) remoteService).getRequestQueueName(TransferQueueService.class);
mapName = ((RedissonRemoteService) remoteService).getRequestTasksMapName(TransferQueueService.class);
}
public RedissonTransferQueue(CommandAsyncExecutor commandExecutor, String name, RRemoteService remoteService) {
super(commandExecutor, name);
service = remoteService.get(TransferQueueServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
this.remoteService = remoteService;
queueName = ((RedissonRemoteService) remoteService).getRequestQueueName(TransferQueueService.class);
mapName = ((RedissonRemoteService) remoteService).getRequestTasksMapName(TransferQueueService.class);
}
@Override
public boolean tryTransfer(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
boolean added = get(future.getAddFuture());
if (added && !future.cancel(false)) {
get(future);
return true;
}
return false;
}
public RFuture<Boolean> tryTransferAsync(V v) {
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
future.getAddFuture().onComplete((added, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (!added) {
result.trySuccess(false);
return;
}
future.cancelAsync(false).onComplete((canceled, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
if (canceled) {
result.trySuccess(false);
} else {
future.onComplete((res, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(true);
});
}
});
});
return result;
}
@Override
public void transfer(V v) throws InterruptedException {
RFuture<Void> future = service.invoke(v);
commandExecutor.getInterrupted(future);
}
@Override
public RFuture<Void> transferAsync(V v) {
return service.invoke(v);
}
@Override
public boolean tryTransfer(V v, long timeout, TimeUnit unit) throws InterruptedException {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
long remainTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
if (!future.getAddFuture().await(remainTime, TimeUnit.MILLISECONDS)) {
if (!future.getAddFuture().cancel(false)) {
if (!future.cancel(false)) {
commandExecutor.getInterrupted(future);
return true;
}
}
return false;
}
remainTime -= System.currentTimeMillis() - startTime;
if (!future.await(remainTime)) {
if (!future.cancel(false)) {
commandExecutor.getInterrupted(future);
return true;
}
return false;
}
commandExecutor.getInterrupted(future);
return true;
}
public RFuture<Boolean> tryTransferAsync(V v, long timeout, TimeUnit unit) {
RPromise<Boolean> result = new RedissonPromise<>();
result.setUncancellable();
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
long remainTime = unit.toMillis(timeout);
long startTime = System.currentTimeMillis();
Timeout timeoutFuture = commandExecutor.getConnectionManager().newTimeout(tt -> {
if (!future.getAddFuture().cancel(false)) {
future.cancelAsync(false);
}
}, remainTime, TimeUnit.MILLISECONDS);
future.onComplete((res, exc) -> {
if (future.isCancelled()) {
result.trySuccess(false);
return;
}
timeoutFuture.cancel();
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(true);
});
future.getAddFuture().onComplete((added, e) -> {
if (future.getAddFuture().isCancelled()) {
result.trySuccess(false);
return;
}
if (e != null) {
timeoutFuture.cancel();
result.tryFailure(e);
return;
}
if (!added) {
timeoutFuture.cancel();
result.trySuccess(false);
return;
}
Runnable task = () -> {
future.cancelAsync(false).onComplete((canceled, ex) -> {
if (ex != null) {
timeoutFuture.cancel();
result.tryFailure(ex);
return;
}
if (canceled) {
timeoutFuture.cancel();
result.trySuccess(false);
}
});
};
long time = remainTime - (System.currentTimeMillis() - startTime);
if (time > 0) {
commandExecutor.getConnectionManager().newTimeout(tt -> {
task.run();
}, time, TimeUnit.MILLISECONDS);
} else {
task.run();
}
});
return result;
}
@Override
public boolean hasWaitingConsumer() {
throw new UnsupportedOperationException();
}
@Override
public int getWaitingConsumerCount() {
throw new UnsupportedOperationException();
}
@Override
public boolean add(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
return get(future.getAddFuture());
}
public RFuture<Boolean> addAsync(V v) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(v);
return future.getAddFuture();
}
@Override
public boolean offer(V v) {
return add(v);
}
@Override
public V remove() {
V value = poll();
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
@Override
public V poll() {
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
RFuture<Boolean> r = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, -1, null);
get(r);
return (V) s.getResult();
}
public RFuture<V> pollAsync() {
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
RFuture<Boolean> future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, -1, null);
RPromise<V> result = new RedissonPromise<>();
result.setUncancellable();
future.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess((V) s.getResult());
});
return result;
}
@Override
public V element() {
V value = peek();
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
@Override
public V peek() {
return get(peekAsync());
}
public RFuture<V> peekAsync() {
return commandExecutor.evalReadAsync(queueName, codec, EVAL_REQUEST,
"local id = redis.call('lindex', KEYS[1], 0); "
+ "if id ~= false then "
+ "return redis.call('hget', KEYS[2], id); "
+ "end "
+ "return nil;",
Arrays.asList(queueName, mapName));
}
@Override
public void put(V v) throws InterruptedException {
add(v);
}
@Override
public boolean offer(V v, long timeout, TimeUnit unit) throws InterruptedException {
return add(v);
}
@Override
public V take() throws InterruptedException {
return poll(0, TimeUnit.MILLISECONDS);
}
public RFuture<V> takeAsync() {
return pollAsync(0, TimeUnit.MILLISECONDS);
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
remoteService.tryExecute(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, timeout, unit);
return (V) s.getResult();
}
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
RPromise<V> result = new RedissonPromise<>();
result.setUncancellable();
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
RFuture<Boolean> future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, timeout, unit);
future.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess((V) s.getResult());
});
return result;
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c) {
if (c.isEmpty()) {
return true;
}
boolean all = true;
for (Object obj : c) {
all &= contains(obj);
}
return all;
}
@Override
public boolean addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return false;
}
boolean added = false;
for (V obj : c) {
added |= add(obj);
}
return added;
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
RedissonKeys keys = new RedissonKeys(commandExecutor);
keys.delete(queueName, mapName);
}
public RFuture<Void> clearAsync() {
RPromise<Void> result = new RedissonPromise<>();
result.setUncancellable();
RedissonKeys keys = new RedissonKeys(commandExecutor);
keys.deleteAsync(queueName, mapName).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess(null);
});
return result;
}
@Override
public int size() {
return remoteService.getPendingInvocations(TransferQueueService.class);
}
public RFuture<Integer> sizeAsync() {
return remoteService.getPendingInvocationsAsync(TransferQueueService.class);
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
ByteBuf encodedObject = encode(o);
boolean result = stream().anyMatch(v -> {
ByteBuf encodedValue = encode(v);
boolean res = encodedValue.equals(encodedObject);
encodedValue.release();
return res;
});
encodedObject.release();
return result;
}
public RFuture<V> getValueAsync(int index) {
return commandExecutor.evalReadAsync(queueName, codec, EVAL_REQUEST,
"local id = redis.call('lindex', KEYS[1], ARGV[1]); "
+ "if id ~= false then "
+ "return redis.call('hget', KEYS[2], id); "
+ "end "
+ "return nil;",
Arrays.asList(queueName, mapName), index);
}
@Override
public Iterator<V> iterator() {
return new RedissonListIterator<V>(0) {
@Override
public V getValue(int index) {
RFuture<V> future = getValueAsync(index);
return get(future);
}
@Override
public V remove(int index) {
if (index == 0) {
RFuture<V> future = commandExecutor.evalWriteAsync(queueName, codec, EVAL_REQUEST,
"local id = redis.call('lpop', KEYS[1]); "
+ "if id ~= false then "
+ "return redis.call('hget', KEYS[2], id); "
+ "end "
+ "return nil;",
Arrays.asList(queueName, mapName));
return get(future);
}
RFuture<V> future = commandExecutor.evalWriteAsync(queueName, codec, EVAL_REQUEST,
"local id = redis.call('lindex', KEYS[1], ARGV[1]); " +
"if id ~= false then " +
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"local val = redis.call('hget', KEYS[2], id); " +
"redis.call('hdel', KEYS[2], id); " +
"return val; " +
"end; " +
"return nil;",
Arrays.asList(queueName, mapName), index);
return get(future);
}
@Override
public void fastSet(int index, V value) {
throw new UnsupportedOperationException();
}
@Override
public void add(int index, V value) {
throw new UnsupportedOperationException();
}
};
}
@Override
public Object[] toArray() {
List<V> list = readAll();
return list.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<V> list = readAll();
return list.toArray(a);
}
@Override
public int drainTo(Collection<? super V> c) {
return get(drainToAsync(c));
}
public RFuture<Integer> drainToAsync(Collection<? super V> c) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c), CONVERTER),
"local ids = redis.call('lrange', KEYS[1], 0, -1); " +
"local result = {};"
+ "for i=1, #ids, 5000 do "
+ "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); "
+ "for k,v in ipairs(vals) do "
+ "table.insert(result, v); "
+ "end; "
+ "end; " +
"redis.call('del', KEYS[1], KEYS[2]); " +
"return result",
Arrays.asList(queueName, mapName));
}
@Override
public int drainTo(Collection<? super V> c, int maxElements) {
if (maxElements <= 0) {
return 0;
}
return get(drainToAsync(c, maxElements));
}
public RFuture<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c), CONVERTER),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local ids = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"local result = {};"
+ "for i=1, #ids, 5000 do "
+ "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); "
+ "redis.call('hdel', KEYS[2], unpack(ids, i, math.min(i+4999, #ids)));"
+ "for k,v in ipairs(vals) do "
+ "table.insert(result, v); "
+ "end; "
+ "end; " +
"return result",
Arrays.asList(queueName, mapName), maxElements);
}
@Override
public List<V> readAll() {
return get(readAllAsync());
}
public RFuture<List<V>> readAllAsync() {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_LIST,
"local ids = redis.call('lrange', KEYS[1], 0, -1); " +
"local result = {};"
+ "for i=1, #ids, 5000 do "
+ "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); "
+ "for k,v in ipairs(vals) do "
+ "table.insert(result, v); "
+ "end; "
+ "end; " +
"return result;",
Arrays.asList(queueName, mapName));
}
@Override
public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public int subscribeOnElements(Consumer<V> consumer) {
return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer);
}
@Override
public void unsubscribe(int listenerId) {
commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId);
}
@Override
public V pollLastAndOfferFirstTo(String queueName) {
throw new UnsupportedOperationException();
}
@Override
public List<V> poll(int limit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Void> putAsync(V value) {
RemotePromise<Void> future = (RemotePromise<Void>) service.invoke(value);
RPromise<Void> result = new RedissonPromise<>();
future.getAddFuture().onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess(null);
});
return result;
}
@Override
public RFuture<Boolean> offerAsync(V e) {
return addAsync(e);
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> retainAllAsync(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> containsAsync(Object o) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> removeAsync(Object o) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
throw new UnsupportedOperationException();
}
}

@ -0,0 +1,36 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TransferQueue;
/**
* Redis based implementation of {@link java.util.concurrent.TransferQueue}
*
* @author Nikita Koksharov
*
*/
public interface RTransferQueue<V> extends TransferQueue<V>, RBlockingQueue<V>, RTransferQueueAsync<V> {
/**
* Returns all queue elements at once
*
* @return elements
*/
List<V> readAll();
}

@ -0,0 +1,76 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Async interface for Redis based implementation of {@link java.util.concurrent.TransferQueue}
*
* @author Nikita Koksharov
*
*/
public interface RTransferQueueAsync<V> extends RBlockingQueueAsync<V> {
/**
* Tries to transfer the element to waiting consumer
* which invoked {@link #takeAsync} or {@link #pollAsync} method
* at the moment of transfer.
*
* @param e element to transfer
* @return {@code true} if element was transferred, otherwise
* {@code false}
*/
RFuture<Boolean> tryTransferAsync(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #takeAsync} or {@link #pollAsync} method
* at the moment of transfer.
* Waits if necessary for a consumer.
*
* @param e the element to transfer
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
RFuture<Void> transferAsync(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #takeAsync} or {@link #pollAsync} method
* at the moment of transfer.
* Waits up to defined <code>timeout</code> if necessary for a consumer.
*
* @param e the element to transfer
* @param timeout the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if the element was transferred and <code>false</code>
* otherwise
*/
RFuture<Boolean> tryTransferAsync(V e, long timeout, TimeUnit unit);
/**
* Returns all queue elements at once
*
* @return elements
*/
List<V> readAll();
}

@ -0,0 +1,70 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
/**
* Reactive interface of Redis based implementation of {@link java.util.concurrent.TransferQueue}
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RTransferQueueReactive<V> extends RBlockingQueueReactive<V> {
/**
* Tries to transfer the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
*
* @param e element to transfer
* @return {@code true} if element was transferred, otherwise
* {@code false}
*/
Mono<Boolean> tryTransfer(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
* Waits if necessary for a consumer.
*
* @param e the element to transfer
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
Mono<Void> transfer(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
* Waits up to defined <code>timeout</code> if necessary for a consumer.
*
* @param e the element to transfer
* @param timeout the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if the element was transferred and <code>false</code>
* otherwise
*/
Mono<Boolean> tryTransfer(V e, long timeout, TimeUnit unit);
}

@ -0,0 +1,71 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.concurrent.TimeUnit;
/**
* RxJava2 interface of Redis based implementation of {@link java.util.concurrent.TransferQueue}
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RTransferQueueRx<V> extends RBlockingQueueRx<V> {
/**
* Tries to transfer the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
*
* @param e element to transfer
* @return {@code true} if element was transferred, otherwise
* {@code false}
*/
Single<Boolean> tryTransfer(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
* Waits if necessary for a consumer.
*
* @param e the element to transfer
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
Completable transfer(V e);
/**
* Transfers the element to waiting consumer
* which invoked {@link #take} or {@link #poll} method
* at the moment of transfer.
* Waits up to defined <code>timeout</code> if necessary for a consumer.
*
* @param e the element to transfer
* @param timeout the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if the element was transferred and <code>false</code>
* otherwise
*/
Single<Boolean> tryTransfer(V e, long timeout, TimeUnit unit);
}

@ -613,6 +613,26 @@ public interface RedissonClient {
*/
<V> RQueue<V> getQueue(String name);
/**
* Returns transfer queue instance by name.
*
* @param <V> type of values
* @param name - name of object
* @return TransferQueue object
*/
<V> RTransferQueue<V> getTransferQueue(String name);
/**
* Returns transfer queue instance by name
* using provided codec for queue objects.
*
* @param <V> type of values
* @param name - name of object
* @param codec - code for values
* @return TransferQueue object
*/
<V> RTransferQueue<V> getTransferQueue(String name, Codec codec);
/**
* Returns unbounded delayed queue instance by name.
* <p>

@ -572,6 +572,26 @@ public interface RedissonReactiveClient {
*/
<V> RBlockingDequeReactive<V> getBlockingDeque(String name, Codec codec);
/**
* Returns transfer queue instance by name.
*
* @param <V> type of values
* @param name - name of object
* @return TransferQueue object
*/
<V> RTransferQueueReactive<V> getTransferQueue(String name);
/**
* Returns transfer queue instance by name
* using provided codec for queue objects.
*
* @param <V> type of values
* @param name - name of object
* @param codec - code for values
* @return TransferQueue object
*/
<V> RTransferQueueReactive<V> getTransferQueue(String name, Codec codec);
/**
* Returns deque instance by name.
*

@ -560,6 +560,26 @@ public interface RedissonRxClient {
*/
<V> RBlockingDequeRx<V> getBlockingDeque(String name, Codec codec);
/**
* Returns transfer queue instance by name.
*
* @param <V> type of values
* @param name - name of object
* @return TransferQueue object
*/
<V> RTransferQueueRx<V> getTransferQueue(String name);
/**
* Returns transfer queue instance by name
* using provided codec for queue objects.
*
* @param <V> type of values
* @param name - name of object
* @param codec - code for values
* @return TransferQueue object
*/
<V> RTransferQueueRx<V> getTransferQueue(String name, Codec codec);
/**
* Returns deque instance by name.
*

@ -45,8 +45,12 @@ public class RemotePromise<T> extends RedissonPromise<T> {
return addFuture;
}
public void doCancel() {
super.cancel(true);
public void doCancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
}
public RFuture<Boolean> cancelAsync(boolean mayInterruptIfRunning) {
return RemotePromise.newSucceededFuture(false);
}
}

@ -0,0 +1,105 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTransferQueue;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
/**
*
* @author Nikita Koksharov
*
* @param <V> - value type
*/
public class RedissonTransferQueueReactive<V> {
private final RedissonTransferQueue<V> queue;
public RedissonTransferQueueReactive(RedissonTransferQueue<V> queue) {
this.queue = queue;
}
public Flux<V> takeElements() {
return ElementsStream.takeElements(queue::takeAsync);
}
public Publisher<V> iterator() {
return Flux.create(new Consumer<FluxSink<V>>() {
@Override
public void accept(FluxSink<V> emitter) {
emitter.onRequest(new LongConsumer() {
int currentIndex = 0;
@Override
public void accept(long value) {
onRequest(true, emitter, value);
}
protected void onRequest(boolean forward, FluxSink<V> emitter, long n) {
queue.getValueAsync(currentIndex).onComplete((value, e) -> {
if (e != null) {
emitter.error(e);
return;
}
if (value != null) {
emitter.next(value);
if (forward) {
currentIndex++;
} else {
currentIndex--;
}
}
if (value == null) {
emitter.complete();
return;
}
if (n-1 == 0) {
return;
}
onRequest(forward, emitter, n-1);
});
}
});
}
});
}
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override
public RFuture<Boolean> add(Object o) {
return queue.addAsync((V) o);
}
}.addAll(c);
}
}

@ -15,15 +15,6 @@
*/
package org.redisson.remote;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonBucket;
import org.redisson.RedissonList;
import org.redisson.RedissonMap;
@ -40,6 +31,16 @@ import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
*
@ -240,7 +241,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
if (res instanceof RemoteServiceCancelResponse) {
result.doCancel();
result.doCancel(true);
return;
}
@ -292,7 +293,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
return true;
}
return doCancel(mayInterruptIfRunning);
return executeCancel(mayInterruptIfRunning);
}
boolean removed = commandExecutor.get(remoteService.removeAsync(requestQueueName, requestId));
@ -301,10 +302,10 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
return true;
}
return doCancel(mayInterruptIfRunning);
return executeCancel(mayInterruptIfRunning);
}
private boolean doCancel(boolean mayInterruptIfRunning) {
private boolean executeCancel(boolean mayInterruptIfRunning) {
if (isCancelled()) {
return true;
}
@ -313,7 +314,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
return false;
}
cancelExecution(optionsCopy, mayInterruptIfRunning, this);
cancelExecution(optionsCopy, mayInterruptIfRunning, this, cancelRequestMapName);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
@ -322,12 +323,107 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
return isCancelled();
}
@Override
public RFuture<Boolean> cancelAsync(boolean mayInterruptIfRunning) {
return AsyncRemoteProxy.this.cancelAsync(optionsCopy, this, requestId, requestQueueName, ackTimeout, mayInterruptIfRunning);
}
};
return result;
}
private RFuture<Boolean> cancelAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> promise,
RequestId requestId, String requestQueueName, Long ackTimeout, boolean mayInterruptIfRunning) {
if (promise.isCancelled()) {
return RedissonPromise.newSucceededFuture(true);
}
if (promise.isDone()) {
return RedissonPromise.newSucceededFuture(false);
}
RPromise<Boolean> result = new RedissonPromise<>();
if (optionsCopy.isAckExpected()) {
String ackName = remoteService.getAckName(requestId);
RFuture<Boolean> future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
// + "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName),
// Arrays.<Object> asList(ackName, responseQueueName, requestQueueName),
ackTimeout);
future.onComplete((ackNotSent, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (ackNotSent) {
RList<Object> list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null);
RFuture<Boolean> f = list.removeAsync(requestId.toString());
f.onComplete((res, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
promise.doCancel(mayInterruptIfRunning);
result.trySuccess(true);
});
}
doCancelAsync(mayInterruptIfRunning, result, promise, optionsCopy);
});
return result;
}
RFuture<Boolean> removeFuture = remoteService.removeAsync(requestQueueName, requestId);
removeFuture.onComplete((removed, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (removed) {
promise.doCancel(mayInterruptIfRunning);
}
doCancelAsync(mayInterruptIfRunning, result, promise, optionsCopy);
});
return result;
}
private void doCancelAsync(boolean mayInterruptIfRunning, RPromise<Boolean> result, RemotePromise<Object> promise, RemoteInvocationOptions optionsCopy) {
if (promise.isCancelled()) {
result.trySuccess(true);
return;
}
if (promise.isDone()) {
result.trySuccess(false);
return;
}
cancelExecution(optionsCopy, mayInterruptIfRunning, promise, cancelRequestMapName);
promise.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess(promise.isCancelled());
});
}
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise, String cancelRequestMapName) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, cancelRequestMapName, null, null, null);
canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -0,0 +1,93 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTransferQueue;
import org.redisson.api.RFuture;
/**
*
* @author Nikita Koksharov
*
* @param <V> - value type
*/
public class RedissonTransferQueueRx<V> {
private final RedissonTransferQueue<V> queue;
public RedissonTransferQueueRx(RedissonTransferQueue<V> queue) {
this.queue = queue;
}
public Flowable<V> takeElements() {
return ElementsStream.takeElements(queue::takeAsync);
}
public Publisher<V> iterator() {
ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
private int currentIndex = 0;
@Override
public void accept(long n) throws Exception {
queue.getValueAsync(currentIndex).onComplete((value, e) -> {
if (e != null) {
p.onError(e);
return;
}
if (value != null) {
p.onNext(value);
currentIndex++;
}
if (value == null) {
p.onComplete();
return;
}
if (n-1 == 0) {
return;
}
try {
accept(n-1);
} catch (Exception e1) {
e1.printStackTrace();
}
});
}
});
}
public Single<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override
public RFuture<Boolean> add(Object o) {
return queue.addAsync((V) o);
}
}.addAll(c);
}
}

@ -0,0 +1,167 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingQueueReactive;
import org.redisson.api.RTransferQueue;
import org.redisson.api.RTransferQueueReactive;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.shouldHaveThrown;
import static org.junit.Assert.assertTrue;
public class RedissonTransferQueueReactiveTest extends BaseReactiveTest {
@Test
public void testTakeElements() {
RBlockingQueueReactive<Integer> queue = redisson.getTransferQueue("test");
List<Integer> elements = new ArrayList<>();
queue.takeElements().subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Integer t) {
elements.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
assertThat(elements).containsExactly(0, 1, 2, 3);
}
@Test
public void testTake() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getTransferQueue("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueueReactive<Integer> queue = redisson.getTransferQueue("queue:take");
sync(queue.put(3));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = sync(queue1.take());
Assert.assertEquals(3, l);
Assert.assertTrue(System.currentTimeMillis() - s > 9000);
}
@Test
public void testPoll() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getTransferQueue("queue1");
sync(queue1.put(1));
Assert.assertEquals((Integer)1, sync(queue1.poll(2, TimeUnit.SECONDS)));
long s = System.currentTimeMillis();
Assert.assertNull(sync(queue1.poll(5, TimeUnit.SECONDS)));
Assert.assertTrue(System.currentTimeMillis() - s > 5000);
}
@Test
public void testAwait() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getTransferQueue("queue1");
sync(queue1.put(1));
Assert.assertEquals((Integer)1, sync(queue1.poll(10, TimeUnit.SECONDS)));
}
@Test
public void testDrainTo() {
RBlockingQueueReactive<Integer> queue = redisson.getTransferQueue("queue");
for (int i = 0 ; i < 100; i++) {
sync(queue.offer(i));
}
Assert.assertEquals(100, sync(queue.size()).intValue());
Set<Integer> batch = new HashSet<Integer>();
int count = sync(queue.drainTo(batch, 10));
Assert.assertEquals(10, count);
Assert.assertEquals(10, batch.size());
Assert.assertEquals(90, sync(queue.size()).intValue());
sync(queue.drainTo(batch, 10));
sync(queue.drainTo(batch, 20));
sync(queue.drainTo(batch, 60));
Assert.assertEquals(0, sync(queue.size()).intValue());
}
@Test
public void testBlockingQueue() throws InterruptedException {
RTransferQueueReactive<Integer> queue = redisson.getTransferQueue("test_:blocking:queue:");
ExecutorService executor = Executors.newFixedThreadPool(10);
final AtomicInteger counter = new AtomicInteger();
int total = 100;
for (int i = 0; i < total; i++) {
// runnable won't be executed in any particular order, and hence, int value as well.
executor.submit(() -> {
sync(redisson.getTransferQueue("test_:blocking:queue:").add(counter.incrementAndGet()));
});
}
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
int count = 0;
while (count < total) {
int item = sync(queue.take());
assertTrue(item > 0 && item <= total);
count++;
}
assertThat(counter.get()).isEqualTo(total);
}
@Test
public void testDrainToCollection() throws Exception {
RBlockingQueueReactive<Object> queue1 = redisson.getTransferQueue("queue1");
sync(queue1.put(1));
sync(queue1.put(2L));
sync(queue1.put("e"));
ArrayList<Object> dst = new ArrayList<Object>();
sync(queue1.drainTo(dst));
assertThat(dst).containsExactly(1, 2L, "e");
Assert.assertEquals(0, sync(queue1.size()).intValue());
}
@Test
public void testDrainToCollectionLimited() throws Exception {
RBlockingQueueReactive<Object> queue1 = redisson.getTransferQueue("queue1");
sync(queue1.put(1));
sync(queue1.put(2L));
sync(queue1.put("e"));
ArrayList<Object> dst = new ArrayList<Object>();
sync(queue1.drainTo(dst, 2));
assertThat(dst).containsExactly(1, 2L);
Assert.assertEquals(1, sync(queue1.size()).intValue());
dst.clear();
sync(queue1.drainTo(dst, 2));
assertThat(dst).containsExactly("e");
}
}

@ -0,0 +1,244 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RTransferQueue;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Nikita Koksharov
*/
public class RedissonTransferQueueTest extends BaseTest {
@Test
public void testTryTransferWithDelay() throws InterruptedException, ExecutionException {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
Future<?> f = Executors.newSingleThreadExecutor().submit(() -> {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
try {
long time = System.currentTimeMillis();
boolean res = queue.tryTransfer(3, 2, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - time).isGreaterThan(1900);
assertThat(res).isFalse();
Thread.sleep(1000);
boolean res2 = queue.tryTransfer(4, 1, TimeUnit.SECONDS);
assertThat(res2).isTrue();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread.sleep(2100);
assertThat(queue1.size()).isZero();
Thread.sleep(1100);
assertThat(queue1.size()).isEqualTo(1);
assertThat(queue1.peek()).isEqualTo(4);
assertThat(queue1.poll()).isEqualTo(4);
f.get();
assertThat(queue1.size()).isZero();
assertThat(queue1.peek()).isNull();
}
@Test
public void testTransfer() throws InterruptedException, ExecutionException {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
AtomicBoolean takeExecuted = new AtomicBoolean();
Future<?> f = Executors.newSingleThreadExecutor().submit(() -> {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
try {
long time = System.currentTimeMillis();
queue.transfer(3);
assertThat(takeExecuted.get()).isTrue();
assertThat(System.currentTimeMillis() - time).isGreaterThan(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(3000);
assertThat(queue1.size()).isEqualTo(1);
assertThat(queue1.peek()).isEqualTo(3);
assertThat(queue1.take()).isEqualTo(3);
takeExecuted.set(true);
f.get();
assertThat(queue1.size()).isZero();
assertThat(queue1.peek()).isNull();
}
@Test
public void testTryTransfer() throws InterruptedException, ExecutionException {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
AtomicBoolean takeExecuted = new AtomicBoolean();
ScheduledFuture<?> f = Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
boolean res = queue.tryTransfer(3);
assertThat(takeExecuted.get()).isTrue();
assertThat(res).isTrue();
boolean res2 = queue.tryTransfer(4);
assertThat(res2).isFalse();
}, 4, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.take();
takeExecuted.set(true);
Assert.assertEquals(3, l);
Assert.assertTrue(System.currentTimeMillis() - s > 3900);
f.get();
assertThat(queue1.size()).isZero();
assertThat(queue1.peek()).isNull();
}
@Test
public void testTake() throws InterruptedException {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.take();
Assert.assertEquals(3, l);
Assert.assertTrue(System.currentTimeMillis() - s > 9000);
}
@Test
public void testPoll() throws InterruptedException {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
queue1.put(1);
Assert.assertEquals((Integer)1, queue1.poll(2, TimeUnit.SECONDS));
long s = System.currentTimeMillis();
Assert.assertNull(queue1.poll(5, TimeUnit.SECONDS));
Assert.assertTrue(System.currentTimeMillis() - s > 4900);
}
@Test
public void testPeek() {
RTransferQueue<String> queue = redisson.getTransferQueue("queue");
assertThat(queue.peek()).isNull();
queue.add("1");
assertThat(queue.size()).isEqualTo(1);
assertThat(queue.peek()).isEqualTo("1");
}
@Test
public void testReadAll() {
RTransferQueue<String> queue = redisson.getTransferQueue("queue");
queue.add("1");
queue.add("2");
queue.add("3");
queue.add("4");
assertThat(queue.readAll()).containsExactly("1", "2", "3", "4");
}
@Test
public void testDrainTo() {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
for (int i = 0 ; i < 100; i++) {
queue.offer(i);
}
Assert.assertEquals(100, queue.size());
Set<Integer> batch = new HashSet<>();
int count = queue.drainTo(batch, 10);
Assert.assertEquals(10, count);
Assert.assertEquals(10, batch.size());
Assert.assertEquals(90, queue.size());
queue.drainTo(batch, 10);
queue.drainTo(batch, 20);
queue.drainTo(batch, 60);
Assert.assertEquals(0, queue.size());
}
@Test
public void testDrainToSingle() {
RTransferQueue<Integer> queue = redisson.getTransferQueue("queue");
Assert.assertTrue(queue.add(1));
Assert.assertEquals(1, queue.size());
Set<Integer> batch = new HashSet<>();
int count = queue.drainTo(batch);
Assert.assertEquals(1, count);
Assert.assertEquals(1, batch.size());
Assert.assertTrue(queue.isEmpty());
}
@Test
public void testClear() throws ExecutionException, InterruptedException {
RTransferQueue<String> queue = redisson.getTransferQueue("queue");
queue.add("1");
queue.add("4");
queue.add("2");
queue.add("5");
queue.add("3");
ScheduledFuture<?> f = Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RTransferQueue<Integer> queue1 = redisson.getTransferQueue("queue");
try {
queue1.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
f.get();
queue.clear();
assertThat(redisson.getKeys().count()).isZero();
}
@Test
public void testIteratorRemove() {
RTransferQueue<String> queue = redisson.getTransferQueue("queue");
queue.add("1");
queue.add("4");
queue.add("2");
queue.add("5");
queue.add("3");
for (Iterator<String> iterator = queue.iterator(); iterator.hasNext();) {
String value = iterator.next();
if (value.equals("2")) {
iterator.remove();
}
}
assertThat(queue).containsExactly("1", "4", "5", "3");
int iteration = 0;
for (Iterator<String> iterator = queue.iterator(); iterator.hasNext();) {
iterator.next();
iterator.remove();
iteration++;
}
Assert.assertEquals(4, iteration);
Assert.assertEquals(0, queue.size());
Assert.assertTrue(queue.isEmpty());
}
}
Loading…
Cancel
Save