refactoring

pull/968/head
Nikita 8 years ago
parent 15ef25b5c3
commit 631f455e6b

@ -34,7 +34,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong {
protected RedissonAtomicLong(CommandAsyncExecutor commandExecutor, String name) {
public RedissonAtomicLong(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}

@ -32,11 +32,11 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) {
public RedissonBucket(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
protected RedissonBucket(Codec codec, CommandAsyncExecutor connectionManager, String name) {
public RedissonBucket(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@ -89,7 +89,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
Collections.<Object>singletonList(getName()));
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.GETSET, getName(), newValue);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.GETSET, getName(), encode(newValue));
}
@Override

@ -21,7 +21,7 @@ import org.reactivestreams.Publisher;
/**
* object functions
* Object holder. Max size of object is 512MB
*
* @author Nikita Koksharov
*
@ -29,6 +29,21 @@ import org.reactivestreams.Publisher;
*/
public interface RBucketReactive<V> extends RExpirableReactive {
/**
* Returns size of object in bytes
*
* @return object size
*/
Publisher<Long> size();
Publisher<Boolean> trySet(V value);
Publisher<Boolean> trySet(V value, long timeToLive, TimeUnit timeUnit);
Publisher<Boolean> compareAndSet(V expect, V update);
Publisher<V> getAndSet(V newValue);
Publisher<V> get();
Publisher<Void> set(V value);

@ -277,7 +277,7 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET", 2);
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET");
RedisCommand<Object> GETRANGE = new RedisCommand<Object>("GETRANGE");
RedisCommand<Object> APPEND = new RedisCommand<Object>("APPEND");
RedisCommand<Object> SETRANGE = new RedisCommand<Object>("SETRANGE");

@ -15,17 +15,14 @@
*/
package org.redisson.reactive;
import java.util.Collections;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicLong;
import org.redisson.api.RAtomicLongAsync;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import reactor.rx.Streams;
/**
@ -36,29 +33,41 @@ import reactor.rx.Streams;
*/
public class RedissonAtomicLongReactive extends RedissonExpirableReactive implements RAtomicLongReactive {
private final RAtomicLongAsync instance;
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonAtomicLong(commandExecutor, name);
}
@Override
public Publisher<Long> addAndGet(long delta) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta);
public Publisher<Long> addAndGet(final long delta) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.addAndGetAsync(delta);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(long expect, long update) {
return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
public Publisher<Boolean> compareAndSet(final long expect, final long update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<Long> decrementAndGet() {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.decrementAndGetAsync();
}
});
}
@Override
@ -68,23 +77,33 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
@Override
public Publisher<Long> getAndAdd(final long delta) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new SingleConvertor<Long>() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public Long convert(Object obj) {
return ((Long) obj) - delta;
public RFuture<Long> get() {
return instance.getAndAddAsync(delta);
}
}), getName(), delta);
});
}
@Override
public Publisher<Long> getAndSet(long newValue) {
return commandExecutor.writeReactive(getName(), LongCodec.INSTANCE, RedisCommands.GETSET, getName(), newValue);
public Publisher<Long> getAndSet(final long newValue) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.getAndSetAsync(newValue);
}
});
}
@Override
public Publisher<Long> incrementAndGet() {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName());
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.incrementAndGetAsync();
}
});
}
@Override
@ -98,8 +117,13 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
}
@Override
public Publisher<Void> set(long newValue) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue);
public Publisher<Void> set(final long newValue) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(newValue);
}
});
}
public String toString() {

@ -18,11 +18,15 @@ package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
@ -31,27 +35,96 @@ import org.redisson.command.CommandReactiveExecutor;
*/
public class RedissonBucketReactive<V> extends RedissonExpirableReactive implements RBucketReactive<V> {
private final RBucketAsync<V> instance;
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
instance = new RedissonBucket<V>(connectionManager, name);
}
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name);
instance = new RedissonBucket<V>(codec, connectionManager, name);
}
@Override
public Publisher<V> get() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.GET, getName());
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAsync();
}
});
}
@Override
public Publisher<Void> set(V value) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SET, getName(), value);
public Publisher<Void> set(final V value) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(value);
}
});
}
@Override
public Publisher<Void> set(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PSETEX, getName(), timeUnit.toMillis(timeToLive), value);
public Publisher<Void> set(final V value, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(value, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Long> size() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> trySet(final V value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetAsync(value);
}
});
}
@Override
public Publisher<Boolean> trySet(final V value, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetAsync(value, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(final V expect, final V update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<V> getAndSet(final V newValue) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAndSetAsync(newValue);
}
});
}
}

@ -15,12 +15,16 @@
*/
package org.redisson.reactive;
import java.io.IOException;
import org.reactivestreams.Publisher;
import org.redisson.RedissonReference;
import org.redisson.api.RFuture;
import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.RedissonObjectFactory;
import reactor.fn.Supplier;
import reactor.rx.Stream;
@ -66,6 +70,51 @@ abstract class RedissonObjectReactive implements RObjectReactive {
return codec;
}
protected byte[] encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
protected byte[] encodeMapKey(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapKeyEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
protected byte[] encodeMapValue(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public Publisher<Void> rename(String newName) {
return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName);

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@ -114,15 +113,8 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
});
}
private byte[] encode(V value) throws IOException {
return codec.getValueEncoder().encode(value);
}
@Override
public Publisher<Long> add(V value) {
try {
byte[] objectState = encode(value);
long timeoutDate = 92233720368547758L;
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LONG,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
@ -131,10 +123,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
+ "end; " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"return 1; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, objectState);
} catch (IOException e) {
throw new RuntimeException(e);
}
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value));
}
@Override
@ -166,15 +155,11 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
long score = 92233720368547758L - System.currentTimeMillis();
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(getName());
try {
for (V value : c) {
byte[] objectState = encode(value);
params.add(score);
params.add(objectState);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return commandExecutor.writeReactive(getName(), codec, RedisCommands.ZADD_RAW, params.toArray());
}

Loading…
Cancel
Save