refactoring

pull/1705/head
Nikita 6 years ago
parent 7bd080442f
commit b23643a340

@ -50,6 +50,7 @@ import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
@ -352,12 +353,16 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, commandExecutor, name);
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, commandExecutor, name);
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
}
@Override

@ -37,6 +37,7 @@ import org.redisson.RedissonQueue;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonScript;
import org.redisson.RedissonSet;
import org.redisson.RedissonSetCache;
import org.redisson.RedissonSetMultimap;
import org.redisson.RedissonStream;
import org.redisson.api.BatchOptions;
@ -62,6 +63,7 @@ import org.redisson.api.RMapReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
@ -230,12 +232,16 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, executorService, name);
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, executorService, name);
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override

@ -19,24 +19,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier;
import reactor.rx.Streams;
/**
* <p>Set-based cache with ability to set TTL for each entry via
@ -58,151 +54,54 @@ import reactor.fn.Supplier;
*
* @param <V> value
*/
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
public class RedissonSetCacheReactive<V> {
private final RSetCacheAsync<V> instance;
private final RSetCache<V> instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor, name, new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null));
}
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
this(codec, commandExecutor, name, new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null));
}
public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync<V> instance) {
super(codec, commandExecutor, name, instance);
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, RSetCache<V> instance) {
this.commandExecutor = commandExecutor;
this.instance = instance;
}
@Override
public Publisher<Integer> size() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> contains(final Object o) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsAsync(o);
}
});
}
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator)instance).scanIteratorAsync(getName(), client, nextIterPos, null, 10);
return ((ScanIterator)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
}
};
}
@Override
public Publisher<Boolean> add(final V value, final long ttl, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.addAsync(value, ttl, unit);
}
});
}
@Override
public Publisher<Integer> add(V value) {
long timeoutDate = 92233720368547758L;
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_INTEGER,
return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"return 1; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value));
Arrays.<Object>asList(instance.getName()), System.currentTimeMillis(), timeoutDate, ((RedissonSetCache)instance).encode(value));
}
@Override
public Publisher<Set<V>> readAll() {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readAllAsync();
}
});
}
@Override
public Publisher<Boolean> remove(final Object o) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.removeAsync(o);
}
});
}
@Override
public Publisher<Boolean> containsAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsAllAsync(c);
}
});
}
@Override
public Publisher<Integer> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return newSucceeded(0);
return Streams.just(0);
}
long score = 92233720368547758L - System.currentTimeMillis();
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(getName());
params.add(instance.getName());
for (V value : c) {
ByteBuf objectState = encode(value);
ByteBuf objectState = ((RedissonSetCache)instance).encode(value);
params.add(score);
params.add(objectState);
}
return commandExecutor.writeReactive(getName(), codec, RedisCommands.ZADD_RAW, params.toArray());
}
@Override
public Publisher<Boolean> retainAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.retainAllAsync(c);
}
});
}
@Override
public Publisher<Boolean> removeAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.removeAllAsync(c);
}
});
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.ZADD_RAW, params.toArray());
}
@Override
public Publisher<Integer> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override

@ -23,6 +23,7 @@ import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTransaction;
@ -103,12 +104,16 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(executorService, name, transaction.<V>getSetCache(name));
RSetCache<V> set = transaction.<V>getSetCache(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, executorService, name, transaction.<V>getSetCache(name, codec));
RSetCache<V> set = transaction.<V>getSetCache(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
}
@Override

@ -14,7 +14,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RSetCacheReactive;
import org.redisson.codec.MsgPackJacksonCodec;
public class RedissonSetCacheReactiveTest extends BaseReactiveTest {

Loading…
Cancel
Save