Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/api/RTopicReactive.java
pull/1833/head
Nikita Koksharov 6 years ago
commit caabcc3f6f

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class BinaryConvertor extends SingleConvertor<Object> {
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType;
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {

@ -17,14 +17,14 @@ package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class BinaryConvertor extends SingleConvertor<Object> {
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType;
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric;
* @author Nikita Koksharov
*
*/
public class DistanceConvertor extends SingleConvertor<Distance> {
public class DistanceConvertor implements Convertor<Distance> {
private final Metric metric;

@ -17,14 +17,14 @@ package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class BinaryConvertor extends SingleConvertor<Object> {
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType;
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric;
* @author Nikita Koksharov
*
*/
public class DistanceConvertor extends SingleConvertor<Distance> {
public class DistanceConvertor implements Convertor<Distance> {
private final Metric metric;

@ -17,14 +17,14 @@ package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class BinaryConvertor extends SingleConvertor<Object> {
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType;
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric;
* @author Nikita Koksharov
*
*/
public class DistanceConvertor extends SingleConvertor<Distance> {
public class DistanceConvertor implements Convertor<Distance> {
private final Metric metric;

@ -25,7 +25,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
@ -62,7 +62,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
});
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new SingleConvertor<DataType>() {
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new Convertor<DataType>() {
@Override
public DataType convert(Object obj) {
return DataType.fromCode(obj.toString());

@ -17,14 +17,14 @@ package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import io.netty.util.CharsetUtil;
@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class BinaryConvertor extends SingleConvertor<Object> {
public class BinaryConvertor implements Convertor<Object> {
@Override
public Object convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.redis.connection.DataType;
/**
@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType;
* @author Nikita Koksharov
*
*/
public class DataTypeConvertor extends SingleConvertor<DataType> {
public class DataTypeConvertor implements Convertor<DataType> {
@Override
public DataType convert(Object obj) {

@ -15,7 +15,7 @@
*/
package org.redisson.spring.data.connection;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric;
* @author Nikita Koksharov
*
*/
public class DistanceConvertor extends SingleConvertor<Distance> {
public class DistanceConvertor implements Convertor<Distance> {
private final Metric metric;

@ -27,7 +27,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.reactive.RedissonKeysReactive;
import org.springframework.data.redis.connection.DataType;
@ -67,7 +67,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
});
}
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new SingleConvertor<DataType>() {
private static final RedisStrictCommand<DataType> TYPE = new RedisStrictCommand<DataType>("TYPE", new Convertor<DataType>() {
@Override
public DataType convert(Object obj) {
return DataType.fromCode(obj.toString());
@ -310,7 +310,7 @@ public class RedissonReactiveKeyCommands extends RedissonBaseReactive implements
});
}
private static final RedisStrictCommand<ValueEncoding> OBJECT_ENCODING = new RedisStrictCommand<ValueEncoding>("OBJECT", "ENCODING", new SingleConvertor<ValueEncoding>() {
private static final RedisStrictCommand<ValueEncoding> OBJECT_ENCODING = new RedisStrictCommand<ValueEncoding>("OBJECT", "ENCODING", new Convertor<ValueEncoding>() {
@Override
public ValueEncoding convert(Object obj) {
return ValueEncoding.of((String) obj);

@ -17,14 +17,14 @@ package org.redisson.spring.data.connection;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class SecondsConvertor extends SingleConvertor<Long> {
public class SecondsConvertor implements Convertor<Long> {
private final TimeUnit unit;
private final TimeUnit source;

@ -24,7 +24,7 @@ import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -110,7 +110,7 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo
@Override
public RFuture<Double> getAndAddAsync(final double delta) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Double>("INCRBYFLOAT", new SingleConvertor<Double>() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Double>("INCRBYFLOAT", new Convertor<Double>() {
@Override
public Double convert(Object obj) {
return Double.valueOf(obj.toString()) - delta;

@ -23,7 +23,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -108,7 +108,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public RFuture<Long> getAndAddAsync(final long delta) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new SingleConvertor<Long>() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new Convertor<Long>() {
@Override
public Long convert(Object obj) {
return ((Long) obj) - delta;

@ -38,7 +38,7 @@ import org.redisson.command.CommandBatchService;
public class RedissonBitSet extends RedissonExpirable implements RBitSet {
public RedissonBitSet(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
super(null, connectionManager, name);
}
@Override

@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -236,9 +235,6 @@ public class RedissonMultiLock implements Lock {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisConnectionClosedException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
@ -321,8 +317,7 @@ public class RedissonMultiLock implements Lock {
lockAcquired = future.getNow();
}
if (future.cause() instanceof RedisConnectionClosedException
|| future.cause() instanceof RedisResponseTimeoutException) {
if (future.cause() instanceof RedisResponseTimeoutException) {
unlockInnerAsync(Arrays.asList(lock), threadId);
}

@ -111,7 +111,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Long> sizeInMemoryAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.MEMORY_USAGE, getName());
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MEMORY_USAGE, getName());
}
public final RFuture<Long> sizeInMemoryAsync(List<Object> keys) {
@ -134,7 +134,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Void> renameAsync(String newName) {
return commandExecutor.writeAsync(getName(), RedisCommands.RENAME, getName(), newName);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RENAME, getName(), newName);
}
@Override
@ -144,7 +144,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Void> migrateAsync(String host, int port, int database, long timeout) {
return commandExecutor.writeAsync(getName(), RedisCommands.MIGRATE, host, port, getName(), database, timeout);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MIGRATE, host, port, getName(), database, timeout);
}
@Override
@ -154,7 +154,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Void> copyAsync(String host, int port, int database, long timeout) {
return commandExecutor.writeAsync(getName(), RedisCommands.MIGRATE, host, port, getName(), database, timeout, "COPY");
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MIGRATE, host, port, getName(), database, timeout, "COPY");
}
@Override
@ -164,7 +164,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> moveAsync(int database) {
return commandExecutor.writeAsync(getName(), RedisCommands.MOVE, getName(), database);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MOVE, getName(), database);
}
@Override
@ -174,7 +174,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> renamenxAsync(String newName) {
return commandExecutor.writeAsync(getName(), RedisCommands.RENAMENX, getName(), newName);
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getName(), newName);
}
@Override
@ -184,7 +184,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName());
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DEL_BOOL, getName());
}
@Override
@ -194,7 +194,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> unlinkAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.UNLINK_BOOL, getName());
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.UNLINK_BOOL, getName());
}
@Override
@ -204,7 +204,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> touchAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.TOUCH, getName());
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.TOUCH, getName());
}
@Override
@ -214,7 +214,7 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.EXISTS, getName());
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.EXISTS, getName());
}
@Override

@ -27,6 +27,8 @@ import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
@ -80,7 +82,7 @@ public class RedissonTopic implements RTopic {
@Override
public RFuture<Long> publishAsync(Object message) {
return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, encode(message));
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, encode(message));
}
protected ByteBuf encode(Object value) {
@ -273,4 +275,14 @@ public class RedissonTopic implements RTopic {
return 0;
}
@Override
public RFuture<Long> countSubscribersAsync() {
return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.PUBSUB_NUMSUB, name);
}
@Override
public long countSubscribers() {
return commandExecutor.get(countSubscribersAsync());
}
}

@ -85,9 +85,17 @@ public interface RTopic extends RTopicAsync {
void removeAllListeners();
/**
* Returns amount of registered listeners
* Returns amount of registered listeners to this topic
*
* @return amount of listeners
*/
int countListeners();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
long countSubscribers();
}

@ -72,4 +72,12 @@ public interface RTopicAsync {
*/
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
RFuture<Long> countSubscribersAsync();
}

@ -83,4 +83,13 @@ public interface RTopicReactive {
* @return stream of messages
*/
<M> Flux<M> getMessages(Class<M> type);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Mono<Long> countSubscribers();
}

@ -82,5 +82,13 @@ public interface RTopicRx {
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Flowable<Long> countSubscribers();
}

@ -1,31 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisConnectionClosedException extends RedisConnectionException {
private static final long serialVersionUID = -5162298227713965182L;
public RedisConnectionClosedException(String msg) {
super(msg);
}
}

@ -34,6 +34,7 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -166,7 +167,8 @@ public class RedisPubSubConnection extends RedisConnection {
}
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
return channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
RPromise<R> promise = new RedissonPromise<R>();
return channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params));
}
public Map<ChannelName, Codec> getChannels() {

@ -51,9 +51,7 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
@ -87,19 +85,23 @@ public class CommandDecoder extends ReplayingDecoder<State> {
log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data);
}
if (state() == null) {
boolean makeCheckpoint = data != null;
if (data != null) {
if (data instanceof CommandsData) {
makeCheckpoint = false;
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null
&& (SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) {
makeCheckpoint = false;
}
}
}
boolean makeCheckpoint = false;
// commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest
//
// boolean makeCheckpoint = data != null;
// if (data != null) {
// if (data instanceof CommandsData) {
// makeCheckpoint = false;
// } else {
// CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
// MultiDecoder<Object> decoder = cmd.getCommand().getReplayMultiDecoder();
// if (decoder != null
// && (decoder instanceof SlotsDecoder
// || decoder instanceof ListMultiDecoder)) {
// makeCheckpoint = false;
// }
// }
// }
state(new State(makeCheckpoint));
}
@ -122,10 +124,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
if (state().getLevels().size() > 0) {
if (state().isMakeCheckpoint()) {
decodeFromCheckpoint(ctx, in, data, cmd);
} else {
decode(in, cmd, null, ctx.channel(), false);
decode(in, cmd, null, ctx, false);
}
sendNext(ctx, data);
} catch (Exception e) {
@ -143,6 +145,17 @@ public class CommandDecoder extends ReplayingDecoder<State> {
sendNext(ctx);
throw e;
}
} else {
try {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx, false);
}
sendNext(ctx);
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
sendNext(ctx);
throw e;
}
}
}
@ -153,54 +166,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandData<Object, Object> cmd) throws IOException {
if (state().getLevels().size() == 2) {
StateLevel secondLevel = state().getLevels().get(1);
if (secondLevel.getParts().isEmpty()) {
state().getLevels().remove(1);
}
}
StateLevel level = state().getLastLevel();
if (state().getLevels().size() == 2) {
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false);
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts());
if (decoder != null) {
Object result = decoder.decode(firstLevel.getParts(), state());
if (data != null) {
handleResult(cmd, null, result, true, ctx.channel());
}
}
List<Object> prevParts = null;
if (state().getLevels().size() > 1) {
StateLevel prevLevel = state().getLevels().get(state().getLevel() - 1);
prevParts = prevLevel.getParts();
}
if (state().getLevels().size() == 1) {
StateLevel firstLevel = state().getLevels().get(0);
if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) {
state().resetLevel();
decode(in, cmd, null, ctx.channel(), false);
} else {
if (firstLevel.getLastList() != null) {
if (firstLevel.getLastList().isEmpty()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
} else {
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false);
}
firstLevel.setLastList(null);
firstLevel.setLastListSize(0);
while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false);
} else {
while (firstLevel.getSize() == firstLevel.getParts().size() && in.isReadable()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false);
}
}
decodeList(in, cmd, prevParts, ctx, level.getSize(), level.getParts(), false);
if (state().getLastLevel() == level) {
state().removeLastLevel();
}
}
@ -233,7 +210,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
try {
decode(in, commandData, null, ctx.channel(), skipConvertor);
decode(in, commandData, null, ctx, skipConvertor);
} finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove();
@ -291,8 +268,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor) throws IOException {
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx, boolean skipConvertor) throws IOException {
int code = in.readByte();
Channel channel = ctx.channel();
if (code == '+') {
ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r'));
try {
@ -335,7 +313,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data != null) {
data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + LogHelper.toString(data)));
} else {
log.error("Error: {} channel: {} data: {}", error, channel, LogHelper.toString(data));
log.error("Error message from Redis: {} channel: {}", error, channel);
}
}
} finally {
@ -354,33 +332,22 @@ public class CommandDecoder extends ReplayingDecoder<State> {
handleResult(data, parts, result, false, channel);
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts;
final List<Object> respParts = new ArrayList<Object>();
StateLevel lastLevel = state().getLastLevel();
if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) {
respParts = new ArrayList<Object>();
lastLevel.setLastListSize(size);
lastLevel.setLastList(respParts);
} else {
int level = state().incLevel();
if (state().getLevels().size()-1 >= level) {
StateLevel stateLevel = state().getLevels().get(level);
respParts = stateLevel.getParts();
size = stateLevel.getSize();
} else {
respParts = new ArrayList<Object>();
if (state().isMakeCheckpoint()) {
state().addLevel(new StateLevel(size, respParts));
}
}
StateLevel lastLevel = null;
if (state().isMakeCheckpoint()) {
lastLevel = new StateLevel(size, respParts);
state().addLevel(lastLevel);
}
decodeList(in, data, parts, channel, size, respParts, skipConvertor);
decodeList(in, data, parts, ctx, size, respParts, skipConvertor);
if (lastLevel != null && lastLevel.getLastList() != null) {
lastLevel.setLastList(null);
lastLevel.setLastListSize(0);
if (state().isMakeCheckpoint()) {
if (lastLevel == state().getLastLevel() && lastLevel.isFull()) {
state().removeLastLevel();
}
}
} else {
String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
throw new IllegalStateException("Can't decode replay: " + dataStr);
@ -389,7 +356,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
@SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts, boolean skipConvertor)
ChannelHandlerContext ctx, long size, List<Object> respParts, boolean skipConvertor)
throws IOException {
if (parts == null && commandsData.get() != null) {
List<CommandData<?, ?>> commands = commandsData.get();
@ -399,7 +366,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
suffix = 1;
}
CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix);
decode(in, commandData, respParts, channel, skipConvertor);
decode(in, commandData, respParts, ctx, skipConvertor);
if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {
data.tryFailure(commandData.cause());
}
@ -410,7 +377,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
} else {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, skipConvertor);
decode(in, data, respParts, ctx, skipConvertor);
if (state().isMakeCheckpoint()) {
checkpoint();
}
@ -423,13 +390,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
Object result = decoder.decode(respParts, state());
decodeResult(data, parts, channel, result);
decodeResult(data, parts, ctx, result);
}
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx,
Object result) throws IOException {
if (data != null) {
handleResult(data, parts, result, true, channel);
handleResult(data, parts, result, true, ctx.channel());
}
}
@ -476,6 +443,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (decoder == null) {
if (data.getCodec() == null) {
return StringCodec.INSTANCE.getValueDecoder();
}
if (data.getCommand().getOutParamType() == ValueType.MAP) {
if (parts != null && parts.size() % 2 != 0) {
return data.getCodec().getMapValueDecoder();

@ -39,7 +39,6 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.PlatformDependent;
@ -75,7 +74,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (data == null) {
try {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel(), false);
decode(in, null, null, ctx, false);
}
sendNext(ctx);
} catch (Exception e) {
@ -86,11 +85,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
if (state().getLevels().size() > 0) {
if (state().isMakeCheckpoint()) {
decodeFromCheckpoint(ctx, in, data, cmd);
} else {
while (in.writerIndex() > in.readerIndex()) {
decode(in, cmd, null, ctx.channel(), false);
decode(in, cmd, null, ctx, false);
}
}
sendNext(ctx, data);
@ -104,7 +103,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
}
@Override
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx,
final Object result) throws IOException {
if (executor.isShutdown()) {
return;
@ -113,7 +112,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (result instanceof Message) {
checkpoint();
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(ctx.channel());
ChannelName channelName = ((Message) result).getChannel();
if (result instanceof PubSubStatusMessage) {
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
@ -161,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
}
} else {
if (data != null && data.getCommand().getName().equals("PING")) {
super.decodeResult(data, parts, channel, result);
super.decodeResult(data, parts, ctx, result);
}
}
}

@ -21,7 +21,6 @@ import java.util.Queue;
import java.util.regex.Pattern;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
@ -66,8 +65,19 @@ public class CommandsQueue extends ChannelDuplexHandler {
};
public void sendNextCommand(Channel channel) {
channel.attr(CommandsQueue.CURRENT_COMMAND).set(null);
queue.poll();
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null);
if (command != null) {
queue.poll();
} else {
QueueCommandHolder c = queue.peek();
if (c != null) {
QueueCommand data = c.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
queue.poll();
}
}
}
sendData(channel);
}
@ -82,11 +92,6 @@ public class CommandsQueue extends ChannelDuplexHandler {
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Channel has been closed! Can't write command: "
+ LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel()));
if (command.getChannelPromise().isSuccess() && !command.getCommand().isBlockingCommand()) {
command.getCommand().tryFailure(new RedisConnectionClosedException("Command "
+ LogHelper.toString(command.getCommand()) + " succesfully sent, but channel " + ctx.channel() + " has been closed!"));
}
}
super.channelInactive(ctx);

@ -21,6 +21,11 @@ import java.util.List;
import org.redisson.client.protocol.decoder.DecoderState;
/**
*
* @author Nikita Koksharov
*
*/
public class State {
private int batchIndex;
@ -37,18 +42,11 @@ public class State {
public boolean isMakeCheckpoint() {
return makeCheckpoint;
}
public void resetLevel() {
level = -1;
levels.clear();
}
public int decLevel() {
return --level;
}
public int incLevel() {
return ++level;
}
public int getLevel() {
return level;
}
public StateLevel getLastLevel() {
if (levels == null || levels.isEmpty()) {
return null;
@ -61,7 +59,13 @@ public class State {
levels = new ArrayList<StateLevel>(2);
}
levels.add(stateLevel);
level++;
}
public void removeLastLevel() {
levels.remove(level);
level--;
}
public List<StateLevel> getLevels() {
if (levels == null) {
return Collections.emptyList();

@ -17,12 +17,15 @@ package org.redisson.client.handler;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class StateLevel {
private long size;
private List<Object> parts;
private long lastListSize;
private List<Object> lastList;
private final long size;
private final List<Object> parts;
public StateLevel(long size, List<Object> parts) {
super();
@ -30,20 +33,10 @@ public class StateLevel {
this.parts = parts;
}
public long getLastListSize() {
return lastListSize;
public boolean isFull() {
return size == parts.size();
}
public void setLastListSize(long lastListSize) {
this.lastListSize = lastListSize;
}
public List<Object> getLastList() {
return lastList;
}
public void setLastList(List<Object> lastList) {
this.lastList = lastList;
}
public long getSize() {
return size;
}

@ -426,6 +426,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> QUIT = new RedisStrictCommand<Void>("QUIT", new VoidReplayConvertor());
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<Long>("PUBSUB", "NUMSUB", new ListObjectDecoder<Long>(1));
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BitsSizeReplayConvertor extends SingleConvertor<Long> {
/**
*
* @author Nikita Koksharov
*
*/
public class BitsSizeReplayConvertor implements Convertor<Long> {
@Override
public Long convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BooleanAmountReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanAmountReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BooleanNotNullReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanNotNullReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BooleanNullReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanNullReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BooleanNullSafeReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanNullSafeReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class BooleanNumberReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class BooleanNumberReplayConvertor implements Convertor<Boolean> {
private long number;

@ -20,7 +20,7 @@ package org.redisson.client.protocol.convertor;
* @author Nikita Koksharov
*
*/
public class BooleanReplayConvertor extends SingleConvertor<Boolean> {
public class BooleanReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -23,8 +23,6 @@ package org.redisson.client.protocol.convertor;
*/
public interface Convertor<R> {
Object convertMulti(Object obj);
R convert(Object obj);
}

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class DoubleReplayConvertor extends SingleConvertor<Double> {
/**
*
* @author Nikita Koksharov
*
*/
public class DoubleReplayConvertor implements Convertor<Double> {
@Override
public Double convert(Object obj) {

@ -15,7 +15,13 @@
*/
package org.redisson.client.protocol.convertor;
public class EmptyConvertor<R> extends SingleConvertor<R> {
/**
*
* @author Nikita Koksharov
*
* @param <R>
*/
public class EmptyConvertor<R> implements Convertor<R> {
@Override
public R convert(Object obj) {

@ -20,7 +20,7 @@ package org.redisson.client.protocol.convertor;
* @author Nikita Koksharov
*
*/
public class IntegerReplayConvertor extends SingleConvertor<Integer> {
public class IntegerReplayConvertor implements Convertor<Integer> {
private Integer nullValue;

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class LongReplayConvertor extends SingleConvertor<Long> {
/**
*
* @author Nikita Koksharov
*
*/
public class LongReplayConvertor implements Convertor<Long> {
@Override
public Long convert(Object obj) {

@ -17,7 +17,12 @@ package org.redisson.client.protocol.convertor;
import java.math.BigDecimal;
public class NumberConvertor extends SingleConvertor<Object> {
/**
*
* @author Nikita Koksharov
*
*/
public class NumberConvertor implements Convertor<Object> {
private Class<?> resultClass;

@ -1,31 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
/**
*
* @author Nikita Koksharov
*
* @param <R> type
*/
public abstract class SingleConvertor<R> implements Convertor<R> {
@Override
public Object convertMulti(Object obj) {
return obj;
}
}

@ -22,7 +22,7 @@ import org.redisson.api.StreamMessageId;
* @author Nikita Koksharov
*
*/
public class StreamIdConvertor extends SingleConvertor<StreamMessageId> {
public class StreamIdConvertor implements Convertor<StreamMessageId> {
@Override
public StreamMessageId convert(Object id) {

@ -18,14 +18,12 @@ package org.redisson.client.protocol.convertor;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.protocol.convertor.SingleConvertor;
/**
*
* @author Nikita Koksharov
*
*/
public class StringToListConvertor extends SingleConvertor<List<String>> {
public class StringToListConvertor implements Convertor<List<String>> {
@Override
public List<String> convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class TrueReplayConvertor extends SingleConvertor<Boolean> {
/**
*
* @author Nikita Koksharov
*
*/
public class TrueReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {

@ -17,7 +17,12 @@ package org.redisson.client.protocol.convertor;
import org.redisson.api.RType;
public class TypeConvertor extends SingleConvertor<RType> {
/**
*
* @author Nikita Koksharov
*
*/
public class TypeConvertor implements Convertor<RType> {
@Override
public RType convert(Object obj) {

@ -15,7 +15,12 @@
*/
package org.redisson.client.protocol.convertor;
public class VoidReplayConvertor extends SingleConvertor<Void> {
/**
*
* @author Nikita Koksharov
*
*/
public class VoidReplayConvertor implements Convertor<Void> {
@Override
public Void convert(Object obj) {

@ -219,7 +219,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@ -227,21 +227,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@ -291,7 +291,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true);
}
return mainPromise;
}
@ -336,7 +336,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
MasterSlaveEntry entry = nodes.remove(0);
async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false, null);
async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false);
}
@Override
@ -392,7 +392,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null);
async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true);
}
return mainPromise;
}
@ -419,7 +419,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false, null);
async(true, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@ -427,20 +427,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false, null);
async(true, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise;
}
@ -510,7 +510,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : entries) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true);
}
return mainPromise;
}
@ -590,7 +590,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(false, nodeSource, codec, command, args.toArray(), promise, 0, false, null);
async(false, nodeSource, codec, command, args.toArray(), promise, 0, false);
promise.addListener(new FutureListener<R>() {
@Override
@ -613,8 +613,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(pps));
async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false,
null);
async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false);
}
});
} else {
@ -636,7 +635,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null);
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
return mainPromise;
}
@ -649,20 +648,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false, null);
async(false, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}
public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false, null);
async(false, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}
public <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect, final RFuture<RedisConnection> connFuture) {
final boolean ignoreRedirect) {
if (mainPromise.isCancelled()) {
free(params);
return;
@ -764,7 +763,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
count, details.getCommand(), LogHelper.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture);
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
AsyncDetails.release(details);
}
@ -798,7 +797,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
checkWriteFuture(details, ignoreRedirect, connection);
}
});
@ -831,7 +830,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final boolean ignoreRedirect, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (future.isCancelled() || details.getAttemptPromise().isDone()) {
return;
@ -884,9 +883,26 @@ public class CommandAsyncService implements CommandAsyncExecutor {
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
if (!details.getAttemptPromise().cancel(false)) {
return;
}
int count = details.getAttempt() + 1;
if (log.isDebugEnabled()) {
log.debug("attempt {} for command {} and params {}",
count, details.getCommand(), LogHelper.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
AsyncDetails.release(details);
return;
}
details.getAttemptPromise().tryFailure(
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand()
+ " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel()));
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured"
+ " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts. Command: " + details.getCommand()
+ ", params: " + LogHelper.toString(details.getParams()) + ", channel: " + connection.getChannel()));
}
};
@ -905,17 +921,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
final Timeout scheduledFuture;
if (popTimeout != 0) {
// handling cases when connection has been lost
final Channel orignalChannel = connection.getChannel();
scheduledFuture = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// re-connection hasn't been made
// and connection is still active
// if (orignalChannel == connection.getChannel()
// && connection.isActive()) {
// return;
// }
if (details.getAttemptPromise().trySuccess(null)) {
connection.forceFastReconnectAsync();
}
@ -1004,7 +1012,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
@ -1012,14 +1020,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
@ -1029,7 +1037,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
}
}, 1, TimeUnit.SECONDS);
@ -1070,12 +1078,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
try {
mainPromise.trySuccess((R) tryHandleReference(res));
} catch (Exception e) {
//fall back and let other part of the code handle the type conversion.
mainPromise.trySuccess(res);
}
mainPromise.trySuccess((R) tryHandleReference(res));
}
protected Object tryHandleReference(Object o) {
@ -1198,7 +1201,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
return RedissonObjectFactory.fromReference(redissonRx, (RedissonReference) res);
} catch (Exception exception) {
return res;
throw new IllegalStateException(exception);
}
}

@ -153,7 +153,7 @@ public class CommandBatchService extends CommandAsyncService {
@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
if (executed.get()) {
throw new IllegalStateException("Batch already has been executed!");
}
@ -187,7 +187,7 @@ public class CommandBatchService extends CommandAsyncService {
throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC");
}
super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true, connFuture);
super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true);
}
AsyncSemaphore semaphore = new AsyncSemaphore(0);
@ -439,10 +439,9 @@ public class CommandBatchService extends CommandAsyncService {
final CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<Map<MasterSlaveEntry, List<Object>>>(mainPromise, result);
listener.setCounter(connections.size());
for (final Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
ConnectionEntry connection = connections.get(entry.getKey());
final RPromise<List<Object>> execPromise = new RedissonPromise<List<Object>>();
async(false, new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, 0, false, connection.getConnectionFuture());
new Object[] {}, execPromise, 0, false);
execPromise.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
@ -647,6 +646,8 @@ public class CommandBatchService extends CommandAsyncService {
final RPromise<Void> attemptPromise = new RedissonPromise<Void>();
final AsyncDetails details = new AsyncDetails();
details.init(null, attemptPromise,
entry.isReadOnlyMode(), source, null, null, null, mainPromise, attempt);
final RFuture<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
@ -751,7 +752,7 @@ public class CommandBatchService extends CommandAsyncService {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(),
options.getResponseTimeout(), attempts, options.getExecutionMode());
options.getResponseTimeout(), attempts, options.getExecutionMode(), slots);
}
});
@ -808,8 +809,8 @@ public class CommandBatchService extends CommandAsyncService {
}
}
private void checkWriteFuture(Entry entry, final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts) {
private void checkWriteFuture(final Entry entry, final RPromise<Void> attemptPromise, final AsyncDetails details,
final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts, final AtomicInteger slots, final RPromise<Void> mainPromise) {
if (future.isCancelled() || attemptPromise.isDone()) {
return;
}
@ -817,7 +818,9 @@ public class CommandBatchService extends CommandAsyncService {
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
if (details.getAttempt() == attempts) {
attemptPromise.tryFailure(details.getException());
if (!attemptPromise.tryFailure(details.getException())) {
log.error(details.getException().getMessage());
}
}
return;
}
@ -827,6 +830,21 @@ public class CommandBatchService extends CommandAsyncService {
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
if (!details.getAttemptPromise().cancel(false)) {
return;
}
int count = details.getAttempt() + 1;
if (log.isDebugEnabled()) {
log.debug("attempt {} for command {} and params {}",
count, details.getCommand(), LogHelper.toString(details.getParams()));
}
details.removeMainPromiseListener();
execute(entry, details.getSource(), mainPromise, slots, count, options);
return;
}
attemptPromise.tryFailure(
new RedisResponseTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
}
@ -842,7 +860,8 @@ public class CommandBatchService extends CommandAsyncService {
private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts, ExecutionMode executionMode) {
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts,
ExecutionMode executionMode, final AtomicInteger slots) {
if (connFuture.isCancelled()) {
return;
}
@ -881,7 +900,7 @@ public class CommandBatchService extends CommandAsyncService {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts);
checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts, slots, mainPromise);
}
});

@ -84,7 +84,9 @@ public class RedissonObjectBuilder {
public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) {
Codec codec = ar.getCodec();
codecProvider.registerCodec((Class) codec.getClass(), codec);
if (codec != null) {
codecProvider.registerCodec((Class) codec.getClass(), codec);
}
liveMap.fastPut(fieldName,
new RedissonReference(ar.getClass(), ar.getName(), codec));
}

@ -171,14 +171,18 @@ public class RedissonObjectFactory {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObject rObject = ((RObject) object);
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
if (rObject.getCodec() != null) {
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObjectReactive rObject = ((RObjectReactive) object);
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
if (rObject.getCodec() != null) {
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}

@ -24,7 +24,6 @@ import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
@ -64,8 +63,8 @@ public class CommandReactiveBatchService extends CommandReactiveService {
@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture);
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect);
}
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {

@ -25,7 +25,6 @@ import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
@ -64,8 +63,8 @@ public class CommandRxBatchService extends CommandRxService {
@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture);
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect);
}
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {

@ -150,7 +150,7 @@ public class RedissonBatchTest extends BaseTest {
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 300000; i++) {
for (int i = 0; i < 400000; i++) {
batch.getBucket("test").setAsync(123);
}
@ -171,6 +171,8 @@ public class RedissonBatchTest extends BaseTest {
assertThat(redisson.getBucket("test1").get()).isEqualTo(1);
assertThat(redisson.getBucket("test2").get()).isEqualTo(2);
redisson.shutdown();
}
@Test
@ -228,6 +230,7 @@ public class RedissonBatchTest extends BaseTest {
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
redisson.shutdown();
}
@Test
@ -238,7 +241,8 @@ public class RedissonBatchTest extends BaseTest {
RBatch batch = redisson.createBatch(batchOptions);
RMapCacheAsync<String, String> map = batch.getMapCache("test");
for (int i = 0; i < 200000; i++) {
int total = 200000;
for (int i = 0; i < total; i++) {
RFuture<String> f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES);
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.syncUninterruptibly();
@ -246,7 +250,7 @@ public class RedissonBatchTest extends BaseTest {
}
batch.execute();
assertThat(redisson.getMapCache("test").size()).isEqualTo(200000);
assertThat(redisson.getMapCache("test").size()).isEqualTo(total);
redisson.shutdown();
}
@ -339,6 +343,7 @@ public class RedissonBatchTest extends BaseTest {
}
process.shutdown();
redisson.shutdown();
}

@ -2,8 +2,10 @@ package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.assertj.core.api.Assertions.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -19,6 +21,21 @@ import org.redisson.config.Config;
*/
public class RedissonReferenceTest extends BaseTest {
@Test
public void testBitSet() {
RMap<String, RBitSet> data = redisson.getMap("data-00");
RBitSet bs = redisson.getBitSet("data-01");
bs.set(5);
bs.set(7);
data.put("a", bs);
assertThat(data.entrySet()).hasSize(1);
for (Map.Entry<String, RBitSet> entry : data.entrySet()) {
assertThat(entry.getValue().get(5)).isTrue();
assertThat(entry.getValue().get(7)).isTrue();
}
}
@Test
public void testBasic() {
RBucket<Object> b1 = redisson.getBucket("b1");

@ -107,6 +107,20 @@ public class RedissonTopicTest {
}
@Test
public void testCountSubscribers() {
RedissonClient redisson = BaseTest.createInstance();
RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE);
assertThat(topic1.countSubscribers()).isZero();
int id = topic1.addListener(Long.class, (channel, msg) -> {
});
assertThat(topic1.countSubscribers()).isOne();
topic1.removeListener(id);
assertThat(topic1.countSubscribers()).isZero();
redisson.shutdown();
}
@Test
public void testCountListeners() {
RedissonClient redisson = BaseTest.createInstance();

Loading…
Cancel
Save