Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java
pull/1821/head
Nikita 6 years ago
commit b1ae9ec044

@ -50,6 +50,7 @@ import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
@ -352,12 +353,16 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, commandExecutor, name);
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, commandExecutor, name);
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
}
@Override

@ -21,6 +21,7 @@ import java.util.Queue;
import java.util.regex.Pattern;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
@ -79,6 +80,10 @@ public class CommandsQueue extends ChannelDuplexHandler {
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Channel has been closed! Can't write command: " + command.getCommand() + " to channel: " + ctx.channel()));
if (command.getChannelPromise().isSuccess()) {
command.getCommand().tryFailure(new RedisConnectionException("Command succesfully sent, but channel " + ctx.channel() + " has been closed!"));
}
}
super.channelInactive(ctx);

@ -38,6 +38,7 @@ import org.redisson.RedissonQueue;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonScript;
import org.redisson.RedissonSet;
import org.redisson.RedissonSetCache;
import org.redisson.RedissonSetMultimap;
import org.redisson.RedissonStream;
import org.redisson.api.BatchOptions;
@ -63,6 +64,7 @@ import org.redisson.api.RMapReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
@ -232,12 +234,16 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, executorService, name);
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, executorService, name);
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override

@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -27,14 +26,11 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Flux;
@ -59,151 +55,54 @@ import reactor.core.publisher.Flux;
*
* @param <V> value
*/
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
public class RedissonSetCacheReactive<V> {
private final RSetCacheAsync<V> instance;
private final RSetCache<V> instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor, name, new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null));
}
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
this(codec, commandExecutor, name, new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null));
}
public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(codec, commandExecutor, name, instance);
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, RSetCache<V> instance) {
this.commandExecutor = commandExecutor;
this.instance = instance;
}
@Override
public Publisher<Integer> size() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> contains(final Object o) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsAsync(o);
}
});
}
@Override
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, nextIterPos, null, 10);
return ((ScanIterator)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
}
});
}
@Override
public Publisher<Boolean> add(final V value, final long ttl, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.addAsync(value, ttl, unit);
}
});
}
@Override
public Publisher<Integer> add(V value) {
long timeoutDate = 92233720368547758L;
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_INTEGER,
return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"return 1; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value));
}
@Override
public Publisher<Set<V>> readAll() {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readAllAsync();
}
});
}
@Override
public Publisher<Boolean> remove(final Object o) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.removeAsync(o);
}
});
}
@Override
public Publisher<Boolean> containsAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsAllAsync(c);
}
});
Arrays.<Object>asList(instance.getName()), System.currentTimeMillis(), timeoutDate, ((RedissonSetCache)instance).encode(value));
}
@Override
public Publisher<Integer> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return newSucceeded(0);
return Streams.just(0);
}
long score = 92233720368547758L - System.currentTimeMillis();
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(getName());
params.add(instance.getName());
for (V value : c) {
ByteBuf objectState = encode(value);
ByteBuf objectState = ((RedissonSetCache)instance).encode(value);
params.add(score);
params.add(objectState);
}
return commandExecutor.writeReactive(getName(), codec, RedisCommands.ZADD_RAW, params.toArray());
}
@Override
public Publisher<Boolean> retainAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.retainAllAsync(c);
}
});
}
@Override
public Publisher<Boolean> removeAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.removeAllAsync(c);
}
});
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.ZADD_RAW, params.toArray());
}
@Override
public Publisher<Integer> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override

@ -25,6 +25,7 @@ import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTransaction;
@ -103,12 +104,16 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(executorService, name, transaction.<V>getSetCache(name));
RSetCache<V> set = transaction.<V>getSetCache(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, executorService, name, transaction.<V>getSetCache(name, codec));
RSetCache<V> set = transaction.<V>getSetCache(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override

@ -14,7 +14,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RSetCacheReactive;
import org.redisson.codec.MsgPackJacksonCodec;
public class RedissonSetCacheReactiveTest extends BaseReactiveTest {

Loading…
Cancel
Save