refactoring

pull/605/head
Nikita 9 years ago
parent 943f135383
commit 8280b417a9

@ -33,12 +33,10 @@ import java.util.concurrent.locks.Lock;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonFuture;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
/**
* Groups multiple independent locks and manages them as one lock.
@ -81,7 +79,7 @@ public class RedissonMultiLock implements Lock {
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
RPromise<Void> promise = new RedissonPromise<Void>(ImmediateEventExecutor.INSTANCE.<Void>newPromise());
RPromise<Void> promise = new RedissonPromise<Void>();
long currentThreadId = Thread.currentThread().getId();
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
@ -184,8 +182,6 @@ public class RedissonMultiLock implements Lock {
if (future instanceof RedissonPromise) {
future = ((RedissonPromise<Boolean>)future).getInnerPromise();
} else if (future instanceof RedissonFuture) {
future = ((RedissonFuture<Boolean>)future).getInnerFuture();
}
tryLockFutures.put(future, lock);

@ -131,9 +131,9 @@ public class RedissonNode {
}
if (hasRedissonInstance) {
redisson.shutdown();
}
log.info("Redisson node has been shutdown successfully");
}
}
/**
* Start Redisson node instance

@ -19,8 +19,8 @@ import java.util.concurrent.TimeUnit;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}.
* <p/>
* Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* <p>Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
@ -236,10 +236,7 @@ public interface RSemaphore extends RExpirable, RSemaphoreAsync {
/**
* Shrinks the number of available permits by the indicated
* reduction. This method can be useful in subclasses that use
* semaphores to track resources that become unavailable. This
* method differs from {@code acquire} in that it does not block
* waiting for permits to become available.
* reduction.
*
* @param reduction the number of permits to remove
* @throws IllegalArgumentException if {@code reduction} is negative

@ -131,7 +131,7 @@ public class RedisClient {
}
public RFuture<RedisConnection> connectAsync() {
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>(ImmediateEventExecutor.INSTANCE.<RedisConnection>newPromise());
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
@ -158,7 +158,7 @@ public class RedisClient {
}
public RFuture<RedisPubSubConnection> connectPubSubAsync() {
final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>(ImmediateEventExecutor.INSTANCE.<RedisPubSubConnection>newPromise());
final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override

@ -28,7 +28,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonFuture;
import org.redisson.misc.RedissonPromise;
import io.netty.channel.Channel;
@ -36,7 +35,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
@ -53,7 +51,7 @@ public class RedisConnection implements RedisCommands {
private ReconnectListener reconnectListener;
private long lastUsageTime;
private final RFuture<?> acquireFuture = new RedissonFuture(ImmediateEventExecutor.INSTANCE.newSucceededFuture(this));
private final RFuture<?> acquireFuture = RedissonPromise.newSucceededFuture(this);
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
@ -155,7 +153,7 @@ public class RedisConnection implements RedisCommands {
}
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
RPromise<R> promise = new RedissonPromise<R>(ImmediateEventExecutor.INSTANCE.<R>newPromise());
RPromise<R> promise = new RedissonPromise<R>();
send(new CommandData<T, R>(promise, encoder, command, params));
return await(promise);
}
@ -173,7 +171,7 @@ public class RedisConnection implements RedisCommands {
}
public <T, R> RFuture<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object ... params) {
final RPromise<R> promise = new RedissonPromise<R>(ImmediateEventExecutor.INSTANCE.<R>newPromise());
final RPromise<R> promise = new RedissonPromise<R>();
if (timeout == -1) {
timeout = redisClient.getCommandTimeout();
}
@ -197,7 +195,7 @@ public class RedisConnection implements RedisCommands {
}
public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object ... params) {
RPromise<R> promise = new RedissonPromise<R>(ImmediateEventExecutor.INSTANCE.<R>newPromise());
RPromise<R> promise = new RedissonPromise<R>();
return new CommandData<T, R>(promise, encoder, command, params);
}

@ -126,7 +126,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
if (connection.getReconnectListener() != null) {
// new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
RPromise<RedisConnection> connectionFuture = new RedissonPromise<RedisConnection>(ImmediateEventExecutor.INSTANCE.<RedisConnection>newPromise());
RPromise<RedisConnection> connectionFuture = new RedissonPromise<RedisConnection>();
connection.getReconnectListener().onReconnect(rc, connectionFuture);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override

@ -50,7 +50,6 @@ import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonFuture;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
@ -69,7 +68,6 @@ import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.internal.PlatformDependent;
/**
@ -711,17 +709,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public <R> RPromise<R> newPromise() {
return new RedissonPromise<R>(ImmediateEventExecutor.INSTANCE.<R>newPromise());
return new RedissonPromise<R>();
}
@Override
public <R> RFuture<R> newSucceededFuture(R value) {
return new RedissonFuture<R>(ImmediateEventExecutor.INSTANCE.<R>newSucceededFuture(value));
return RedissonPromise.newSucceededFuture(value);
}
@Override
public <R> RFuture<R> newFailedFuture(Throwable cause) {
return new RedissonFuture<R>(ImmediateEventExecutor.INSTANCE.<R>newFailedFuture(cause));
return RedissonPromise.newFailedFuture(cause);
}
@Override

@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class PromiseDelegator<T> implements RPromise<T> {
@ -56,7 +55,7 @@ public class PromiseDelegator<T> implements RPromise<T> {
}
@Override
public Promise<T> setFailure(Throwable cause) {
public RPromise<T> setFailure(Throwable cause) {
return promise.setFailure(cause);
}

@ -52,7 +52,7 @@ public interface RPromise<T> extends RFuture<T> {
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise<T> setFailure(Throwable cause);
RPromise<T> setFailure(Throwable cause);
/**
* Marks this future as a failure and notifies all

@ -1,140 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
* @param <T>
*/
public class RedissonFuture<T> implements RFuture<T> {
private final Future<T> future;
public RedissonFuture(Future<T> future) {
super();
this.future = future;
}
public Future<T> getInnerFuture() {
return future;
}
public boolean isSuccess() {
return future.isSuccess();
}
public boolean isCancellable() {
return future.isCancellable();
}
public Throwable cause() {
return future.cause();
}
public RFuture<T> addListener(FutureListener<? super T> listener) {
future.addListener(listener);
return this;
}
public RFuture<T> addListeners(FutureListener<? super T>... listeners) {
future.addListeners(listeners);
return this;
}
public RFuture<T> removeListener(FutureListener<? super T> listener) {
future.removeListener(listener);
return this;
}
public RFuture<T> removeListeners(FutureListener<? super T>... listeners) {
future.removeListeners(listeners);
return this;
}
public RFuture<T> sync() throws InterruptedException {
future.sync();
return this;
}
public RFuture<T> syncUninterruptibly() {
future.syncUninterruptibly();
return this;
}
public RFuture<T> await() throws InterruptedException {
future.await();
return this;
}
public RFuture<T> awaitUninterruptibly() {
future.awaitUninterruptibly();
return this;
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return future.await(timeout, unit);
}
public boolean isCancelled() {
return future.isCancelled();
}
public boolean isDone() {
return future.isDone();
}
public boolean await(long timeoutMillis) throws InterruptedException {
return future.await(timeoutMillis);
}
public T get() throws InterruptedException, ExecutionException {
return future.get();
}
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return future.awaitUninterruptibly(timeout, unit);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
public boolean awaitUninterruptibly(long timeoutMillis) {
return future.awaitUninterruptibly(timeoutMillis);
}
public T getNow() {
return future.getNow();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
}

@ -19,7 +19,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
/**
@ -30,12 +33,24 @@ import io.netty.util.concurrent.Promise;
*/
public class RedissonPromise<T> implements RPromise<T> {
private final Promise<T> promise;
private final Promise<T> promise = ImmediateEventExecutor.INSTANCE.newPromise();
public RedissonPromise() {
}
public RedissonPromise(Promise<T> promise) {
this.promise = promise;
public static <V> RFuture<V> newFailedFuture(Throwable cause) {
RedissonPromise<V> future = new RedissonPromise<V>();
future.setFailure(cause);
return future;
}
public static <V> RFuture<V> newSucceededFuture(V result) {
RedissonPromise<V> future = new RedissonPromise<V>();
future.setSuccess(result);
return future;
}
public Promise<T> getInnerPromise() {
return promise;
}
@ -62,8 +77,9 @@ public class RedissonPromise<T> implements RPromise<T> {
}
@Override
public Promise<T> setFailure(Throwable cause) {
return promise.setFailure(cause);
public RPromise<T> setFailure(Throwable cause) {
promise.setFailure(cause);
return this;
}
@Override

@ -33,12 +33,17 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
value.getLatch().release();
synchronized (value) {
while (true) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnable.run();
} else {
value.getListeners().add(runnable);
value.addListener(runnable);
return;
}
} else {
return;
}
}
}

@ -28,6 +28,22 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
value.getLatch().release(message.intValue());
synchronized (value) {
while (true) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnable.run();
} else {
value.addListener(runnable);
return;
}
} else {
return;
}
}
}
}
}

@ -145,7 +145,7 @@ public class RedisClientTest {
CommandData<String, String> cmd4 = conn.create(null, RedisCommands.PING);
commands.add(cmd4);
RPromise<Void> p = new RedissonPromise<Void>(ImmediateEventExecutor.INSTANCE.newPromise());
RPromise<Void> p = new RedissonPromise<Void>();
conn.send(new CommandsData(p, commands));
assertThat(cmd1.getPromise().get()).isEqualTo("PONG");
@ -182,7 +182,7 @@ public class RedisClientTest {
commands.add(cmd1);
}
RPromise<Void> p = new RedissonPromise<Void>(ImmediateEventExecutor.INSTANCE.newPromise());
RPromise<Void> p = new RedissonPromise<Void>();
conn.send(new CommandsData(p, commands));
for (CommandData<?, ?> commandData : commands) {

Loading…
Cancel
Save