diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 517d155ab..901c1ff82 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -33,12 +33,10 @@ import java.util.concurrent.locks.Lock; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonFuture; import org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; /** * Groups multiple independent locks and manages them as one lock. @@ -81,7 +79,7 @@ public class RedissonMultiLock implements Lock { } public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { - RPromise promise = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise promise = new RedissonPromise(); long currentThreadId = Thread.currentThread().getId(); Queue lockedLocks = new ConcurrentLinkedQueue(); @@ -184,8 +182,6 @@ public class RedissonMultiLock implements Lock { if (future instanceof RedissonPromise) { future = ((RedissonPromise)future).getInnerPromise(); - } else if (future instanceof RedissonFuture) { - future = ((RedissonFuture)future).getInnerFuture(); } tryLockFutures.put(future, lock); diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index 09dc3ecf3..ce3bc99d9 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -131,8 +131,8 @@ public class RedissonNode { } if (hasRedissonInstance) { redisson.shutdown(); + log.info("Redisson node has been shutdown successfully"); } - log.info("Redisson node has been shutdown successfully"); } /** diff --git a/redisson/src/main/java/org/redisson/api/RSemaphore.java b/redisson/src/main/java/org/redisson/api/RSemaphore.java index 0a5b7b122..223ceb2a0 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphore.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphore.java @@ -19,8 +19,8 @@ import java.util.concurrent.TimeUnit; /** * Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}. - *

- * Works in non-fair mode. Therefore order of acquiring is unpredictable. + * + *

Works in non-fair mode. Therefore order of acquiring is unpredictable. * * @author Nikita Koksharov * @@ -236,10 +236,7 @@ public interface RSemaphore extends RExpirable, RSemaphoreAsync { /** * Shrinks the number of available permits by the indicated - * reduction. This method can be useful in subclasses that use - * semaphores to track resources that become unavailable. This - * method differs from {@code acquire} in that it does not block - * waiting for permits to become available. + * reduction. * * @param reduction the number of permits to remove * @throws IllegalArgumentException if {@code reduction} is negative diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index cfd7f7524..e3e6257bf 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -131,7 +131,7 @@ public class RedisClient { } public RFuture connectAsync() { - final RPromise f = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + final RPromise f = new RedissonPromise(); ChannelFuture channelFuture = bootstrap.connect(); channelFuture.addListener(new ChannelFutureListener() { @Override @@ -158,7 +158,7 @@ public class RedisClient { } public RFuture connectPubSubAsync() { - final RPromise f = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + final RPromise f = new RedissonPromise(); ChannelFuture channelFuture = bootstrap.connect(); channelFuture.addListener(new ChannelFutureListener() { @Override diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index a4845c4c7..0b23c0be0 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -28,7 +28,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonFuture; import org.redisson.misc.RedissonPromise; import io.netty.channel.Channel; @@ -36,7 +35,6 @@ import io.netty.channel.ChannelFuture; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; @@ -53,7 +51,7 @@ public class RedisConnection implements RedisCommands { private ReconnectListener reconnectListener; private long lastUsageTime; - private final RFuture acquireFuture = new RedissonFuture(ImmediateEventExecutor.INSTANCE.newSucceededFuture(this)); + private final RFuture acquireFuture = RedissonPromise.newSucceededFuture(this); public RedisConnection(RedisClient redisClient, Channel channel) { super(); @@ -155,7 +153,7 @@ public class RedisConnection implements RedisCommands { } public R sync(Codec encoder, RedisCommand command, Object ... params) { - RPromise promise = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise promise = new RedissonPromise(); send(new CommandData(promise, encoder, command, params)); return await(promise); } @@ -173,7 +171,7 @@ public class RedisConnection implements RedisCommands { } public RFuture async(long timeout, Codec encoder, RedisCommand command, Object ... params) { - final RPromise promise = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + final RPromise promise = new RedissonPromise(); if (timeout == -1) { timeout = redisClient.getCommandTimeout(); } @@ -197,7 +195,7 @@ public class RedisConnection implements RedisCommands { } public CommandData create(Codec encoder, RedisCommand command, Object ... params) { - RPromise promise = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise promise = new RedissonPromise(); return new CommandData(promise, encoder, command, params); } diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index c15bce8aa..897b96ee9 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -126,7 +126,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { if (connection.getReconnectListener() != null) { // new connection used only for channel init RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); - RPromise connectionFuture = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise connectionFuture = new RedissonPromise(); connection.getReconnectListener().onReconnect(rc, connectionFuture); connectionFuture.addListener(new FutureListener() { @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index eb496253e..e0c2838d6 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -50,7 +50,6 @@ import org.redisson.config.ReadMode; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonFuture; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.TransferListener; @@ -69,7 +68,6 @@ import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.internal.PlatformDependent; /** @@ -711,17 +709,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RPromise newPromise() { - return new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + return new RedissonPromise(); } @Override public RFuture newSucceededFuture(R value) { - return new RedissonFuture(ImmediateEventExecutor.INSTANCE.newSucceededFuture(value)); + return RedissonPromise.newSucceededFuture(value); } @Override public RFuture newFailedFuture(Throwable cause) { - return new RedissonFuture(ImmediateEventExecutor.INSTANCE.newFailedFuture(cause)); + return RedissonPromise.newFailedFuture(cause); } @Override diff --git a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java index b6cf398ba..3b0856a6a 100644 --- a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java +++ b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; public class PromiseDelegator implements RPromise { @@ -56,7 +55,7 @@ public class PromiseDelegator implements RPromise { } @Override - public Promise setFailure(Throwable cause) { + public RPromise setFailure(Throwable cause) { return promise.setFailure(cause); } diff --git a/redisson/src/main/java/org/redisson/misc/RPromise.java b/redisson/src/main/java/org/redisson/misc/RPromise.java index 100907e78..4f0a4b7c8 100644 --- a/redisson/src/main/java/org/redisson/misc/RPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RPromise.java @@ -52,7 +52,7 @@ public interface RPromise extends RFuture { * * If it is success or failed already it will throw an {@link IllegalStateException}. */ - Promise setFailure(Throwable cause); + RPromise setFailure(Throwable cause); /** * Marks this future as a failure and notifies all diff --git a/redisson/src/main/java/org/redisson/misc/RedissonFuture.java b/redisson/src/main/java/org/redisson/misc/RedissonFuture.java deleted file mode 100644 index e41bce293..000000000 --- a/redisson/src/main/java/org/redisson/misc/RedissonFuture.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Copyright 2016 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.misc; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.redisson.api.RFuture; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - -/** - * - * @author Nikita Koksharov - * - * @param - */ -public class RedissonFuture implements RFuture { - - private final Future future; - - public RedissonFuture(Future future) { - super(); - this.future = future; - } - - public Future getInnerFuture() { - return future; - } - - public boolean isSuccess() { - return future.isSuccess(); - } - - public boolean isCancellable() { - return future.isCancellable(); - } - - public Throwable cause() { - return future.cause(); - } - - public RFuture addListener(FutureListener listener) { - future.addListener(listener); - return this; - } - - public RFuture addListeners(FutureListener... listeners) { - future.addListeners(listeners); - return this; - } - - public RFuture removeListener(FutureListener listener) { - future.removeListener(listener); - return this; - } - - public RFuture removeListeners(FutureListener... listeners) { - future.removeListeners(listeners); - return this; - } - - public RFuture sync() throws InterruptedException { - future.sync(); - return this; - } - - public RFuture syncUninterruptibly() { - future.syncUninterruptibly(); - return this; - } - - public RFuture await() throws InterruptedException { - future.await(); - return this; - } - - public RFuture awaitUninterruptibly() { - future.awaitUninterruptibly(); - return this; - } - - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return future.await(timeout, unit); - } - - public boolean isCancelled() { - return future.isCancelled(); - } - - public boolean isDone() { - return future.isDone(); - } - - public boolean await(long timeoutMillis) throws InterruptedException { - return future.await(timeoutMillis); - } - - public T get() throws InterruptedException, ExecutionException { - return future.get(); - } - - public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { - return future.awaitUninterruptibly(timeout, unit); - } - - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return future.get(timeout, unit); - } - - public boolean awaitUninterruptibly(long timeoutMillis) { - return future.awaitUninterruptibly(timeoutMillis); - } - - public T getNow() { - return future.getNow(); - } - - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - - - -} diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index 95303e46e..41ba8064f 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -19,7 +19,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.redisson.api.RFuture; + import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; /** @@ -30,12 +33,24 @@ import io.netty.util.concurrent.Promise; */ public class RedissonPromise implements RPromise { - private final Promise promise; + private final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + + public RedissonPromise() { + } - public RedissonPromise(Promise promise) { - this.promise = promise; + public static RFuture newFailedFuture(Throwable cause) { + RedissonPromise future = new RedissonPromise(); + future.setFailure(cause); + return future; } + public static RFuture newSucceededFuture(V result) { + RedissonPromise future = new RedissonPromise(); + future.setSuccess(result); + return future; + } + + public Promise getInnerPromise() { return promise; } @@ -62,8 +77,9 @@ public class RedissonPromise implements RPromise { } @Override - public Promise setFailure(Throwable cause) { - return promise.setFailure(cause); + public RPromise setFailure(Throwable cause) { + promise.setFailure(cause); + return this; } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index cbc572741..cf701665e 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -33,12 +33,17 @@ public class LockPubSub extends PublishSubscribe { value.getLatch().release(); synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnable.run(); + while (true) { + Runnable runnable = value.getListeners().poll(); + if (runnable != null) { + if (value.getLatch().tryAcquire()) { + runnable.run(); + } else { + value.addListener(runnable); + return; + } } else { - value.getListeners().add(runnable); + return; } } } diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index 6fb3bde77..650aed9f9 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -28,6 +28,22 @@ public class SemaphorePubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { value.getLatch().release(message.intValue()); + + synchronized (value) { + while (true) { + Runnable runnable = value.getListeners().poll(); + if (runnable != null) { + if (value.getLatch().tryAcquire()) { + runnable.run(); + } else { + value.addListener(runnable); + return; + } + } else { + return; + } + } + } } } diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index 9419e07ee..23b0b77b8 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -145,7 +145,7 @@ public class RedisClientTest { CommandData cmd4 = conn.create(null, RedisCommands.PING); commands.add(cmd4); - RPromise p = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise p = new RedissonPromise(); conn.send(new CommandsData(p, commands)); assertThat(cmd1.getPromise().get()).isEqualTo("PONG"); @@ -182,7 +182,7 @@ public class RedisClientTest { commands.add(cmd1); } - RPromise p = new RedissonPromise(ImmediateEventExecutor.INSTANCE.newPromise()); + RPromise p = new RedissonPromise(); conn.send(new CommandsData(p, commands)); for (CommandData commandData : commands) {