diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index dd8edd91d..602dfcdd8 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -19,68 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.redisson.api.BatchOptions; -import org.redisson.api.ClusterNodesGroup; -import org.redisson.api.ExecutorOptions; -import org.redisson.api.LocalCachedMapOptions; -import org.redisson.api.MapOptions; -import org.redisson.api.Node; -import org.redisson.api.NodesGroup; -import org.redisson.api.RAtomicDouble; -import org.redisson.api.RAtomicLong; -import org.redisson.api.RBatch; -import org.redisson.api.RBinaryStream; -import org.redisson.api.RBitSet; -import org.redisson.api.RBlockingDeque; -import org.redisson.api.RBlockingQueue; -import org.redisson.api.RBloomFilter; -import org.redisson.api.RBoundedBlockingQueue; -import org.redisson.api.RBucket; -import org.redisson.api.RBuckets; -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RDelayedQueue; -import org.redisson.api.RDeque; -import org.redisson.api.RDoubleAdder; -import org.redisson.api.RGeo; -import org.redisson.api.RHyperLogLog; -import org.redisson.api.RKeys; -import org.redisson.api.RLexSortedSet; -import org.redisson.api.RList; -import org.redisson.api.RListMultimap; -import org.redisson.api.RListMultimapCache; -import org.redisson.api.RLiveObjectService; -import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RLock; -import org.redisson.api.RLongAdder; -import org.redisson.api.RMap; -import org.redisson.api.RMapCache; -import org.redisson.api.RPatternTopic; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RPriorityBlockingDeque; -import org.redisson.api.RPriorityBlockingQueue; -import org.redisson.api.RPriorityDeque; -import org.redisson.api.RPriorityQueue; -import org.redisson.api.RQueue; -import org.redisson.api.RRateLimiter; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RRemoteService; -import org.redisson.api.RRingBuffer; -import org.redisson.api.RScheduledExecutorService; -import org.redisson.api.RScoredSortedSet; -import org.redisson.api.RScript; -import org.redisson.api.RSemaphore; -import org.redisson.api.RSet; -import org.redisson.api.RSetCache; -import org.redisson.api.RSetMultimap; -import org.redisson.api.RSetMultimapCache; -import org.redisson.api.RSortedSet; -import org.redisson.api.RStream; -import org.redisson.api.RTopic; -import org.redisson.api.RTransaction; -import org.redisson.api.RedissonClient; -import org.redisson.api.RedissonReactiveClient; -import org.redisson.api.RedissonRxClient; -import org.redisson.api.TransactionOptions; +import org.redisson.api.*; import org.redisson.client.codec.Codec; import org.redisson.command.CommandExecutor; import org.redisson.config.Config; @@ -545,6 +484,20 @@ public class Redisson implements RedissonClient { return new RedissonQueue(codec, connectionManager.getCommandExecutor(), name, this); } + @Override + public RTransferQueue getTransferQueue(String name) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + return new RedissonTransferQueue(connectionManager.getCommandExecutor(), name, service); + } + + @Override + public RTransferQueue getTransferQueue(String name, Codec codec) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + return new RedissonTransferQueue(codec, connectionManager.getCommandExecutor(), name, service); + } + @Override public RRingBuffer getRingBuffer(String name) { return new RedissonRingBuffer(connectionManager.getCommandExecutor(), name, this); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index d954be752..832ca1ff9 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -26,24 +26,7 @@ import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; -import org.redisson.reactive.CommandReactiveService; -import org.redisson.reactive.ReactiveProxyBuilder; -import org.redisson.reactive.RedissonBatchReactive; -import org.redisson.reactive.RedissonBlockingDequeReactive; -import org.redisson.reactive.RedissonBlockingQueueReactive; -import org.redisson.reactive.RedissonKeysReactive; -import org.redisson.reactive.RedissonLexSortedSetReactive; -import org.redisson.reactive.RedissonListMultimapReactive; -import org.redisson.reactive.RedissonListReactive; -import org.redisson.reactive.RedissonMapCacheReactive; -import org.redisson.reactive.RedissonMapReactive; -import org.redisson.reactive.RedissonReadWriteLockReactive; -import org.redisson.reactive.RedissonScoredSortedSetReactive; -import org.redisson.reactive.RedissonSetCacheReactive; -import org.redisson.reactive.RedissonSetMultimapReactive; -import org.redisson.reactive.RedissonSetReactive; -import org.redisson.reactive.RedissonTopicReactive; -import org.redisson.reactive.RedissonTransactionReactive; +import org.redisson.reactive.*; import org.redisson.remote.ResponseEntry; /** @@ -523,6 +506,24 @@ public class RedissonReactive implements RedissonReactiveClient { new RedissonBlockingDequeReactive(deque), RBlockingDequeReactive.class); } + @Override + public RTransferQueueReactive getTransferQueue(String name) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + RedissonTransferQueue queue = new RedissonTransferQueue(connectionManager.getCommandExecutor(), name, service); + return ReactiveProxyBuilder.create(commandExecutor, queue, + new RedissonTransferQueueReactive(queue), RTransferQueueReactive.class); + } + + @Override + public RTransferQueueReactive getTransferQueue(String name, Codec codec) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + RedissonTransferQueue queue = new RedissonTransferQueue(codec, connectionManager.getCommandExecutor(), name, service); + return ReactiveProxyBuilder.create(commandExecutor, queue, + new RedissonTransferQueueReactive(queue), RTransferQueueReactive.class); + } + @Override public String getId() { return commandExecutor.getConnectionManager().getId(); diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 9a7dcb382..db29b9ffc 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -24,26 +24,10 @@ import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; +import org.redisson.reactive.ReactiveProxyBuilder; +import org.redisson.reactive.RedissonTransferQueueReactive; import org.redisson.remote.ResponseEntry; -import org.redisson.rx.CommandRxExecutor; -import org.redisson.rx.CommandRxService; -import org.redisson.rx.RedissonBatchRx; -import org.redisson.rx.RedissonBlockingDequeRx; -import org.redisson.rx.RedissonBlockingQueueRx; -import org.redisson.rx.RedissonKeysRx; -import org.redisson.rx.RedissonLexSortedSetRx; -import org.redisson.rx.RedissonListMultimapRx; -import org.redisson.rx.RedissonListRx; -import org.redisson.rx.RedissonMapCacheRx; -import org.redisson.rx.RedissonMapRx; -import org.redisson.rx.RedissonReadWriteLockRx; -import org.redisson.rx.RedissonScoredSortedSetRx; -import org.redisson.rx.RedissonSetCacheRx; -import org.redisson.rx.RedissonSetMultimapRx; -import org.redisson.rx.RedissonSetRx; -import org.redisson.rx.RedissonTopicRx; -import org.redisson.rx.RedissonTransactionRx; -import org.redisson.rx.RxProxyBuilder; +import org.redisson.rx.*; /** * Main infrastructure class allows to get access @@ -502,6 +486,24 @@ public class RedissonRx implements RedissonRxClient { new RedissonBlockingDequeRx(deque), RBlockingDequeRx.class); } + @Override + public RTransferQueueRx getTransferQueue(String name) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + RedissonTransferQueue queue = new RedissonTransferQueue(connectionManager.getCommandExecutor(), name, service); + return RxProxyBuilder.create(commandExecutor, queue, + new RedissonTransferQueueRx(queue), RTransferQueueRx.class); + } + + @Override + public RTransferQueueRx getTransferQueue(String name, Codec codec) { + String remoteName = RedissonObject.suffixName(name, "remoteService"); + RRemoteService service = getRemoteService(remoteName); + RedissonTransferQueue queue = new RedissonTransferQueue(codec, connectionManager.getCommandExecutor(), name, service); + return RxProxyBuilder.create(commandExecutor, queue, + new RedissonTransferQueueRx(queue), RTransferQueueRx.class); + } + @Override public String getId() { return commandExecutor.getConnectionManager().getId(); diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java new file mode 100644 index 000000000..dd7711f3d --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -0,0 +1,756 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import io.netty.buffer.ByteBuf; +import io.netty.util.Timeout; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.redisson.api.RFuture; +import org.redisson.api.RRemoteService; +import org.redisson.api.RTransferQueue; +import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.annotation.RRemoteAsync; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.Convertor; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.decoder.ListDrainToDecoder; +import org.redisson.executor.RemotePromise; +import org.redisson.iterator.RedissonListIterator; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.redisson.remote.RemoteServiceRequest; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransferQueue extends RedissonExpirable implements RTransferQueue { + + public interface TransferQueueService { + + void invoke(V value); + + } + + @RRemoteAsync(TransferQueueService.class) + public interface TransferQueueServiceAsync { + + RFuture invoke(V value); + + } + + public static class TransferQueueServiceImpl implements TransferQueueService { + + private Object result; + + @Override + public void invoke(V value) { + result = value; + } + + public Object getResult() { + return result; + } + + } + + private static final Convertor CONVERTER = obj -> { + if (obj != null) { + RemoteServiceRequest request = (RemoteServiceRequest) obj; + return request.getArgs()[0]; + } + return null; + }; + + private static final RedisStrictCommand EVAL_REQUEST = new RedisStrictCommand<>("EVAL", CONVERTER); + private static final RedisCommand EVAL_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<>(), CONVERTER); + + private final String queueName; + private final String mapName; + private final TransferQueueServiceAsync service; + private final RRemoteService remoteService; + + public RedissonTransferQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RRemoteService remoteService) { + super(codec, commandExecutor, name); + service = remoteService.get(TransferQueueServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); + this.remoteService = remoteService; + + queueName = ((RedissonRemoteService) remoteService).getRequestQueueName(TransferQueueService.class); + mapName = ((RedissonRemoteService) remoteService).getRequestTasksMapName(TransferQueueService.class); + } + + public RedissonTransferQueue(CommandAsyncExecutor commandExecutor, String name, RRemoteService remoteService) { + super(commandExecutor, name); + service = remoteService.get(TransferQueueServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); + this.remoteService = remoteService; + + queueName = ((RedissonRemoteService) remoteService).getRequestQueueName(TransferQueueService.class); + mapName = ((RedissonRemoteService) remoteService).getRequestTasksMapName(TransferQueueService.class); + } + + @Override + public boolean tryTransfer(V v) { + RemotePromise future = (RemotePromise) service.invoke(v); + boolean added = get(future.getAddFuture()); + if (added && !future.cancel(false)) { + get(future); + return true; + } + return false; + } + + public RFuture tryTransferAsync(V v) { + RPromise result = new RedissonPromise<>(); + result.setUncancellable(); + + RemotePromise future = (RemotePromise) service.invoke(v); + future.getAddFuture().onComplete((added, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + if (!added) { + result.trySuccess(false); + return; + } + + future.cancelAsync(false).onComplete((canceled, ex) -> { + if (ex != null) { + result.tryFailure(ex); + return; + } + + if (canceled) { + result.trySuccess(false); + } else { + future.onComplete((res, exc) -> { + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(true); + }); + } + }); + }); + + return result; + } + + @Override + public void transfer(V v) throws InterruptedException { + RFuture future = service.invoke(v); + commandExecutor.getInterrupted(future); + } + + @Override + public RFuture transferAsync(V v) { + return service.invoke(v); + } + + @Override + public boolean tryTransfer(V v, long timeout, TimeUnit unit) throws InterruptedException { + RemotePromise future = (RemotePromise) service.invoke(v); + long remainTime = unit.toMillis(timeout); + long startTime = System.currentTimeMillis(); + if (!future.getAddFuture().await(remainTime, TimeUnit.MILLISECONDS)) { + if (!future.getAddFuture().cancel(false)) { + if (!future.cancel(false)) { + commandExecutor.getInterrupted(future); + return true; + } + } + return false; + } + remainTime -= System.currentTimeMillis() - startTime; + + if (!future.await(remainTime)) { + if (!future.cancel(false)) { + commandExecutor.getInterrupted(future); + return true; + } + return false; + } + commandExecutor.getInterrupted(future); + return true; + } + + public RFuture tryTransferAsync(V v, long timeout, TimeUnit unit) { + RPromise result = new RedissonPromise<>(); + result.setUncancellable(); + + RemotePromise future = (RemotePromise) service.invoke(v); + + long remainTime = unit.toMillis(timeout); + long startTime = System.currentTimeMillis(); + + Timeout timeoutFuture = commandExecutor.getConnectionManager().newTimeout(tt -> { + if (!future.getAddFuture().cancel(false)) { + future.cancelAsync(false); + } + }, remainTime, TimeUnit.MILLISECONDS); + + + future.onComplete((res, exc) -> { + if (future.isCancelled()) { + result.trySuccess(false); + return; + } + + timeoutFuture.cancel(); + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(true); + }); + + future.getAddFuture().onComplete((added, e) -> { + if (future.getAddFuture().isCancelled()) { + result.trySuccess(false); + return; + } + + if (e != null) { + timeoutFuture.cancel(); + result.tryFailure(e); + return; + } + + if (!added) { + timeoutFuture.cancel(); + result.trySuccess(false); + return; + } + + Runnable task = () -> { + future.cancelAsync(false).onComplete((canceled, ex) -> { + if (ex != null) { + timeoutFuture.cancel(); + result.tryFailure(ex); + return; + } + + if (canceled) { + timeoutFuture.cancel(); + result.trySuccess(false); + } + }); + }; + + long time = remainTime - (System.currentTimeMillis() - startTime); + if (time > 0) { + commandExecutor.getConnectionManager().newTimeout(tt -> { + task.run(); + }, time, TimeUnit.MILLISECONDS); + } else { + task.run(); + } + }); + return result; + } + + @Override + public boolean hasWaitingConsumer() { + throw new UnsupportedOperationException(); + } + + @Override + public int getWaitingConsumerCount() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(V v) { + RemotePromise future = (RemotePromise) service.invoke(v); + return get(future.getAddFuture()); + } + + public RFuture addAsync(V v) { + RemotePromise future = (RemotePromise) service.invoke(v); + return future.getAddFuture(); + } + + @Override + public boolean offer(V v) { + return add(v); + } + + @Override + public V remove() { + V value = poll(); + if (value == null) { + throw new NoSuchElementException(); + } + return value; + } + + @Override + public V poll() { + TransferQueueServiceImpl s = new TransferQueueServiceImpl(); + RFuture r = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, -1, null); + get(r); + return (V) s.getResult(); + } + + public RFuture pollAsync() { + TransferQueueServiceImpl s = new TransferQueueServiceImpl(); + RFuture future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, -1, null); + + RPromise result = new RedissonPromise<>(); + result.setUncancellable(); + + future.onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + result.trySuccess((V) s.getResult()); + }); + return result; + } + + + @Override + public V element() { + V value = peek(); + if (value == null) { + throw new NoSuchElementException(); + } + return value; + } + + @Override + public V peek() { + return get(peekAsync()); + } + + public RFuture peekAsync() { + return commandExecutor.evalReadAsync(queueName, codec, EVAL_REQUEST, + "local id = redis.call('lindex', KEYS[1], 0); " + + "if id ~= false then " + + "return redis.call('hget', KEYS[2], id); " + + "end " + + "return nil;", + Arrays.asList(queueName, mapName)); + } + + @Override + public void put(V v) throws InterruptedException { + add(v); + } + + @Override + public boolean offer(V v, long timeout, TimeUnit unit) throws InterruptedException { + return add(v); + } + + @Override + public V take() throws InterruptedException { + return poll(0, TimeUnit.MILLISECONDS); + } + + public RFuture takeAsync() { + return pollAsync(0, TimeUnit.MILLISECONDS); + } + + @Override + public V poll(long timeout, TimeUnit unit) throws InterruptedException { + TransferQueueServiceImpl s = new TransferQueueServiceImpl(); + remoteService.tryExecute(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, timeout, unit); + return (V) s.getResult(); + } + + public RFuture pollAsync(long timeout, TimeUnit unit) { + RPromise result = new RedissonPromise<>(); + result.setUncancellable(); + + TransferQueueServiceImpl s = new TransferQueueServiceImpl(); + RFuture future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, timeout, unit); + future.onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + result.trySuccess((V) s.getResult()); + }); + + return result; + } + + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + if (c.isEmpty()) { + return true; + } + + boolean all = true; + for (Object obj : c) { + all &= contains(obj); + } + return all; + } + + @Override + public boolean addAll(Collection c) { + if (c.isEmpty()) { + return false; + } + + boolean added = false; + for (V obj : c) { + added |= add(obj); + } + return added; + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + RedissonKeys keys = new RedissonKeys(commandExecutor); + keys.delete(queueName, mapName); + } + + public RFuture clearAsync() { + RPromise result = new RedissonPromise<>(); + result.setUncancellable(); + + RedissonKeys keys = new RedissonKeys(commandExecutor); + keys.deleteAsync(queueName, mapName).onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + result.trySuccess(null); + }); + return result; + } + + @Override + public int size() { + return remoteService.getPendingInvocations(TransferQueueService.class); + } + + public RFuture sizeAsync() { + return remoteService.getPendingInvocationsAsync(TransferQueueService.class); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(Object o) { + ByteBuf encodedObject = encode(o); + boolean result = stream().anyMatch(v -> { + ByteBuf encodedValue = encode(v); + boolean res = encodedValue.equals(encodedObject); + encodedValue.release(); + return res; + }); + encodedObject.release(); + return result; + } + + public RFuture getValueAsync(int index) { + return commandExecutor.evalReadAsync(queueName, codec, EVAL_REQUEST, + "local id = redis.call('lindex', KEYS[1], ARGV[1]); " + + "if id ~= false then " + + "return redis.call('hget', KEYS[2], id); " + + "end " + + "return nil;", + Arrays.asList(queueName, mapName), index); + } + + + @Override + public Iterator iterator() { + return new RedissonListIterator(0) { + + @Override + public V getValue(int index) { + RFuture future = getValueAsync(index); + return get(future); + } + + @Override + public V remove(int index) { + if (index == 0) { + RFuture future = commandExecutor.evalWriteAsync(queueName, codec, EVAL_REQUEST, + "local id = redis.call('lpop', KEYS[1]); " + + "if id ~= false then " + + "return redis.call('hget', KEYS[2], id); " + + "end " + + "return nil;", + Arrays.asList(queueName, mapName)); + + return get(future); + } + + RFuture future = commandExecutor.evalWriteAsync(queueName, codec, EVAL_REQUEST, + "local id = redis.call('lindex', KEYS[1], ARGV[1]); " + + "if id ~= false then " + + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + + "local val = redis.call('hget', KEYS[2], id); " + + "redis.call('hdel', KEYS[2], id); " + + "return val; " + + "end; " + + "return nil;", + Arrays.asList(queueName, mapName), index); + + return get(future); + } + + @Override + public void fastSet(int index, V value) { + throw new UnsupportedOperationException(); + } + + @Override + public void add(int index, V value) { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public Object[] toArray() { + List list = readAll(); + return list.toArray(); + } + + @Override + public T[] toArray(T[] a) { + List list = readAll(); + return list.toArray(a); + } + + @Override + public int drainTo(Collection c) { + return get(drainToAsync(c)); + } + + public RFuture drainToAsync(Collection c) { + if (c == null) { + throw new NullPointerException(); + } + + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c), CONVERTER), + "local ids = redis.call('lrange', KEYS[1], 0, -1); " + + "local result = {};" + + "for i=1, #ids, 5000 do " + + "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); " + + "for k,v in ipairs(vals) do " + + "table.insert(result, v); " + + "end; " + + "end; " + + "redis.call('del', KEYS[1], KEYS[2]); " + + "return result", + Arrays.asList(queueName, mapName)); + } + + @Override + public int drainTo(Collection c, int maxElements) { + if (maxElements <= 0) { + return 0; + } + + return get(drainToAsync(c, maxElements)); + } + + public RFuture drainToAsync(Collection c, int maxElements) { + if (c == null) { + throw new NullPointerException(); + } + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c), CONVERTER), + "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + + "local ids = redis.call('lrange', KEYS[1], 0, elemNum); " + + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + + "local result = {};" + + "for i=1, #ids, 5000 do " + + "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); " + + "redis.call('hdel', KEYS[2], unpack(ids, i, math.min(i+4999, #ids)));" + + "for k,v in ipairs(vals) do " + + "table.insert(result, v); " + + "end; " + + "end; " + + "return result", + Arrays.asList(queueName, mapName), maxElements); + } + + @Override + public List readAll() { + return get(readAllAsync()); + } + + public RFuture> readAllAsync() { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_LIST, + "local ids = redis.call('lrange', KEYS[1], 0, -1); " + + "local result = {};" + + "for i=1, #ids, 5000 do " + + "local vals = redis.call('hmget', KEYS[2], unpack(ids, i, math.min(i+4999, #ids))); " + + "for k,v in ipairs(vals) do " + + "table.insert(result, v); " + + "end; " + + "end; " + + "return result;", + Arrays.asList(queueName, mapName)); + } + + @Override + public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public int subscribeOnElements(Consumer consumer) { + return commandExecutor.getConnectionManager().getElementsSubscribeService().subscribeOnElements(this::takeAsync, consumer); + } + + @Override + public void unsubscribe(int listenerId) { + commandExecutor.getConnectionManager().getElementsSubscribeService().unsubscribe(listenerId); + } + + @Override + public V pollLastAndOfferFirstTo(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public List poll(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture putAsync(V value) { + RemotePromise future = (RemotePromise) service.invoke(value); + RPromise result = new RedissonPromise<>(); + future.getAddFuture().onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + result.trySuccess(null); + }); + return result; + } + + @Override + public RFuture offerAsync(V e) { + return addAsync(e); + } + + @Override + public RFuture pollLastAndOfferFirstToAsync(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollAsync(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture retainAllAsync(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture removeAllAsync(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture containsAsync(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture containsAllAsync(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture removeAsync(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture addAllAsync(Collection c) { + throw new UnsupportedOperationException(); + } +} diff --git a/redisson/src/main/java/org/redisson/api/RTransferQueue.java b/redisson/src/main/java/org/redisson/api/RTransferQueue.java new file mode 100644 index 000000000..a03219c7d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransferQueue.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.List; +import java.util.concurrent.TransferQueue; + +/** + * Redis based implementation of {@link java.util.concurrent.TransferQueue} + * + * @author Nikita Koksharov + * + */ +public interface RTransferQueue extends TransferQueue, RBlockingQueue, RTransferQueueAsync { + + /** + * Returns all queue elements at once + * + * @return elements + */ + List readAll(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RTransferQueueAsync.java b/redisson/src/main/java/org/redisson/api/RTransferQueueAsync.java new file mode 100644 index 000000000..d218476b2 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransferQueueAsync.java @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Async interface for Redis based implementation of {@link java.util.concurrent.TransferQueue} + * + * @author Nikita Koksharov + * + */ +public interface RTransferQueueAsync extends RBlockingQueueAsync { + + /** + * Tries to transfer the element to waiting consumer + * which invoked {@link #takeAsync} or {@link #pollAsync} method + * at the moment of transfer. + * + * @param e element to transfer + * @return {@code true} if element was transferred, otherwise + * {@code false} + */ + RFuture tryTransferAsync(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #takeAsync} or {@link #pollAsync} method + * at the moment of transfer. + * Waits if necessary for a consumer. + * + * @param e the element to transfer + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + RFuture transferAsync(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #takeAsync} or {@link #pollAsync} method + * at the moment of transfer. + * Waits up to defined timeout if necessary for a consumer. + * + * @param e the element to transfer + * @param timeout the maximum time to wait + * @param unit the time unit + * @return true if the element was transferred and false + * otherwise + */ + RFuture tryTransferAsync(V e, long timeout, TimeUnit unit); + + /** + * Returns all queue elements at once + * + * @return elements + */ + List readAll(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RTransferQueueReactive.java b/redisson/src/main/java/org/redisson/api/RTransferQueueReactive.java new file mode 100644 index 000000000..17232941b --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransferQueueReactive.java @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import reactor.core.publisher.Mono; + +import java.util.concurrent.TimeUnit; + +/** + * Reactive interface of Redis based implementation of {@link java.util.concurrent.TransferQueue} + * + * @author Nikita Koksharov + * @param the type of elements held in this collection + */ +public interface RTransferQueueReactive extends RBlockingQueueReactive { + + /** + * Tries to transfer the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * + * @param e element to transfer + * @return {@code true} if element was transferred, otherwise + * {@code false} + */ + Mono tryTransfer(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * Waits if necessary for a consumer. + * + * @param e the element to transfer + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + Mono transfer(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * Waits up to defined timeout if necessary for a consumer. + * + * @param e the element to transfer + * @param timeout the maximum time to wait + * @param unit the time unit + * @return true if the element was transferred and false + * otherwise + */ + Mono tryTransfer(V e, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RTransferQueueRx.java b/redisson/src/main/java/org/redisson/api/RTransferQueueRx.java new file mode 100644 index 000000000..6b7fee387 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RTransferQueueRx.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import io.reactivex.Completable; +import io.reactivex.Single; + +import java.util.concurrent.TimeUnit; + +/** + * RxJava2 interface of Redis based implementation of {@link java.util.concurrent.TransferQueue} + * + * @author Nikita Koksharov + * @param the type of elements held in this collection + */ +public interface RTransferQueueRx extends RBlockingQueueRx { + + /** + * Tries to transfer the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * + * @param e element to transfer + * @return {@code true} if element was transferred, otherwise + * {@code false} + */ + Single tryTransfer(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * Waits if necessary for a consumer. + * + * @param e the element to transfer + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + Completable transfer(V e); + + /** + * Transfers the element to waiting consumer + * which invoked {@link #take} or {@link #poll} method + * at the moment of transfer. + * Waits up to defined timeout if necessary for a consumer. + * + * @param e the element to transfer + * @param timeout the maximum time to wait + * @param unit the time unit + * @return true if the element was transferred and false + * otherwise + */ + Single tryTransfer(V e, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index affc2a07c..27d414910 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -612,7 +612,27 @@ public interface RedissonClient { * @return queue object */ RQueue getQueue(String name); - + + /** + * Returns transfer queue instance by name. + * + * @param type of values + * @param name - name of object + * @return TransferQueue object + */ + RTransferQueue getTransferQueue(String name); + + /** + * Returns transfer queue instance by name + * using provided codec for queue objects. + * + * @param type of values + * @param name - name of object + * @param codec - code for values + * @return TransferQueue object + */ + RTransferQueue getTransferQueue(String name, Codec codec); + /** * Returns unbounded delayed queue instance by name. *

diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 9c9bbf9c0..1e714be7f 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -571,7 +571,27 @@ public interface RedissonReactiveClient { * @return BlockingDeque object */ RBlockingDequeReactive getBlockingDeque(String name, Codec codec); - + + /** + * Returns transfer queue instance by name. + * + * @param type of values + * @param name - name of object + * @return TransferQueue object + */ + RTransferQueueReactive getTransferQueue(String name); + + /** + * Returns transfer queue instance by name + * using provided codec for queue objects. + * + * @param type of values + * @param name - name of object + * @param codec - code for values + * @return TransferQueue object + */ + RTransferQueueReactive getTransferQueue(String name, Codec codec); + /** * Returns deque instance by name. * diff --git a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java index 4d438a603..318c6de89 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java @@ -559,7 +559,27 @@ public interface RedissonRxClient { * @return BlockingDeque object */ RBlockingDequeRx getBlockingDeque(String name, Codec codec); - + + /** + * Returns transfer queue instance by name. + * + * @param type of values + * @param name - name of object + * @return TransferQueue object + */ + RTransferQueueRx getTransferQueue(String name); + + /** + * Returns transfer queue instance by name + * using provided codec for queue objects. + * + * @param type of values + * @param name - name of object + * @param codec - code for values + * @return TransferQueue object + */ + RTransferQueueRx getTransferQueue(String name, Codec codec); + /** * Returns deque instance by name. * diff --git a/redisson/src/main/java/org/redisson/executor/RemotePromise.java b/redisson/src/main/java/org/redisson/executor/RemotePromise.java index c336d5ea1..0cb68911e 100644 --- a/redisson/src/main/java/org/redisson/executor/RemotePromise.java +++ b/redisson/src/main/java/org/redisson/executor/RemotePromise.java @@ -45,8 +45,12 @@ public class RemotePromise extends RedissonPromise { return addFuture; } - public void doCancel() { - super.cancel(true); + public void doCancel(boolean mayInterruptIfRunning) { + super.cancel(mayInterruptIfRunning); + } + + public RFuture cancelAsync(boolean mayInterruptIfRunning) { + return RemotePromise.newSucceededFuture(false); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransferQueueReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransferQueueReactive.java new file mode 100644 index 000000000..b4f62702c --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransferQueueReactive.java @@ -0,0 +1,105 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonTransferQueue; +import org.redisson.api.RFuture; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +import java.util.concurrent.Callable; +import java.util.function.Consumer; +import java.util.function.LongConsumer; + +/** + * + * @author Nikita Koksharov + * + * @param - value type + */ +public class RedissonTransferQueueReactive { + + private final RedissonTransferQueue queue; + + public RedissonTransferQueueReactive(RedissonTransferQueue queue) { + this.queue = queue; + } + + public Flux takeElements() { + return ElementsStream.takeElements(queue::takeAsync); + } + + public Publisher iterator() { + return Flux.create(new Consumer>() { + + @Override + public void accept(FluxSink emitter) { + emitter.onRequest(new LongConsumer() { + + int currentIndex = 0; + + @Override + public void accept(long value) { + onRequest(true, emitter, value); + } + + protected void onRequest(boolean forward, FluxSink emitter, long n) { + queue.getValueAsync(currentIndex).onComplete((value, e) -> { + if (e != null) { + emitter.error(e); + return; + } + + if (value != null) { + emitter.next(value); + if (forward) { + currentIndex++; + } else { + currentIndex--; + } + } + + if (value == null) { + emitter.complete(); + return; + } + if (n-1 == 0) { + return; + } + onRequest(forward, emitter, n-1); + }); + } + }); + + } + + }); + } + + public Publisher addAll(Publisher c) { + return new PublisherAdder() { + + @Override + public RFuture add(Object o) { + return queue.addAsync((V) o); + } + + }.addAll(c); + } + + +} diff --git a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java index 164e4d570..d4af22a34 100644 --- a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java @@ -15,15 +15,6 @@ */ package org.redisson.remote; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - import org.redisson.RedissonBucket; import org.redisson.RedissonList; import org.redisson.RedissonMap; @@ -40,6 +31,16 @@ import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; /** * @@ -225,35 +226,35 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { if (!optionsCopy.isResultExpected()) { return; } - + responseFuture.onComplete((res, e) -> { if (e != null) { result.tryFailure(e); return; } - + if (res == null) { RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + result.getRequestId()); result.tryFailure(ex); return; } - + if (res instanceof RemoteServiceCancelResponse) { - result.doCancel(); + result.doCancel(true); return; } - + RemoteServiceResponse response = (RemoteServiceResponse) res; if (response.getError() != null) { result.tryFailure(response.getError()); return; } - + result.trySuccess(response.getResult()); }); } - + private RemotePromise createResultPromise(RemoteInvocationOptions optionsCopy, RequestId requestId, String requestQueueName, Long ackTimeout) { RemotePromise result = new RemotePromise(requestId) { @@ -292,7 +293,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { return true; } - return doCancel(mayInterruptIfRunning); + return executeCancel(mayInterruptIfRunning); } boolean removed = commandExecutor.get(remoteService.removeAsync(requestQueueName, requestId)); @@ -301,10 +302,10 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { return true; } - return doCancel(mayInterruptIfRunning); + return executeCancel(mayInterruptIfRunning); } - private boolean doCancel(boolean mayInterruptIfRunning) { + private boolean executeCancel(boolean mayInterruptIfRunning) { if (isCancelled()) { return true; } @@ -313,7 +314,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { return false; } - cancelExecution(optionsCopy, mayInterruptIfRunning, this); + cancelExecution(optionsCopy, mayInterruptIfRunning, this, cancelRequestMapName); try { awaitUninterruptibly(60, TimeUnit.SECONDS); @@ -322,12 +323,107 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { } return isCancelled(); } + + @Override + public RFuture cancelAsync(boolean mayInterruptIfRunning) { + return AsyncRemoteProxy.this.cancelAsync(optionsCopy, this, requestId, requestQueueName, ackTimeout, mayInterruptIfRunning); + } }; return result; } - + + private RFuture cancelAsync(RemoteInvocationOptions optionsCopy, RemotePromise promise, + RequestId requestId, String requestQueueName, Long ackTimeout, boolean mayInterruptIfRunning) { + if (promise.isCancelled()) { + return RedissonPromise.newSucceededFuture(true); + } + + if (promise.isDone()) { + return RedissonPromise.newSucceededFuture(false); + } + + RPromise result = new RedissonPromise<>(); + if (optionsCopy.isAckExpected()) { + String ackName = remoteService.getAckName(requestId); + RFuture future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE, + RedisCommands.EVAL_BOOLEAN, + "if redis.call('setnx', KEYS[1], 1) == 1 then " + + "redis.call('pexpire', KEYS[1], ARGV[1]);" +// + "redis.call('lrem', KEYS[3], 1, ARGV[1]);" +// + "redis.call('pexpire', KEYS[2], ARGV[2]);" + + "return 1;" + + "end;" + + "return 0;", + Arrays. asList(ackName), +// Arrays. asList(ackName, responseQueueName, requestQueueName), + ackTimeout); + + future.onComplete((ackNotSent, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + if (ackNotSent) { + RList list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null); + RFuture f = list.removeAsync(requestId.toString()); + f.onComplete((res, ex) -> { + if (ex != null) { + result.tryFailure(ex); + return; + } + + promise.doCancel(mayInterruptIfRunning); + result.trySuccess(true); + }); + } + + doCancelAsync(mayInterruptIfRunning, result, promise, optionsCopy); + }); + return result; + } + + RFuture removeFuture = remoteService.removeAsync(requestQueueName, requestId); + removeFuture.onComplete((removed, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + if (removed) { + promise.doCancel(mayInterruptIfRunning); + } + + doCancelAsync(mayInterruptIfRunning, result, promise, optionsCopy); + }); + return result; + } + + private void doCancelAsync(boolean mayInterruptIfRunning, RPromise result, RemotePromise promise, RemoteInvocationOptions optionsCopy) { + if (promise.isCancelled()) { + result.trySuccess(true); + return; + } + + if (promise.isDone()) { + result.trySuccess(false); + return; + } + + cancelExecution(optionsCopy, mayInterruptIfRunning, promise, cancelRequestMapName); + + promise.onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + result.trySuccess(promise.isCancelled()); + }); + } + private void cancelExecution(RemoteInvocationOptions optionsCopy, - boolean mayInterruptIfRunning, RemotePromise remotePromise) { + boolean mayInterruptIfRunning, RemotePromise remotePromise, String cancelRequestMapName) { RMap canceledRequests = new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, cancelRequestMapName, null, null, null); canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTransferQueueRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTransferQueueRx.java new file mode 100644 index 000000000..6f6b4f822 --- /dev/null +++ b/redisson/src/main/java/org/redisson/rx/RedissonTransferQueueRx.java @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.rx; + +import io.reactivex.Flowable; +import io.reactivex.Single; +import io.reactivex.functions.LongConsumer; +import io.reactivex.processors.ReplayProcessor; +import org.reactivestreams.Publisher; +import org.redisson.RedissonTransferQueue; +import org.redisson.api.RFuture; + +/** + * + * @author Nikita Koksharov + * + * @param - value type + */ +public class RedissonTransferQueueRx { + + private final RedissonTransferQueue queue; + + public RedissonTransferQueueRx(RedissonTransferQueue queue) { + this.queue = queue; + } + + public Flowable takeElements() { + return ElementsStream.takeElements(queue::takeAsync); + } + + public Publisher iterator() { + ReplayProcessor p = ReplayProcessor.create(); + return p.doOnRequest(new LongConsumer() { + + private int currentIndex = 0; + + @Override + public void accept(long n) throws Exception { + queue.getValueAsync(currentIndex).onComplete((value, e) -> { + if (e != null) { + p.onError(e); + return; + } + + if (value != null) { + p.onNext(value); + currentIndex++; + } + + if (value == null) { + p.onComplete(); + return; + } + if (n-1 == 0) { + return; + } + try { + accept(n-1); + } catch (Exception e1) { + e1.printStackTrace(); + } + }); + } + + }); + } + + public Single addAll(Publisher c) { + return new PublisherAdder() { + + @Override + public RFuture add(Object o) { + return queue.addAsync((V) o); + } + + }.addAll(c); + } + + +} diff --git a/redisson/src/test/java/org/redisson/RedissonTransferQueueReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonTransferQueueReactiveTest.java new file mode 100644 index 000000000..dd48cd479 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonTransferQueueReactiveTest.java @@ -0,0 +1,167 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RBlockingQueueReactive; +import org.redisson.api.RTransferQueue; +import org.redisson.api.RTransferQueueReactive; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.shouldHaveThrown; +import static org.junit.Assert.assertTrue; + +public class RedissonTransferQueueReactiveTest extends BaseReactiveTest { + + @Test + public void testTakeElements() { + RBlockingQueueReactive queue = redisson.getTransferQueue("test"); + List elements = new ArrayList<>(); + queue.takeElements().subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(4); + } + + @Override + public void onNext(Integer t) { + elements.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 10; i++) { + sync(queue.add(i)); + } + + assertThat(elements).containsExactly(0, 1, 2, 3); + } + + @Test + public void testTake() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getTransferQueue("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingQueueReactive queue = redisson.getTransferQueue("queue:take"); + sync(queue.put(3)); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = sync(queue1.take()); + + Assert.assertEquals(3, l); + Assert.assertTrue(System.currentTimeMillis() - s > 9000); + } + + @Test + public void testPoll() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getTransferQueue("queue1"); + sync(queue1.put(1)); + Assert.assertEquals((Integer)1, sync(queue1.poll(2, TimeUnit.SECONDS))); + + long s = System.currentTimeMillis(); + Assert.assertNull(sync(queue1.poll(5, TimeUnit.SECONDS))); + Assert.assertTrue(System.currentTimeMillis() - s > 5000); + } + @Test + public void testAwait() throws InterruptedException { + RBlockingQueueReactive queue1 = redisson.getTransferQueue("queue1"); + sync(queue1.put(1)); + + Assert.assertEquals((Integer)1, sync(queue1.poll(10, TimeUnit.SECONDS))); + } + + @Test + public void testDrainTo() { + RBlockingQueueReactive queue = redisson.getTransferQueue("queue"); + for (int i = 0 ; i < 100; i++) { + sync(queue.offer(i)); + } + Assert.assertEquals(100, sync(queue.size()).intValue()); + Set batch = new HashSet(); + int count = sync(queue.drainTo(batch, 10)); + Assert.assertEquals(10, count); + Assert.assertEquals(10, batch.size()); + Assert.assertEquals(90, sync(queue.size()).intValue()); + sync(queue.drainTo(batch, 10)); + sync(queue.drainTo(batch, 20)); + sync(queue.drainTo(batch, 60)); + Assert.assertEquals(0, sync(queue.size()).intValue()); + } + + @Test + public void testBlockingQueue() throws InterruptedException { + + RTransferQueueReactive queue = redisson.getTransferQueue("test_:blocking:queue:"); + + ExecutorService executor = Executors.newFixedThreadPool(10); + + final AtomicInteger counter = new AtomicInteger(); + int total = 100; + for (int i = 0; i < total; i++) { + // runnable won't be executed in any particular order, and hence, int value as well. + executor.submit(() -> { + sync(redisson.getTransferQueue("test_:blocking:queue:").add(counter.incrementAndGet())); + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + + int count = 0; + while (count < total) { + int item = sync(queue.take()); + assertTrue(item > 0 && item <= total); + count++; + } + + assertThat(counter.get()).isEqualTo(total); + } + + @Test + public void testDrainToCollection() throws Exception { + RBlockingQueueReactive queue1 = redisson.getTransferQueue("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2L)); + sync(queue1.put("e")); + + ArrayList dst = new ArrayList(); + sync(queue1.drainTo(dst)); + assertThat(dst).containsExactly(1, 2L, "e"); + Assert.assertEquals(0, sync(queue1.size()).intValue()); + } + + @Test + public void testDrainToCollectionLimited() throws Exception { + RBlockingQueueReactive queue1 = redisson.getTransferQueue("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2L)); + sync(queue1.put("e")); + + ArrayList dst = new ArrayList(); + sync(queue1.drainTo(dst, 2)); + assertThat(dst).containsExactly(1, 2L); + Assert.assertEquals(1, sync(queue1.size()).intValue()); + + dst.clear(); + sync(queue1.drainTo(dst, 2)); + assertThat(dst).containsExactly("e"); + } +} diff --git a/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java b/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java new file mode 100644 index 000000000..360f9e484 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonTransferQueueTest.java @@ -0,0 +1,244 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.api.RTransferQueue; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Nikita Koksharov + */ +public class RedissonTransferQueueTest extends BaseTest { + + @Test + public void testTryTransferWithDelay() throws InterruptedException, ExecutionException { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + Future f = Executors.newSingleThreadExecutor().submit(() -> { + RTransferQueue queue = redisson.getTransferQueue("queue"); + try { + long time = System.currentTimeMillis(); + boolean res = queue.tryTransfer(3, 2, TimeUnit.SECONDS); + assertThat(System.currentTimeMillis() - time).isGreaterThan(1900); + assertThat(res).isFalse(); + + Thread.sleep(1000); + + boolean res2 = queue.tryTransfer(4, 1, TimeUnit.SECONDS); + assertThat(res2).isTrue(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + Thread.sleep(2100); + + assertThat(queue1.size()).isZero(); + + Thread.sleep(1100); + + assertThat(queue1.size()).isEqualTo(1); + assertThat(queue1.peek()).isEqualTo(4); + assertThat(queue1.poll()).isEqualTo(4); + f.get(); + assertThat(queue1.size()).isZero(); + assertThat(queue1.peek()).isNull(); + } + + @Test + public void testTransfer() throws InterruptedException, ExecutionException { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + AtomicBoolean takeExecuted = new AtomicBoolean(); + Future f = Executors.newSingleThreadExecutor().submit(() -> { + RTransferQueue queue = redisson.getTransferQueue("queue"); + try { + long time = System.currentTimeMillis(); + queue.transfer(3); + assertThat(takeExecuted.get()).isTrue(); + assertThat(System.currentTimeMillis() - time).isGreaterThan(3000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + Thread.sleep(3000); + + assertThat(queue1.size()).isEqualTo(1); + assertThat(queue1.peek()).isEqualTo(3); + assertThat(queue1.take()).isEqualTo(3); + takeExecuted.set(true); + f.get(); + assertThat(queue1.size()).isZero(); + assertThat(queue1.peek()).isNull(); + } + + @Test + public void testTryTransfer() throws InterruptedException, ExecutionException { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + AtomicBoolean takeExecuted = new AtomicBoolean(); + ScheduledFuture f = Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RTransferQueue queue = redisson.getTransferQueue("queue"); + boolean res = queue.tryTransfer(3); + assertThat(takeExecuted.get()).isTrue(); + assertThat(res).isTrue(); + boolean res2 = queue.tryTransfer(4); + assertThat(res2).isFalse(); + }, 4, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.take(); + takeExecuted.set(true); + + Assert.assertEquals(3, l); + Assert.assertTrue(System.currentTimeMillis() - s > 3900); + f.get(); + assertThat(queue1.size()).isZero(); + assertThat(queue1.peek()).isNull(); + } + + @Test + public void testTake() throws InterruptedException { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RTransferQueue queue = redisson.getTransferQueue("queue"); + try { + queue.put(3); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.take(); + + Assert.assertEquals(3, l); + Assert.assertTrue(System.currentTimeMillis() - s > 9000); + } + + @Test + public void testPoll() throws InterruptedException { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + queue1.put(1); + Assert.assertEquals((Integer)1, queue1.poll(2, TimeUnit.SECONDS)); + + long s = System.currentTimeMillis(); + Assert.assertNull(queue1.poll(5, TimeUnit.SECONDS)); + Assert.assertTrue(System.currentTimeMillis() - s > 4900); + } + + @Test + public void testPeek() { + RTransferQueue queue = redisson.getTransferQueue("queue"); + assertThat(queue.peek()).isNull(); + + queue.add("1"); + + assertThat(queue.size()).isEqualTo(1); + assertThat(queue.peek()).isEqualTo("1"); + } + + @Test + public void testReadAll() { + RTransferQueue queue = redisson.getTransferQueue("queue"); + + queue.add("1"); + queue.add("2"); + queue.add("3"); + queue.add("4"); + + assertThat(queue.readAll()).containsExactly("1", "2", "3", "4"); + } + + @Test + public void testDrainTo() { + RTransferQueue queue = redisson.getTransferQueue("queue"); + for (int i = 0 ; i < 100; i++) { + queue.offer(i); + } + Assert.assertEquals(100, queue.size()); + Set batch = new HashSet<>(); + int count = queue.drainTo(batch, 10); + Assert.assertEquals(10, count); + Assert.assertEquals(10, batch.size()); + Assert.assertEquals(90, queue.size()); + queue.drainTo(batch, 10); + queue.drainTo(batch, 20); + queue.drainTo(batch, 60); + Assert.assertEquals(0, queue.size()); + } + + @Test + public void testDrainToSingle() { + RTransferQueue queue = redisson.getTransferQueue("queue"); + Assert.assertTrue(queue.add(1)); + Assert.assertEquals(1, queue.size()); + Set batch = new HashSet<>(); + int count = queue.drainTo(batch); + Assert.assertEquals(1, count); + Assert.assertEquals(1, batch.size()); + Assert.assertTrue(queue.isEmpty()); + } + + @Test + public void testClear() throws ExecutionException, InterruptedException { + RTransferQueue queue = redisson.getTransferQueue("queue"); + queue.add("1"); + queue.add("4"); + queue.add("2"); + queue.add("5"); + queue.add("3"); + + ScheduledFuture f = Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RTransferQueue queue1 = redisson.getTransferQueue("queue"); + try { + queue1.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, 1, TimeUnit.SECONDS); + + f.get(); + queue.clear(); + assertThat(redisson.getKeys().count()).isZero(); + } + + @Test + public void testIteratorRemove() { + RTransferQueue queue = redisson.getTransferQueue("queue"); + queue.add("1"); + queue.add("4"); + queue.add("2"); + queue.add("5"); + queue.add("3"); + + for (Iterator iterator = queue.iterator(); iterator.hasNext();) { + String value = iterator.next(); + if (value.equals("2")) { + iterator.remove(); + } + } + + assertThat(queue).containsExactly("1", "4", "5", "3"); + + int iteration = 0; + for (Iterator iterator = queue.iterator(); iterator.hasNext();) { + iterator.next(); + iterator.remove(); + iteration++; + } + + Assert.assertEquals(4, iteration); + + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(queue.isEmpty()); + } + +}