Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java
pull/1303/head
Nikita 7 years ago
commit ae1a35fdaf

@ -54,6 +54,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RMapCache; import org.redisson.api.RMapCache;
import org.redisson.api.RPatternTopic; import org.redisson.api.RPatternTopic;
import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RPriorityBlockingDeque;
import org.redisson.api.RPriorityBlockingQueue; import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RPriorityDeque; import org.redisson.api.RPriorityDeque;
import org.redisson.api.RPriorityQueue; import org.redisson.api.RPriorityQueue;
@ -73,7 +74,6 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.ConfigSupport; import org.redisson.config.ConfigSupport;
@ -104,7 +104,6 @@ public class Redisson implements RedissonClient {
protected final ConnectionManager connectionManager; protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap(); protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
protected final ReferenceCodecProvider codecProvider;
protected final Config config; protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -117,7 +116,6 @@ public class Redisson implements RedissonClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy); connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
codecProvider = configCopy.getReferenceCodecProvider();
} }
public EvictionScheduler getEvictionScheduler() { public EvictionScheduler getEvictionScheduler() {
@ -584,7 +582,7 @@ public class Redisson implements RedissonClient {
@Override @Override
public RLiveObjectService getLiveObjectService() { public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache, codecProvider); return new RedissonLiveObjectService(this, liveObjectClassCache);
} }
@Override @Override
@ -603,11 +601,6 @@ public class Redisson implements RedissonClient {
return config; return config;
} }
@Override
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override @Override
public NodesGroup<Node> getNodesGroup() { public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager); return new RedisNodes<Node>(connectionManager);
@ -654,6 +647,17 @@ public class Redisson implements RedissonClient {
public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec) { public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec) {
return new RedissonPriorityBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this); return new RedissonPriorityBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
} }
@Override
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name) {
return new RedissonPriorityBlockingDeque<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name, Codec codec) {
return new RedissonPriorityBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override @Override
public <V> RPriorityDeque<V> getPriorityDeque(String name) { public <V> RPriorityDeque<V> getPriorityDeque(String name) {

@ -35,7 +35,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/ */
public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDouble { public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDouble {
protected RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) { public RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
} }

@ -102,10 +102,6 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
protected abstract void doReset(); protected abstract void doReset();
public T sum() {
return get(sumAsync());
}
public void reset() { public void reset() {
get(resetAsync()); get(resetAsync());
} }

@ -28,38 +28,37 @@ import org.redisson.command.CommandAsyncExecutor;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RedissonDoubleAdder extends RedissonExpirable implements RDoubleAdder { public class RedissonDoubleAdder extends RedissonBaseAdder<Double> implements RDoubleAdder {
private final DoubleAdder counter = new DoubleAdder(); private final DoubleAdder counter = new DoubleAdder();
private final RedissonBaseAdder<Double> adder; private final RAtomicDouble atomicDouble;
public RedissonDoubleAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { public RedissonDoubleAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name); super(connectionManager, name, redisson);
final RAtomicDouble atomicDouble = redisson.getAtomicDouble(getName()); atomicDouble = redisson.getAtomicDouble(getName());
adder = new RedissonBaseAdder<Double>(connectionManager, name, redisson) { }
@Override
protected void doReset() {
counter.reset();
}
@Override
protected RFuture<Double> addAndGetAsync() {
return atomicDouble.getAndAddAsync(counter.sum());
}
@Override @Override
protected RFuture<Double> getAsync() { protected void doReset() {
return atomicDouble.getAsync(); counter.reset();
} }
};
@Override
protected RFuture<Double> addAndGetAsync() {
return atomicDouble.getAndAddAsync(counter.sum());
}
@Override
protected RFuture<Double> getAsync() {
return atomicDouble.getAsync();
} }
@Override @Override
public void add(double x) { public void add(double x) {
counter.add(x); counter.add(x);
} }
@Override @Override
public void increment() { public void increment() {
add(1L); add(1L);
@ -72,27 +71,7 @@ public class RedissonDoubleAdder extends RedissonExpirable implements RDoubleAdd
@Override @Override
public double sum() { public double sum() {
return adder.sum(); return get(sumAsync());
}
@Override
public void reset() {
adder.reset();
}
@Override
public RFuture<Double> sumAsync() {
return adder.sumAsync();
}
@Override
public RFuture<Void> resetAsync() {
return adder.resetAsync();
}
@Override
public void destroy() {
adder.destroy();
} }
} }

@ -54,7 +54,6 @@ import org.redisson.api.annotation.RCascade;
import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RFieldAccessor; import org.redisson.api.annotation.RFieldAccessor;
import org.redisson.api.annotation.RId; import org.redisson.api.annotation.RId;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.LiveObjectTemplate; import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.AccessorInterceptor; import org.redisson.liveobject.core.AccessorInterceptor;
import org.redisson.liveobject.core.FieldAccessorInterceptor; import org.redisson.liveobject.core.FieldAccessorInterceptor;
@ -85,14 +84,12 @@ public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap(); private static final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Class<?>, Class<?>> classCache; private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final RedissonClient redisson; private final RedissonClient redisson;
private final ReferenceCodecProvider codecProvider;
private final RedissonObjectBuilder objectBuilder; private final RedissonObjectBuilder objectBuilder;
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache, ReferenceCodecProvider codecProvider) { public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache) {
this.redisson = redisson; this.redisson = redisson;
this.classCache = classCache; this.classCache = classCache;
this.codecProvider = codecProvider; this.objectBuilder = new RedissonObjectBuilder(redisson);
this.objectBuilder = new RedissonObjectBuilder(redisson, codecProvider);
} }
//TODO: Add ttl renewal functionality //TODO: Add ttl renewal functionality
@ -642,7 +639,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.withBinders(FieldProxy.Binder .withBinders(FieldProxy.Binder
.install(LiveObjectInterceptor.Getter.class, .install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class)) LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(redisson, codecProvider, entityClass, .to(new LiveObjectInterceptor(redisson, entityClass,
getRIdFieldName(entityClass)))) getRIdFieldName(entityClass))))
// .intercept(MethodDelegation.to( // .intercept(MethodDelegation.to(
// new LiveObjectInterceptor(redisson, codecProvider, entityClass, // new LiveObjectInterceptor(redisson, codecProvider, entityClass,

@ -27,32 +27,30 @@ import org.redisson.misc.LongAdder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RedissonLongAdder extends RedissonExpirable implements RLongAdder { public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongAdder {
private final RAtomicLong atomicLong; private final RAtomicLong atomicLong;
private final LongAdder counter = new LongAdder(); private final LongAdder counter = new LongAdder();
private final RedissonBaseAdder<Long> adder;
public RedissonLongAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { public RedissonLongAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name); super(connectionManager, name, redisson);
atomicLong = redisson.getAtomicLong(getName()); atomicLong = redisson.getAtomicLong(getName());
adder = new RedissonBaseAdder<Long>(connectionManager, name, redisson) { }
@Override
protected void doReset() {
counter.reset();
}
@Override
protected RFuture<Long> addAndGetAsync() {
return atomicLong.getAndAddAsync(counter.sum());
}
@Override @Override
protected RFuture<Long> getAsync() { protected void doReset() {
return atomicLong.getAsync(); counter.reset();
} }
};
@Override
protected RFuture<Long> addAndGetAsync() {
return atomicLong.getAndAddAsync(counter.sum());
}
@Override
protected RFuture<Long> getAsync() {
return atomicLong.getAsync();
} }
@Override @Override
@ -69,30 +67,10 @@ public class RedissonLongAdder extends RedissonExpirable implements RLongAdder {
public void decrement() { public void decrement() {
add(-1L); add(-1L);
} }
@Override @Override
public long sum() { public long sum() {
return adder.sum(); return get(sumAsync());
} }
@Override
public void reset() {
adder.reset();
}
@Override
public RFuture<Long> sumAsync() {
return adder.sumAsync();
}
@Override
public RFuture<Void> resetAsync() {
return adder.resetAsync();
}
@Override
public void destroy() {
adder.destroy();
}
} }

@ -0,0 +1,240 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingDeque;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
/**
* <p>Distributed and concurrent implementation of priority blocking deque.
*
* <p>Queue size limited by Redis server memory amount. This is why {@link #remainingCapacity()} always
* returns <code>Integer.MAX_VALUE</code>
*
* @author Nikita Koksharov
*/
public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> implements RPriorityBlockingDeque<V> {
private final RedissonPriorityBlockingQueue<V> blockingQueue;
protected RedissonPriorityBlockingDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = (RedissonPriorityBlockingQueue<V>) redisson.getPriorityBlockingQueue(name);
}
protected RedissonPriorityBlockingDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
blockingQueue = (RedissonPriorityBlockingQueue<V>) redisson.getPriorityBlockingQueue(name, codec);
}
@Override
public void put(V e) throws InterruptedException {
add(e);
}
@Override
public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
@Override
public RFuture<V> takeAsync() {
return blockingQueue.takeAsync();
}
@Override
public V take() throws InterruptedException {
return blockingQueue.take();
}
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
return blockingQueue.pollAsync(timeout, unit);
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
return blockingQueue.poll(timeout, unit);
}
@Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit);
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName));
}
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@Override
public int drainTo(Collection<? super V> c) {
return blockingQueue.drainTo(c);
}
public RFuture<Integer> drainToAsync(Collection<? super V> c) {
return blockingQueue.drainToAsync(c);
}
@Override
public int drainTo(Collection<? super V> c, int maxElements) {
return blockingQueue.drainTo(c, maxElements);
}
public RFuture<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
return blockingQueue.drainToAsync(c, maxElements);
}
@Override
public RFuture<Boolean> offerAsync(V e) {
throw new UnsupportedOperationException("use offer method");
}
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Void> putAsync(V e) {
throw new UnsupportedOperationException("use add method");
}
@Override
public RFuture<Void> putFirstAsync(V e) {
return addFirstAsync(e);
}
@Override
public RFuture<Void> putLastAsync(V e) {
return addLastAsync(e);
}
@Override
public void putFirst(V e) throws InterruptedException {
addFirst(e);
}
@Override
public void putLast(V e) throws InterruptedException {
addLast(e);
}
@Override
public boolean offerFirst(V e, long timeout, TimeUnit unit) throws InterruptedException {
addFirst(e);
return true;
}
@Override
public boolean offerLast(V e, long timeout, TimeUnit unit) throws InterruptedException {
addLast(e);
return true;
}
@Override
public V takeFirst() throws InterruptedException {
return get(takeFirstAsync());
}
@Override
public RFuture<V> takeFirstAsync() {
return takeAsync();
}
@Override
public RFuture<V> takeLastAsync() {
RPromise<V> result = new RedissonPromise<V>();
blockingQueue.takeAsync(result, 0, 0, RedisCommands.RPOP, getName());
return result;
}
@Override
public V takeLast() throws InterruptedException {
return get(takeLastAsync());
}
@Override
public RFuture<V> pollFirstAsync(long timeout, TimeUnit unit) {
return pollAsync(timeout, unit);
}
@Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
return pollFromAnyAsync(timeout, unit, queueNames);
}
@Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
return get(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollFirstAsync(timeout, unit));
}
@Override
public RFuture<V> pollLastAsync(long timeout, TimeUnit unit) {
RPromise<V> result = new RedissonPromise<V>();
blockingQueue.takeAsync(result, 0, unit.toMicros(timeout), RedisCommands.RPOP, getName());
return result;
}
@Override
public V pollLast(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAsync(timeout, unit));
}
}

@ -20,6 +20,7 @@ import java.util.NoSuchElementException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RPriorityDeque; import org.redisson.api.RPriorityDeque;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -38,22 +39,30 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder()); private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, Redisson redisson) { protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson); super(commandExecutor, name, redisson);
} }
public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) { public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson); super(codec, commandExecutor, name, redisson);
} }
public RFuture<Void> addFirstAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
public RFuture<Void> addLastAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@Override @Override
public void addFirst(V e) { public void addFirst(V e) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("use add or put method");
} }
@Override @Override
public void addLast(V e) { public void addLast(V e) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("use add or put method");
} }
@Override @Override
@ -108,14 +117,22 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
@Override @Override
public boolean offerFirst(V e) { public boolean offerFirst(V e) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("use add or put method");
} }
public RFuture<Boolean> offerFirstAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@Override @Override
public boolean offerLast(V e) { public boolean offerLast(V e) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("use add or put method");
} }
public RFuture<Boolean> offerLastAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
// @Override // @Override
public RFuture<V> peekFirstAsync() { public RFuture<V> peekFirstAsync() {
return getAsync(0); return getAsync(0);
@ -128,12 +145,20 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
@Override @Override
public V peekLast() { public V peekLast() {
return get(getLastAsync()); return get(peekLastAsync());
}
public RFuture<V> peekLastAsync() {
return getLastAsync();
} }
@Override @Override
public V pollFirst() { public V pollFirst() {
return poll(); return get(pollFirstAsync());
}
public RFuture<V> pollFirstAsync() {
return pollAsync();
} }
public RFuture<V> pollLastAsync() { public RFuture<V> pollLastAsync() {
@ -157,9 +182,13 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
@Override @Override
public void push(V e) { public void push(V e) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("use add or put method");
} }
public RFuture<Void> pushAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
// @Override // @Override
public RFuture<Boolean> removeFirstOccurrenceAsync(Object o) { public RFuture<Boolean> removeFirstOccurrenceAsync(Object o) {
return removeAsync(o, 1); return removeAsync(o, 1);

@ -24,6 +24,7 @@ import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
import org.redisson.api.Node; import org.redisson.api.Node;
import org.redisson.api.NodesGroup; import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDoubleReactive;
import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive; import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive; import org.redisson.api.RBitSetReactive;
@ -59,6 +60,7 @@ import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.RedissonAtomicDoubleReactive;
import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonAtomicLongReactive;
import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBitSetReactive; import org.redisson.reactive.RedissonBitSetReactive;
@ -302,6 +304,11 @@ public class RedissonReactive implements RedissonReactiveClient {
public RAtomicLongReactive getAtomicLong(String name) { public RAtomicLongReactive getAtomicLong(String name) {
return new RedissonAtomicLongReactive(commandExecutor, name); return new RedissonAtomicLongReactive(commandExecutor, name);
} }
@Override
public RAtomicDoubleReactive getAtomicDouble(String name) {
return new RedissonAtomicDoubleReactive(commandExecutor, name);
}
@Override @Override
public RBitSetReactive getBitSet(String name) { public RBitSetReactive getBitSet(String name) {

@ -0,0 +1,47 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RAtomicDoubleReactive extends RExpirableReactive {
Publisher<Boolean> compareAndSet(double expect, double update);
Publisher<Double> addAndGet(double delta);
Publisher<Double> decrementAndGet();
Publisher<Double> get();
Publisher<Double> getAndAdd(double delta);
Publisher<Double> getAndSet(double newValue);
Publisher<Double> incrementAndGet();
Publisher<Double> getAndIncrement();
Publisher<Double> getAndDecrement();
Publisher<Void> set(double newValue);
}

@ -17,6 +17,11 @@ package org.redisson.api;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RAtomicLongReactive extends RExpirableReactive { public interface RAtomicLongReactive extends RExpirableReactive {
Publisher<Boolean> compareAndSet(long expect, long update); Publisher<Boolean> compareAndSet(long expect, long update);

@ -35,7 +35,7 @@ import java.util.Set;
public interface RMapAsync<K, V> extends RExpirableAsync { public interface RMapAsync<K, V> extends RExpirableAsync {
/** /**
* Loads all map entries to this Redis map. * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
* *
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution * @param parallelism - parallelism level, used to increase speed of process execution
@ -44,7 +44,7 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism); RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism);
/** /**
* Loads map entries whose keys are listed in defined <code>keys</code> parameter. * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined <code>keys</code> parameter.
* *
* @param keys - map keys * @param keys - map keys
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.

@ -35,7 +35,7 @@ import org.redisson.api.map.MapWriter;
public interface RMapReactive<K, V> extends RExpirableReactive { public interface RMapReactive<K, V> extends RExpirableReactive {
/** /**
* Loads all map entries to this Redis map. * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
* *
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution * @param parallelism - parallelism level, used to increase speed of process execution
@ -44,7 +44,7 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
Publisher<Void> loadAll(boolean replaceExistingValues, int parallelism); Publisher<Void> loadAll(boolean replaceExistingValues, int parallelism);
/** /**
* Loads map entries whose keys are listed in defined <code>keys</code> parameter. * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined <code>keys</code> parameter.
* *
* @param keys - map keys * @param keys - map keys
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise. * @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.

@ -0,0 +1,26 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* RPriorityBlockingDeque backed by Redis
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RPriorityBlockingDeque<V> extends RBlockingDeque<V>, RPriorityDeque<V> {
}

@ -18,7 +18,6 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config; import org.redisson.config.Config;
/** /**
@ -612,7 +611,7 @@ public interface RedissonClient {
<V> RPriorityQueue<V> getPriorityQueue(String name, Codec codec); <V> RPriorityQueue<V> getPriorityQueue(String name, Codec codec);
/** /**
* Returns priority unbounded blocking queue instance by name. * Returns unbounded priority blocking queue instance by name.
* It uses comparator to sort objects. * It uses comparator to sort objects.
* *
* @param <V> type of value * @param <V> type of value
@ -622,7 +621,7 @@ public interface RedissonClient {
<V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name); <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name);
/** /**
* Returns priority unbounded blocking queue instance by name * Returns unbounded priority blocking queue instance by name
* using provided codec for queue objects. * using provided codec for queue objects.
* It uses comparator to sort objects. * It uses comparator to sort objects.
* *
@ -633,6 +632,27 @@ public interface RedissonClient {
*/ */
<V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec); <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec);
/**
* Returns unbounded priority blocking deque instance by name.
* It uses comparator to sort objects.
*
* @param <V> type of value
* @param name of object
* @return Queue object
*/
<V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name);
/**
* Returns unbounded priority blocking deque instance by name
* using provided codec for queue objects.
* It uses comparator to sort objects.
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for message
* @return Queue object
*/
<V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name, Codec codec);
/** /**
* Returns priority unbounded deque instance by name. * Returns priority unbounded deque instance by name.
@ -935,13 +955,6 @@ public interface RedissonClient {
*/ */
Config getConfig(); Config getConfig();
/**
* Returns the CodecProvider instance
*
* @return CodecProvider object
*/
public ReferenceCodecProvider getCodecProvider();
/** /**
* Get Redis nodes group for server operations * Get Redis nodes group for server operations
* *

@ -469,6 +469,14 @@ public interface RedissonReactiveClient {
*/ */
RAtomicLongReactive getAtomicLong(String name); RAtomicLongReactive getAtomicLong(String name);
/**
* Returns "atomic double" instance by name.
*
* @param name of the "atomic double"
* @return AtomicLong object
*/
RAtomicDoubleReactive getAtomicDouble(String name);
/** /**
* Returns bitSet instance by name. * Returns bitSet instance by name.
* *

@ -64,6 +64,10 @@ abstract class EvictionTask implements Runnable {
} }
Integer size = future.getNow(); Integer size = future.getNow();
if (size == -1) {
schedule();
return;
}
if (sizeHistory.size() == 2) { if (sizeHistory.size() == 2) {
if (sizeHistory.peekFirst() > sizeHistory.peekLast() if (sizeHistory.peekFirst() > sizeHistory.peekLast()

@ -56,9 +56,10 @@ public class MapCacheEvictionTask extends EvictionTask {
@Override @Override
RFuture<Integer> execute() { RFuture<Integer> execute() {
int latchExpireTime = Math.min(delay, 30);
return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"if redis.call('setnx', KEYS[6], ARGV[4]) == 0 then " "if redis.call('setnx', KEYS[6], ARGV[4]) == 0 then "
+ "return 0;" + "return -1;"
+ "end;" + "end;"
+ "redis.call('expire', KEYS[6], ARGV[3]); " + "redis.call('expire', KEYS[6], ARGV[3]); "
+"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " +"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
@ -99,7 +100,7 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "end; " + "end; "
+ "return #expiredKeys1 + #expiredKeys2;", + "return #expiredKeys1 + #expiredKeys2;",
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName), Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName),
System.currentTimeMillis(), keysLimit, delay, 1); System.currentTimeMillis(), keysLimit, latchExpireTime, 1);
} }
} }

@ -29,7 +29,6 @@ import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.REntity.TransformationMode; import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.api.annotation.RId; import org.redisson.api.annotation.RId;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.misc.ClassUtils; import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior; import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme; import org.redisson.liveobject.resolver.NamingScheme;
@ -52,12 +51,10 @@ import net.bytebuddy.implementation.bind.annotation.This;
public class AccessorInterceptor { public class AccessorInterceptor {
private final RedissonClient redisson; private final RedissonClient redisson;
private final ReferenceCodecProvider codecProvider;
private final RedissonObjectBuilder objectBuilder; private final RedissonObjectBuilder objectBuilder;
public AccessorInterceptor(RedissonClient redisson, RedissonObjectBuilder objectBuilder) { public AccessorInterceptor(RedissonClient redisson, RedissonObjectBuilder objectBuilder) {
this.redisson = redisson; this.redisson = redisson;
this.codecProvider = redisson.getCodecProvider();
this.objectBuilder = objectBuilder; this.objectBuilder = objectBuilder;
} }
@ -104,7 +101,7 @@ public class AccessorInterceptor {
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class); REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
NamingScheme ns = anno.namingScheme() NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class) .getDeclaredConstructor(Codec.class)
.newInstance(codecProvider.getCodec(anno, (Class) rEntity)); .newInstance(redisson.getConfig().getReferenceCodecProvider().getCodec(anno, (Class) rEntity));
liveMap.fastPut(fieldName, new RedissonReference(rEntity, liveMap.fastPut(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject), ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject),
liveObject.getLiveObjectId()))); liveObject.getLiveObjectId())));

@ -57,9 +57,9 @@ public class LiveObjectInterceptor {
private final NamingScheme namingScheme; private final NamingScheme namingScheme;
private final Class<? extends Codec> codecClass; private final Class<? extends Codec> codecClass;
public LiveObjectInterceptor(RedissonClient redisson, ReferenceCodecProvider codecProvider, Class<?> entityClass, String idFieldName) { public LiveObjectInterceptor(RedissonClient redisson, Class<?> entityClass, String idFieldName) {
this.redisson = redisson; this.redisson = redisson;
this.codecProvider = codecProvider; this.codecProvider = redisson.getConfig().getReferenceCodecProvider();
this.originalClass = entityClass; this.originalClass = entityClass;
this.idFieldName = idFieldName; this.idFieldName = idFieldName;
REntity anno = (REntity) ClassUtils.getAnnotation(entityClass, REntity.class); REntity anno = (REntity) ClassUtils.getAnnotation(entityClass, REntity.class);

@ -76,10 +76,10 @@ public class RedissonObjectBuilder {
private final RedissonClient redisson; private final RedissonClient redisson;
private final ReferenceCodecProvider codecProvider; private final ReferenceCodecProvider codecProvider;
public RedissonObjectBuilder(RedissonClient redisson, ReferenceCodecProvider codecProvider) { public RedissonObjectBuilder(RedissonClient redisson) {
super(); super();
this.redisson = redisson; this.redisson = redisson;
this.codecProvider = codecProvider; this.codecProvider = redisson.getConfig().getReferenceCodecProvider();
} }
public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) { public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) {

@ -0,0 +1,132 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicDouble;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicDoubleReactive;
import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
import java.util.function.Supplier;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
*
* @author Nikita Koksharov
*
*/
public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive implements RAtomicDoubleReactive {
private final RAtomicDoubleAsync instance;
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonAtomicDouble(commandExecutor, name);
}
@Override
public Publisher<Double> addAndGet(final double delta) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.addAndGetAsync(delta);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(final double expect, final double update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<Double> decrementAndGet() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.decrementAndGetAsync();
}
});
}
@Override
public Publisher<Double> get() {
return addAndGet(0);
}
@Override
public Publisher<Double> getAndAdd(final double delta) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.getAndAddAsync(delta);
}
});
}
@Override
public Publisher<Double> getAndSet(final double newValue) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.getAndSetAsync(newValue);
}
});
}
@Override
public Publisher<Double> incrementAndGet() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.incrementAndGetAsync();
}
});
}
@Override
public Publisher<Double> getAndIncrement() {
return getAndAdd(1);
}
@Override
public Publisher<Double> getAndDecrement() {
return getAndAdd(-1);
}
@Override
public Publisher<Void> set(final double newValue) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(newValue);
}
});
}
public String toString() {
return instance.toString();
}
}

@ -24,7 +24,6 @@ import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Mono;
/** /**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
@ -128,7 +127,7 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
} }
public String toString() { public String toString() {
return Long.toString(Mono.from(get()).block()); return instance.toString();
} }
} }

@ -3,7 +3,6 @@ package org.redisson;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.RDoubleAdder; import org.redisson.api.RDoubleAdder;
import org.redisson.api.RLongAdder;
public class RedissonDoubleAdderTest extends BaseTest { public class RedissonDoubleAdderTest extends BaseTest {

@ -0,0 +1,124 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.api.RBlockingDeque;
public class RedissonPriorityBlockingDequeTest extends BaseTest {
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("queue:pollany");
queue.pollLastAsync(500, TimeUnit.MILLISECONDS);
queue.pollFirstAsync(10, TimeUnit.MICROSECONDS);
}
@Test
public void testTakeFirst() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getPriorityBlockingDeque("queue:take");
deque.add(1);
deque.add(2);
deque.add(3);
deque.add(4);
assertThat(deque.takeFirst()).isEqualTo(1);
assertThat(deque.takeFirst()).isEqualTo(2);
assertThat(deque.takeFirst()).isEqualTo(3);
assertThat(deque.takeFirst()).isEqualTo(4);
assertThat(deque.size()).isZero();
}
@Test
public void testTakeLast() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getPriorityBlockingDeque("queue:take");
deque.add(1);
deque.add(2);
deque.add(3);
deque.add(4);
assertThat(deque.takeLast()).isEqualTo(4);
assertThat(deque.takeLast()).isEqualTo(3);
assertThat(deque.takeLast()).isEqualTo(2);
assertThat(deque.takeLast()).isEqualTo(1);
assertThat(deque.size()).isZero();
}
@Test
public void testTakeFirstAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getPriorityBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDeque<Integer> deque1 = redisson.getBlockingDeque("queue:take");
deque1.add(1);
deque1.add(2);
deque1.add(3);
deque1.add(4);
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(deque.takeFirst()).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(deque.takeFirst()).isEqualTo(2);
assertThat(deque.takeFirst()).isEqualTo(3);
assertThat(deque.takeFirst()).isEqualTo(4);
}
@Test
public void testTakeLastAwait() throws InterruptedException {
RBlockingDeque<Integer> deque = redisson.getPriorityBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDeque<Integer> deque1 = redisson.getBlockingDeque("queue:take");
deque1.add(1);
deque1.add(2);
deque1.add(3);
deque1.add(4);
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(deque.takeLast()).isEqualTo(4);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(deque.takeLast()).isEqualTo(3);
assertThat(deque.takeLast()).isEqualTo(2);
assertThat(deque.takeLast()).isEqualTo(1);
}
@Test
public void testPollFirst() throws InterruptedException {
RBlockingDeque<Integer> queue1 = redisson.getPriorityBlockingDeque("queue1");
queue1.put(1);
queue1.put(2);
queue1.put(3);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(2);
assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(3);
long s = System.currentTimeMillis();
assertThat(queue1.pollFirst(5, TimeUnit.SECONDS)).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
@Test
public void testPollLast() throws InterruptedException {
RBlockingDeque<Integer> queue1 = redisson.getPriorityBlockingDeque("queue1");
queue1.add(3);
queue1.add(1);
queue1.add(2);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(3);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(2);
assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(1);
long s = System.currentTimeMillis();
assertThat(queue1.pollLast(5, TimeUnit.SECONDS)).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThanOrEqualTo(5000);
}
}

@ -1,6 +1,7 @@
package org.redisson.executor; package org.redisson.executor;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -16,65 +17,36 @@ import java.util.concurrent.TimeoutException;
import org.awaitility.Duration; import org.awaitility.Duration;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.redisson.BaseTest; import org.redisson.BaseTest;
import org.redisson.RedissonNode; import org.redisson.RedissonNode;
import org.redisson.RedissonRuntimeEnvironment;
import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorFuture;
import org.redisson.api.RExecutorService; import org.redisson.api.RExecutorService;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig; import org.redisson.config.RedissonNodeConfig;
import static org.awaitility.Awaitility.*;
public class RedissonExecutorServiceTest extends BaseTest { public class RedissonExecutorServiceTest extends BaseTest {
private static RedissonNode node; private static RedissonNode node;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
BaseTest.beforeClass();
if (!RedissonRuntimeEnvironment.isTravis) {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
BaseTest.afterClass();
if (!RedissonRuntimeEnvironment.isTravis) {
node.shutdown();
}
}
@Before @Before
@Override @Override
public void before() throws IOException, InterruptedException { public void before() throws IOException, InterruptedException {
super.before(); super.before();
if (RedissonRuntimeEnvironment.isTravis) { Config config = createConfig();
Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); node = RedissonNode.create(nodeConfig);
node = RedissonNode.create(nodeConfig); node.start();
node.start();
}
} }
@After @After
@Override @Override
public void after() throws InterruptedException { public void after() throws InterruptedException {
super.after(); super.after();
if (RedissonRuntimeEnvironment.isTravis) { node.shutdown();
node.shutdown();
}
} }
private void cancel(RExecutorFuture<?> future) throws InterruptedException, ExecutionException { private void cancel(RExecutorFuture<?> future) throws InterruptedException, ExecutionException {

@ -12,14 +12,11 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.redisson.BaseTest; import org.redisson.BaseTest;
import org.redisson.RedissonNode; import org.redisson.RedissonNode;
import org.redisson.RedissonRuntimeEnvironment;
import org.redisson.api.CronSchedule; import org.redisson.api.CronSchedule;
import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture; import org.redisson.api.RScheduledFuture;
@ -30,26 +27,6 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
private static RedissonNode node; private static RedissonNode node;
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
BaseTest.beforeClass();
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
}
}
@AfterClass
public static void afterClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {
BaseTest.afterClass();
node.shutdown();
}
}
@Before @Before
@Override @Override
public void before() throws IOException, InterruptedException { public void before() throws IOException, InterruptedException {
@ -68,6 +45,25 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
node.shutdown(); node.shutdown();
} }
@Test(timeout = 7000)
public void testTaskResume() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
ScheduledFuture<Long> future1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
ScheduledFuture<Long> future2 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
ScheduledFuture<Long> future3 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
node.shutdown();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(redisson.getConfig());
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
node = RedissonNode.create(nodeConfig);
node.start();
assertThat(future1.get()).isEqualTo(100);
assertThat(future2.get()).isEqualTo(100);
assertThat(future3.get()).isEqualTo(100);
}
@Test @Test
public void testLoad() { public void testLoad() {
Config config = createConfig(); Config config = createConfig();

Loading…
Cancel
Save