refactoring

pull/3505/head
Nikita Koksharov 4 years ago
parent e429081ded
commit fc3d59ffa2

@ -1523,7 +1523,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1544,7 +1544,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -1580,7 +1580,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1601,7 +1601,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -1593,7 +1593,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1614,7 +1614,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -1581,7 +1581,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1602,7 +1602,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -144,12 +144,12 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public ReactiveRedisConnection getReactiveConnection() {
return new RedissonReactiveRedisConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -81,7 +81,9 @@ public class RedissonReactiveClusterKeyCommandsTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
connection = new RedissonReactiveRedisClusterConnection(
new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getCommandExecutor().getConnectionManager(),
((RedissonKeys) redisson.getKeys()).getCommandExecutor().getObjectBuilder()));
}
@AfterClass

@ -1612,7 +1612,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(redisson.getCommandExecutor(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1633,7 +1633,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(redisson.getCommandExecutor(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -144,12 +144,12 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public ReactiveRedisConnection getReactiveConnection() {
return new RedissonReactiveRedisConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -80,7 +80,7 @@ public class RedissonReactiveClusterKeyCommandsTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@AfterClass

@ -1604,7 +1604,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1625,7 +1625,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -150,12 +150,12 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public ReactiveRedisConnection getReactiveConnection() {
return new RedissonReactiveRedisConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -80,7 +80,7 @@ public class RedissonReactiveClusterKeyCommandsTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@AfterClass

@ -1581,7 +1581,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(redisson.getCommandExecutor(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1602,7 +1602,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(redisson.getCommandExecutor(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -150,12 +150,12 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public ReactiveRedisConnection getReactiveConnection() {
return new RedissonReactiveRedisConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -81,7 +81,7 @@ public class RedissonReactiveClusterKeyCommandsTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@AfterClass

@ -1581,7 +1581,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.subscribe(channels);
}
@ -1602,7 +1602,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
subscription = new RedissonSubscription(redisson.getConnectionManager(), redisson.getConnectionManager().getSubscribeService(), listener);
subscription = new RedissonSubscription(executorService, redisson.getConnectionManager().getSubscribeService(), listener);
subscription.pSubscribe(patterns);
}

@ -150,12 +150,12 @@ public class RedissonConnectionFactory implements RedisConnectionFactory,
@Override
public ReactiveRedisConnection getReactiveConnection() {
return new RedissonReactiveRedisConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys)redisson.getKeys()).getConnectionManager()));
return new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
}

@ -20,6 +20,7 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -39,12 +40,12 @@ import java.util.List;
*/
public class RedissonSubscription extends AbstractSubscription {
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(ConnectionManager connectionManager, PublishSubscribeService subscribeService, MessageListener listener) {
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
this.subscribeService = subscribeService;
}
@ -67,7 +68,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}
@ -97,7 +98,7 @@ public class RedissonSubscription extends AbstractSubscription {
list.add(f);
}
for (RFuture<?> future : list) {
connectionManager.getCommandExecutor().syncSubscription(future);
commandExecutor.syncSubscription(future);
}
}

@ -81,7 +81,7 @@ public class RedissonReactiveClusterKeyCommandsTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
redisson = Redisson.create(config);
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((RedissonKeys) redisson.getKeys()).getConnectionManager()));
connection = new RedissonReactiveRedisClusterConnection(new CommandReactiveService(((Redisson)redisson).getConnectionManager(), ((Redisson)redisson).getCommandExecutor().getObjectBuilder()));
}
@AfterClass

@ -17,13 +17,14 @@ package org.redisson;
import org.redisson.api.ClusterNode;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
@Deprecated
public class RedisClusterNodes extends RedisNodes<ClusterNode> implements ClusterNodesGroup {
public RedisClusterNodes(ConnectionManager connectionManager) {
super(connectionManager);
public RedisClusterNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
super(connectionManager, commandExecutor);
}
}

@ -21,6 +21,7 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI;
@ -44,9 +45,11 @@ import java.util.concurrent.TimeUnit;
public class RedisNodes<N extends Node> implements NodesGroup<N> {
final ConnectionManager connectionManager;
final CommandAsyncExecutor commandExecutor;
public RedisNodes(ConnectionManager connectionManager) {
public RedisNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
}
@Override
@ -56,13 +59,13 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty()
&& RedisURI.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
}
for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) {
if (RedisURI.compare(entry.getClient().getAddr(), addr)
&& entry.getFreezeReason() != FreezeReason.MANAGER) {
return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
return (N) new RedisClientEntry(entry.getClient(), commandExecutor, entry.getNodeType());
}
}
}
@ -76,14 +79,14 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty()
&& type == NodeType.MASTER) {
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
result.add((N) entry);
}
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER
&& slaveEntry.getNodeType() == type) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), commandExecutor, slaveEntry.getNodeType());
result.add((N) entry);
}
}
@ -98,13 +101,13 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty()) {
RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
result.add((N) masterEntry);
}
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), commandExecutor, slaveEntry.getNodeType());
result.add((N) entry);
}
}

@ -15,18 +15,16 @@
*/
package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.*;
import org.redisson.api.redisnode.*;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.redisnode.RedissonClusterNodes;
import org.redisson.redisnode.RedissonMasterSlaveNodes;
import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes;
@ -34,6 +32,10 @@ import org.redisson.redisnode.RedissonSingleNode;
import org.redisson.remote.ResponseEntry;
import org.redisson.transaction.RedissonTransaction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Main infrastructure class allows to get access
* to all Redisson objects on top of Redis server.
@ -51,6 +53,7 @@ public class Redisson implements RedissonClient {
protected final EvictionScheduler evictionScheduler;
protected final WriteBehindService writeBehindService;
protected final ConnectionManager connectionManager;
protected final CommandAsyncExecutor commandExecutor;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>();
protected final Config config;
@ -61,17 +64,22 @@ public class Redisson implements RedissonClient {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor());
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
public EvictionScheduler getEvictionScheduler() {
return evictionScheduler;
}
public CommandExecutor getCommandExecutor() {
return connectionManager.getCommandExecutor();
public CommandAsyncExecutor getCommandExecutor() {
return commandExecutor;
}
public ConnectionManager getConnectionManager() {
@ -86,11 +94,7 @@ public class Redisson implements RedissonClient {
public static RedissonClient create() {
Config config = new Config();
config.useSingleServer()
.setTimeout(1000000)
.setAddress("redis://127.0.0.1:6379");
// config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399");
// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379");
// config.useClusterServers().addNodeAddress("127.0.0.1:7000");
return create(config);
}
@ -125,7 +129,7 @@ public class Redisson implements RedissonClient {
return new RedissonRx(config);
}
/**
* Create Reactive Redisson instance with default config
*
@ -149,189 +153,189 @@ public class Redisson implements RedissonClient {
@Override
public <V> RTimeSeries<V> getTimeSeries(String name) {
return new RedissonTimeSeries<>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonTimeSeries<>(evictionScheduler, commandExecutor, name);
}
@Override
public <V> RTimeSeries<V> getTimeSeries(String name, Codec codec) {
return new RedissonTimeSeries<>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonTimeSeries<>(codec, evictionScheduler, commandExecutor, name);
}
@Override
public <K, V> RStream<K, V> getStream(String name) {
return new RedissonStream<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonStream<K, V>(commandExecutor, name);
}
@Override
public <K, V> RStream<K, V> getStream(String name, Codec codec) {
return new RedissonStream<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonStream<K, V>(codec, commandExecutor, name);
}
@Override
public RBinaryStream getBinaryStream(String name) {
return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name);
return new RedissonBinaryStream(commandExecutor, name);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonGeo<V>(commandExecutor, name, this);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonGeo<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RBucket<V> getBucket(String name) {
return new RedissonBucket<V>(connectionManager.getCommandExecutor(), name);
return new RedissonBucket<V>(commandExecutor, name);
}
@Override
public RRateLimiter getRateLimiter(String name) {
return new RedissonRateLimiter(connectionManager.getCommandExecutor(), name);
return new RedissonRateLimiter(commandExecutor, name);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
return new RedissonBucket<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonBucket<V>(codec, commandExecutor, name);
}
@Override
public RBuckets getBuckets() {
return new RedissonBuckets(connectionManager.getCommandExecutor());
return new RedissonBuckets(commandExecutor);
}
@Override
public RBuckets getBuckets(Codec codec) {
return new RedissonBuckets(codec, connectionManager.getCommandExecutor());
return new RedissonBuckets(codec, commandExecutor);
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name) {
return new RedissonHyperLogLog<V>(connectionManager.getCommandExecutor(), name);
return new RedissonHyperLogLog<V>(commandExecutor, name);
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLog<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonHyperLogLog<V>(codec, commandExecutor, name);
}
@Override
public <V> RList<V> getList(String name) {
return new RedissonList<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonList<V>(commandExecutor, name, this);
}
@Override
public <V> RList<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonList<V>(codec, commandExecutor, name, this);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(codec, commandExecutor, name);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions<K, V> options) {
return new RedissonLocalCachedMap<K, V>(connectionManager.getCommandExecutor(), name,
return new RedissonLocalCachedMap<K, V>(commandExecutor, name,
options, evictionScheduler, this, writeBehindService);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions<K, V> options) {
return new RedissonLocalCachedMap<K, V>(codec, connectionManager.getCommandExecutor(), name,
return new RedissonLocalCachedMap<K, V>(codec, commandExecutor, name,
options, evictionScheduler, this, writeBehindService);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(commandExecutor, name, this, null, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, MapOptions<K, V> options) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, options, writeBehindService);
return new RedissonMap<K, V>(commandExecutor, name, this, options, writeBehindService);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, commandExecutor, name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, commandExecutor, name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, commandExecutor, name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, commandExecutor, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(codec, commandExecutor, name);
}
@Override
public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this);
return new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, this);
}
@Override
public <V> RSetCache<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
return new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, this);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, this, null, null);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, options, writeBehindService);
return new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, this, options, writeBehindService);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, this, null, null);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, options, writeBehindService);
return new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, this, options, writeBehindService);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(codec, commandExecutor, name, this, null, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, options, writeBehindService);
return new RedissonMap<K, V>(codec, commandExecutor, name, this, options, writeBehindService);
}
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
return new RedissonLock(commandExecutor, name);
}
@Override
@ -341,7 +345,7 @@ public class Redisson implements RedissonClient {
@Override
public RLock getSpinLock(String name, LockOptions.BackOff backOff) {
return new RedissonSpinLock(connectionManager.getCommandExecutor(), name, backOff);
return new RedissonSpinLock(commandExecutor, name, backOff);
}
@Override
@ -356,32 +360,32 @@ public class Redisson implements RedissonClient {
@Override
public RLock getFairLock(String name) {
return new RedissonFairLock(connectionManager.getCommandExecutor(), name);
return new RedissonFairLock(commandExecutor, name);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name);
return new RedissonReadWriteLock(commandExecutor, name);
}
@Override
public <V> RSet<V> getSet(String name) {
return new RedissonSet<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonSet<V>(commandExecutor, name, this);
}
@Override
public <V> RSet<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonSet<V>(codec, commandExecutor, name, this);
}
@Override
public RScript getScript() {
return new RedissonScript(connectionManager.getCommandExecutor());
return new RedissonScript(commandExecutor);
}
@Override
public RScript getScript(Codec codec) {
return new RedissonScript(connectionManager.getCommandExecutor(), codec);
return new RedissonScript(commandExecutor, codec);
}
@Override
@ -401,7 +405,7 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options);
return new RedissonExecutorService(codec, commandExecutor, this, name, queueTransferService, responses, options);
}
@Override
@ -425,62 +429,62 @@ public class Redisson implements RedissonClient {
if (codec != connectionManager.getCodec()) {
executorId = executorId + ":" + name;
}
return new RedissonRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonSortedSet<V>(commandExecutor, name, this);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name, Codec codec) {
return new RedissonSortedSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonSortedSet<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonScoredSortedSet<V>(commandExecutor, name, this);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonScoredSortedSet<V>(codec, commandExecutor, name, this);
}
@Override
public RLexSortedSet getLexSortedSet(String name) {
return new RedissonLexSortedSet(connectionManager.getCommandExecutor(), name, this);
return new RedissonLexSortedSet(commandExecutor, name, this);
}
@Override
public RTopic getTopic(String name) {
return new RedissonTopic(connectionManager.getCommandExecutor(), name);
return new RedissonTopic(commandExecutor, name);
}
@Override
public RTopic getTopic(String name, Codec codec) {
return new RedissonTopic(codec, connectionManager.getCommandExecutor(), name);
return new RedissonTopic(codec, commandExecutor, name);
}
@Override
public RReliableTopic getReliableTopic(String name) {
return new RedissonReliableTopic(connectionManager.getCommandExecutor(), name);
return new RedissonReliableTopic(commandExecutor, name);
}
@Override
public RReliableTopic getReliableTopic(String name, Codec codec) {
return new RedissonReliableTopic(codec, connectionManager.getCommandExecutor(), name);
return new RedissonReliableTopic(codec, commandExecutor, name);
}
@Override
public RPatternTopic getPatternTopic(String pattern) {
return new RedissonPatternTopic(connectionManager.getCommandExecutor(), pattern);
return new RedissonPatternTopic(commandExecutor, pattern);
}
@Override
public RPatternTopic getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic(codec, connectionManager.getCommandExecutor(), pattern);
return new RedissonPatternTopic(codec, commandExecutor, pattern);
}
@Override
@ -488,151 +492,151 @@ public class Redisson implements RedissonClient {
if (destinationQueue == null) {
throw new NullPointerException();
}
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), commandExecutor, destinationQueue.getName());
}
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonQueue<V>(commandExecutor, name, this);
}
@Override
public <V> RQueue<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonQueue<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RTransferQueue<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
return new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
return new RedissonTransferQueue<V>(commandExecutor, name, service);
}
@Override
public <V> RTransferQueue<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
return new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
return new RedissonTransferQueue<V>(codec, commandExecutor, name, service);
}
@Override
public <V> RRingBuffer<V> getRingBuffer(String name) {
return new RedissonRingBuffer<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonRingBuffer<V>(commandExecutor, name, this);
}
@Override
public <V> RRingBuffer<V> getRingBuffer(String name, Codec codec) {
return new RedissonRingBuffer<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonRingBuffer<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonBlockingQueue<V>(commandExecutor, name, this);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name) {
return new RedissonBoundedBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonBoundedBlockingQueue<V>(commandExecutor, name, this);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name, Codec codec) {
return new RedissonBoundedBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonBoundedBlockingQueue<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RDeque<V> getDeque(String name) {
return new RedissonDeque<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonDeque<V>(commandExecutor, name, this);
}
@Override
public <V> RDeque<V> getDeque(String name, Codec codec) {
return new RedissonDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonDeque<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonBlockingDeque<V>(commandExecutor, name, this);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonBlockingDeque<V>(codec, commandExecutor, name, this);
};
@Override
public RAtomicLong getAtomicLong(String name) {
return new RedissonAtomicLong(connectionManager.getCommandExecutor(), name);
return new RedissonAtomicLong(commandExecutor, name);
}
@Override
public RLongAdder getLongAdder(String name) {
return new RedissonLongAdder(connectionManager.getCommandExecutor(), name, this);
return new RedissonLongAdder(commandExecutor, name, this);
}
@Override
public RDoubleAdder getDoubleAdder(String name) {
return new RedissonDoubleAdder(connectionManager.getCommandExecutor(), name, this);
return new RedissonDoubleAdder(commandExecutor, name, this);
}
@Override
public RAtomicDouble getAtomicDouble(String name) {
return new RedissonAtomicDouble(connectionManager.getCommandExecutor(), name);
return new RedissonAtomicDouble(commandExecutor, name);
}
@Override
public RCountDownLatch getCountDownLatch(String name) {
return new RedissonCountDownLatch(connectionManager.getCommandExecutor(), name);
return new RedissonCountDownLatch(commandExecutor, name);
}
@Override
public RBitSet getBitSet(String name) {
return new RedissonBitSet(connectionManager.getCommandExecutor(), name);
return new RedissonBitSet(commandExecutor, name);
}
@Override
public RSemaphore getSemaphore(String name) {
return new RedissonSemaphore(connectionManager.getCommandExecutor(), name);
return new RedissonSemaphore(commandExecutor, name);
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphore(connectionManager.getCommandExecutor(), name);
return new RedissonPermitExpirableSemaphore(commandExecutor, name);
}
@Override
public <V> RBloomFilter<V> getBloomFilter(String name) {
return new RedissonBloomFilter<V>(connectionManager.getCommandExecutor(), name);
return new RedissonBloomFilter<V>(commandExecutor, name);
}
@Override
public <V> RBloomFilter<V> getBloomFilter(String name, Codec codec) {
return new RedissonBloomFilter<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonBloomFilter<V>(codec, commandExecutor, name);
}
@Override
public RIdGenerator getIdGenerator(String name) {
return new RedissonIdGenerator(connectionManager.getCommandExecutor(), name);
return new RedissonIdGenerator(commandExecutor, name);
}
@Override
public RKeys getKeys() {
return new RedissonKeys(connectionManager.getCommandExecutor());
return new RedissonKeys(commandExecutor);
}
@Override
public RTransaction createTransaction(TransactionOptions options) {
return new RedissonTransaction(connectionManager.getCommandExecutor(), options);
return new RedissonTransaction(commandExecutor, options);
}
@Override
public RBatch createBatch(BatchOptions options) {
return new RedissonBatch(evictionScheduler, connectionManager, options);
return new RedissonBatch(evictionScheduler, commandExecutor, options);
}
@Override
@ -642,7 +646,7 @@ public class Redisson implements RedissonClient {
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(liveObjectClassCache, connectionManager);
return new RedissonLiveObjectService(liveObjectClassCache, commandExecutor);
}
@Override
@ -667,32 +671,32 @@ public class Redisson implements RedissonClient {
if (config.isSentinelConfig() || config.isClusterConfig()) {
throw new IllegalArgumentException("Can't be used in non Redis single configuration");
}
return (T) new RedissonSingleNode(connectionManager);
return (T) new RedissonSingleNode(connectionManager, commandExecutor);
}
if (nodes.getClazz() == RedisCluster.class) {
if (!config.isClusterConfig()) {
throw new IllegalArgumentException("Can't be used in non Redis Cluster configuration");
}
return (T) new RedissonClusterNodes(connectionManager);
return (T) new RedissonClusterNodes(connectionManager, commandExecutor);
}
if (nodes.getClazz() == RedisSentinelMasterSlave.class) {
if (!config.isSentinelConfig()) {
throw new IllegalArgumentException("Can't be used in non Redis Sentinel configuration");
}
return (T) new RedissonSentinelMasterSlaveNodes(connectionManager);
return (T) new RedissonSentinelMasterSlaveNodes(connectionManager, commandExecutor);
}
if (nodes.getClazz() == RedisMasterSlave.class) {
if (config.isSentinelConfig() || config.isClusterConfig()) {
throw new IllegalArgumentException("Can't be used in non Redis Master Slave configuration");
}
return (T) new RedissonMasterSlaveNodes(connectionManager);
return (T) new RedissonMasterSlaveNodes(connectionManager, commandExecutor);
}
throw new IllegalArgumentException();
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
return new RedisNodes<Node>(connectionManager, commandExecutor);
}
@Override
@ -700,7 +704,7 @@ public class Redisson implements RedissonClient {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson is not in cluster mode!");
}
return new RedisClusterNodes(connectionManager);
return new RedisClusterNodes(connectionManager, commandExecutor);
}
@Override
@ -715,43 +719,43 @@ public class Redisson implements RedissonClient {
@Override
public <V> RPriorityQueue<V> getPriorityQueue(String name) {
return new RedissonPriorityQueue<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityQueue<V>(commandExecutor, name, this);
}
@Override
public <V> RPriorityQueue<V> getPriorityQueue(String name, Codec codec) {
return new RedissonPriorityQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityQueue<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name) {
return new RedissonPriorityBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityBlockingQueue<V>(commandExecutor, name, this);
}
@Override
public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec) {
return new RedissonPriorityBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityBlockingQueue<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name) {
return new RedissonPriorityBlockingDeque<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityBlockingDeque<V>(commandExecutor, name, this);
}
@Override
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name, Codec codec) {
return new RedissonPriorityBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityBlockingDeque<V>(codec, commandExecutor, name, this);
}
@Override
public <V> RPriorityDeque<V> getPriorityDeque(String name) {
return new RedissonPriorityDeque<V>(connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityDeque<V>(commandExecutor, name, this);
}
@Override
public <V> RPriorityDeque<V> getPriorityDeque(String name, Codec codec) {
return new RedissonPriorityDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonPriorityDeque<V>(codec, commandExecutor, name, this);
}
@Override

@ -17,8 +17,8 @@ package org.redisson;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
/**
@ -32,8 +32,8 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.executorService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) {
this.executorService = new CommandBatchService(executor, options);
this.evictionScheduler = evictionScheduler;
}

@ -28,30 +28,25 @@
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RFuture;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.codec.*;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Bloom filter based on Highway 128-bit hash.
@ -65,16 +60,16 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
private volatile long size;
private volatile int hashIterations;
private final CommandExecutor commandExecutor;
private final CommandAsyncExecutor commandExecutor;
private String configName;
protected RedissonBloomFilter(CommandExecutor commandExecutor, String name) {
protected RedissonBloomFilter(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.configName = suffixName(getName(), "config");
}
protected RedissonBloomFilter(Codec codec, CommandExecutor commandExecutor, String name) {
protected RedissonBloomFilter(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.commandExecutor = commandExecutor;
this.configName = suffixName(getName(), "config");
@ -305,25 +300,25 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public long getExpectedInsertions() {
Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions");
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions"));
return check(result);
}
@Override
public double getFalseProbability() {
Double result = commandExecutor.read(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability");
Double result = get(commandExecutor.readAsync(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability"));
return check(result);
}
@Override
public long getSize() {
Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size");
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size"));
return check(result);
}
@Override
public int getHashIterations() {
Integer result = commandExecutor.read(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations");
Integer result = get(commandExecutor.readAsync(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations"));
return check(result);
}

@ -15,13 +15,6 @@
*/
package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -29,11 +22,18 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* <p>Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}.
*
@ -41,14 +41,14 @@ import org.redisson.misc.RedissonPromise;
*/
public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements RBoundedBlockingQueue<V> {
private final CommandExecutor commandExecutor;
private final CommandAsyncExecutor commandExecutor;
protected RedissonBoundedBlockingQueue(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
}
protected RedissonBoundedBlockingQueue(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
}
@ -340,14 +340,14 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public void clear() {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN,
get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local len = redis.call('llen', KEYS[1]); " +
"if len > 0 then "
+ "redis.call('del', KEYS[1]); "
+ "local value = redis.call('incrby', KEYS[2], len); " +
"redis.call('publish', KEYS[3], value); "
+ "end; ",
Arrays.<Object>asList(getName(), getSemaphoreName(), channelName));
Arrays.<Object>asList(getName(), getSemaphoreName(), channelName)));
}

@ -23,7 +23,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.*;
import org.redisson.executor.params.*;
@ -60,7 +60,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2;
private final CommandExecutor commandExecutor;
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Codec codec;
private final Redisson redisson;
@ -102,8 +102,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(new ConcurrentHashMap<>());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
public RedissonExecutorService(Codec codec, CommandAsyncExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
@ -119,7 +119,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = connectionManager.getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = new RedissonExecutorRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId, responses);
requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class);
responseQueueName = remoteService.getResponseQueueName(executorId);
String objectName = requestQueueName;
@ -474,7 +474,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
remoteService.deregister(RemoteExecutorService.class);
workersTopic.removeListener(workersGroupListenerId);
commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
commandExecutor.get(commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
@ -484,7 +484,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0), tasksRetryIntervalName),
SHUTDOWN_STATE, TERMINATED_STATE);
SHUTDOWN_STATE, TERMINATED_STATE));
}
@Override
@ -524,13 +524,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
private boolean checkState(int state) {
return commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(statusName),
state);
state));
}
@Override

@ -64,6 +64,10 @@ public class RedissonKeys implements RKeys {
this.commandExecutor = commandExecutor;
}
public CommandAsyncExecutor getCommandExecutor() {
return commandExecutor;
}
public ConnectionManager getConnectionManager() {
return commandExecutor.getConnectionManager();
}

@ -61,7 +61,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor);
}
@Override

@ -38,7 +38,6 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.LiveObjectSearch;
import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.*;
@ -59,15 +58,16 @@ import java.util.stream.Collectors;
public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends RIdResolver<?>>, RIdResolver<?>> PROVIDER_CACHE = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final ConnectionManager connectionManager;
private final CommandAsyncExecutor commandExecutor;
private final LiveObjectSearch seachEngine;
public RedissonLiveObjectService(ConcurrentMap<Class<?>, Class<?>> classCache,
ConnectionManager connectionManager) {
CommandAsyncExecutor commandExecutor) {
this.classCache = classCache;
this.connectionManager = connectionManager;
this.seachEngine = new LiveObjectSearch(connectionManager.getCommandExecutor());
this.commandExecutor = commandExecutor;
this.seachEngine = new LiveObjectSearch(commandExecutor);
}
//TODO: Add ttl renewal functionality
@ -89,7 +89,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
RId annotation = ClassUtils.getDeclaredField(entityClass, idFieldName)
.getAnnotation(RId.class);
RIdResolver<?> resolver = getResolver(annotation.generator());
Object id = resolver.resolve(entityClass, annotation, idFieldName, connectionManager.getCommandExecutor());
Object id = resolver.resolve(entityClass, annotation, idFieldName, commandExecutor);
return id;
}
@ -175,7 +175,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <T> List<T> persist(T... detachedObjects) {
CommandBatchService commandExecutor = new CommandBatchService(connectionManager.getCommandExecutor());
CommandBatchService batchService = new CommandBatchService(commandExecutor);
Map<Class<?>, Class<?>> classCache = new HashMap<>();
Map<T, Object> detached2Attached = new LinkedHashMap<>();
@ -184,14 +184,14 @@ public class RedissonLiveObjectService implements RLiveObjectService {
for (T detachedObject : detachedObjects) {
Object id = getId(detachedObject);
T attachedObject = attach(detachedObject, commandExecutor, classCache);
T attachedObject = attach(detachedObject, batchService, classCache);
RMap<String, Object> liveMap = getMap(attachedObject);
detached2Attached.put(detachedObject, attachedObject);
name2id.put(liveMap.getName(), id);
}
CommandBatchService checkExecutor = new CommandBatchService(connectionManager.getCommandExecutor());
CommandBatchService checkExecutor = new CommandBatchService(batchService);
for (Entry<String, Object> entry : name2id.entrySet()) {
RMap<String, Object> map = new RedissonMap<>(checkExecutor, entry.getKey(), null, null, null);
map.containsKeyAsync("redisson_live_object");
@ -228,7 +228,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
ClassIntrospector.get().reset();
commandExecutor.execute();
batchService.execute();
return new ArrayList<>(detached2Attached.keySet());
}
@ -268,9 +268,9 @@ public class RedissonLiveObjectService implements RLiveObjectService {
continue;
}
RObject rObject = connectionManager.getCommandExecutor().getObjectBuilder().createObject(id, detachedObject.getClass(), object.getClass(), field.getName());
RObject rObject = commandExecutor.getObjectBuilder().createObject(id, detachedObject.getClass(), object.getClass(), field.getName());
if (rObject != null) {
connectionManager.getCommandExecutor().getObjectBuilder().store(rObject, field.getName(), liveMap);
commandExecutor.getObjectBuilder().store(rObject, field.getName(), liveMap);
if (rObject instanceof SortedSet) {
((RSortedSet) rObject).trySetComparator(((SortedSet) object).comparator());
}
@ -561,11 +561,11 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <T> long delete(Class<T> entityClass, Object... ids) {
CommandBatchService ce = new CommandBatchService(connectionManager.getCommandExecutor());
CommandBatchService ce = new CommandBatchService(commandExecutor);
FieldList<InDefinedShape> fields = Introspectior.getFieldsWithAnnotation(entityClass.getSuperclass(), RIndex.class);
Set<String> fieldNames = fields.stream().map(f -> f.getName()).collect(Collectors.toSet());
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(entityClass);
for (Object id: ids) {
delete(id, entityClass, namingScheme, ce, fieldNames);
}
@ -580,7 +580,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
String mapName = namingScheme.getName(entityClass, id);
Object liveObjectId = namingScheme.resolveId(mapName);
RMap<String, Object> liveMap = new RedissonMap<>(namingScheme.getCodec(), connectionManager.getCommandExecutor(),
RMap<String, Object> liveMap = new RedissonMap<>(namingScheme.getCodec(), commandExecutor,
mapName, null, null, null);
Map<String, ?> values = liveMap.getAll(fieldNames);
for (String fieldName : fieldNames) {
@ -614,9 +614,9 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <K> Iterable<K> findIds(Class<?> entityClass, int count) {
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(entityClass);
String pattern = namingScheme.getNamePattern(entityClass);
RedissonKeys keys = new RedissonKeys(connectionManager.getCommandExecutor());
RedissonKeys keys = new RedissonKeys(commandExecutor);
RedisCommand<ListScanResult<String>> command = new RedisCommand<>("SCAN",
new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()), new Convertor<Object>() {
@ -656,7 +656,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
public void registerClass(Class<?> cls) {
if (!classCache.containsKey(cls)) {
validateClass(cls);
Class<?> proxyClass = createProxy(cls, connectionManager.getCommandExecutor());
Class<?> proxyClass = createProxy(cls, commandExecutor);
classCache.putIfAbsent(cls, proxyClass);
}
}
@ -787,8 +787,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.withBinders(FieldProxy.Binder
.install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(commandExecutor, connectionManager,
this, entityClass, getRIdFieldName(entityClass))))
.to(new LiveObjectInterceptor(commandExecutor, this, entityClass, getRIdFieldName(entityClass))))
// .intercept(MethodDelegation.to(
// new LiveObjectInterceptor(redisson, codecProvider, entityClass,
// getRIdFieldName(entityClass)))
@ -826,7 +825,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.and(ElementMatchers.isPublic()
.or(ElementMatchers.isProtected()))
)
.intercept(MethodDelegation.to(new AccessorInterceptor(commandExecutor, connectionManager)))
.intercept(MethodDelegation.to(new AccessorInterceptor(commandExecutor)))
.make().load(entityClass.getClassLoader(),
ClassLoadingStrategy.Default.WRAPPER)

@ -98,7 +98,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public <KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce() {
return new RedissonMapReduce<>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonMapReduce<>(this, redisson, commandExecutor);
}
@Override

@ -15,20 +15,20 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingDeque;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* <p>Distributed and concurrent implementation of priority blocking deque.
*
@ -41,12 +41,12 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
private final RedissonPriorityBlockingQueue<V> blockingQueue;
protected RedissonPriorityBlockingDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonPriorityBlockingDeque(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = (RedissonPriorityBlockingQueue<V>) redisson.getPriorityBlockingQueue(name);
}
protected RedissonPriorityBlockingDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonPriorityBlockingDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
blockingQueue = (RedissonPriorityBlockingQueue<V>) redisson.getPriorityBlockingQueue(name, codec);

@ -15,13 +15,6 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RedissonClient;
@ -29,11 +22,18 @@ import org.redisson.client.RedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.PriorityBlockingQueue}.
*
@ -44,11 +44,11 @@ import org.redisson.misc.RedissonPromise;
*/
public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> implements RPriorityBlockingQueue<V> {
protected RedissonPriorityBlockingQueue(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonPriorityBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
protected RedissonPriorityBlockingQueue(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonPriorityBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}

@ -22,7 +22,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import java.util.Collections;
import java.util.Iterator;
@ -42,11 +42,11 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonPriorityDeque(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
public RedissonPriorityDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}

@ -20,7 +20,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -69,12 +69,12 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
private Comparator comparator = Comparator.naturalOrder();
CommandExecutor commandExecutor;
CommandAsyncExecutor commandExecutor;
RLock lock;
private RBucket<String> comparatorHolder;
public RedissonPriorityQueue(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
public RedissonPriorityQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
@ -82,7 +82,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
}
public RedissonPriorityQueue(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
public RedissonPriorityQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
@ -143,7 +143,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
@Override
public boolean contains(Object o) {
checkComparator();
return binarySearch((V) o, codec).getIndex() >= 0;
return binarySearch((V) o).getIndex() >= 0;
}
@Override
@ -153,7 +153,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
try {
checkComparator();
BinarySearchResult<V> res = binarySearch(value, codec);
BinarySearchResult<V> res = binarySearch(value);
int index = 0;
if (res.getIndex() < 0) {
index = -(res.getIndex() + 1);
@ -161,7 +161,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
index = res.getIndex() + 1;
}
commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID,
commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "if tonumber(ARGV[1]) < len then "
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
@ -195,7 +195,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
try {
checkComparator();
BinarySearchResult<V> res = binarySearch((V) value, codec);
BinarySearchResult<V> res = binarySearch((V) value);
if (res.getIndex() < 0) {
return false;
}
@ -211,7 +211,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
public boolean containsAll(Collection<?> c) {
checkComparator();
for (Object object : c) {
if (binarySearch((V) object, codec).getIndex() < 0) {
if (binarySearch((V) object).getIndex() < 0) {
return false;
}
}
@ -361,7 +361,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
}
// TODO optimize: get three values each time instead of single
public BinarySearchResult<V> binarySearch(V value, Codec codec) {
public BinarySearchResult<V> binarySearch(V value) {
int size = size();
int upperIndex = size - 1;
int lowerIndex = 0;

@ -15,15 +15,15 @@
*/
package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.redisson.api.RFuture;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
/**
*
* @author Nikita Koksharov
@ -35,7 +35,7 @@ public class RedissonQueueSemaphore extends RedissonSemaphore {
private Object value;
private Collection<?> values;
public RedissonQueueSemaphore(CommandExecutor commandExecutor, String name) {
public RedissonQueueSemaphore(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}

@ -26,6 +26,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.reactive.*;
import org.redisson.remote.ResponseEntry;
@ -42,16 +43,18 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final EvictionScheduler evictionScheduler;
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonReactive(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
commandExecutor = new CommandReactiveService(connectionManager);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandReactiveService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
@ -480,12 +483,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public Config getConfig() {
return config;
return connectionManager.getCfg();
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
return new RedisNodes<Node>(connectionManager, commandExecutor);
}
@Override
@ -493,7 +496,7 @@ public class RedissonReactive implements RedissonReactiveClient {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);
return new RedisNodes<ClusterNode>(connectionManager, commandExecutor);
}
@Override
@ -563,7 +566,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RTransferQueueReactive<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(commandExecutor, name, service);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueReactive<V>(queue), RTransferQueueReactive.class);
}
@ -572,7 +575,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RTransferQueueReactive<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, commandExecutor, name, service);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueReactive<V>(queue), RTransferQueueReactive.class);
}

@ -21,6 +21,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.remote.ResponseEntry;
import org.redisson.rx.*;
@ -40,16 +41,18 @@ public class RedissonRx implements RedissonRxClient {
protected final EvictionScheduler evictionScheduler;
protected final CommandRxExecutor commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonRx(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
commandExecutor = new CommandRxService(connectionManager);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (connectionManager.getCfg().isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandRxService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
@ -485,12 +488,12 @@ public class RedissonRx implements RedissonRxClient {
@Override
public Config getConfig() {
return config;
return connectionManager.getCfg();
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
return new RedisNodes<Node>(connectionManager, commandExecutor);
}
@Override
@ -498,7 +501,7 @@ public class RedissonRx implements RedissonRxClient {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);
return new RedisNodes<ClusterNode>(connectionManager, commandExecutor);
}
@Override
@ -569,7 +572,7 @@ public class RedissonRx implements RedissonRxClient {
public <V> RTransferQueueRx<V> getTransferQueue(String name) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(connectionManager.getCommandExecutor(), name, service);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(commandExecutor, name, service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);
}
@ -578,7 +581,7 @@ public class RedissonRx implements RedissonRxClient {
public <V> RTransferQueueRx<V> getTransferQueue(String name, Codec codec) {
String remoteName = RedissonObject.suffixName(name, "remoteService");
RRemoteService service = getRemoteService(remoteName);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, connectionManager.getCommandExecutor(), name, service);
RedissonTransferQueue<V> queue = new RedissonTransferQueue<V>(codec, commandExecutor, name, service);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonTransferQueueRx<V>(queue), RTransferQueueRx.class);
}

@ -71,7 +71,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor);
}
@Override

@ -66,7 +66,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor);
}
@Override

@ -89,7 +89,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonCollectionMapReduce<>(this, redisson, commandExecutor);
}
@Override

@ -21,7 +21,7 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -69,14 +69,14 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
private Comparator comparator = Comparator.naturalOrder();
CommandExecutor commandExecutor;
CommandAsyncExecutor commandExecutor;
private RLock lock;
private RedissonList<V> list;
private RBucket<String> comparatorHolder;
private RedissonClient redisson;
protected RedissonSortedSet(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonSortedSet(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.redisson = redisson;
@ -86,7 +86,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
list = (RedissonList<V>) redisson.<V>getList(getName());
}
public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) {
public RedissonSortedSet(Codec codec, CommandAsyncExecutor commandExecutor, String name, Redisson redisson) {
super(codec, commandExecutor, name);
this.commandExecutor = commandExecutor;
@ -97,7 +97,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor);
}
private void loadComparator() {
@ -196,14 +196,14 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
ByteBuf encodedValue = encode(value);
commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID,
commandExecutor.get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "if tonumber(ARGV[1]) < len then "
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"
+ "return;"
+ "end;"
+ "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getName()), index, encodedValue);
+ "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getName()), index, encodedValue));
return true;
} else {
return false;
@ -374,14 +374,14 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
String className = comparator.getClass().getName();
final String comparatorSign = className + ":" + calcClassSign(className);
Boolean res = commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
Boolean res = commandExecutor.get(commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('llen', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[2], ARGV[1]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign));
if (res) {
this.comparator = comparator;
}

@ -37,7 +37,6 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
@ -46,7 +45,9 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -77,8 +78,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id, objectBuilder);
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
if (cfg.getNodeAddresses().isEmpty()) {
throw new IllegalArgumentException("At least one cluster node should be defined!");

@ -181,9 +181,8 @@ public class RedisExecutor<V, R> {
if (connectionFuture.cancel(false)) {
if (exception == null) {
exception = new RedisTimeoutException("Unable to acquire connection! " +
"Avoid to use blocking commands in Async/JavaRx/Reactive handlers. " +
"Try to increase connection pool size. "
exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture +
"Increase connection pool size. "
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
@ -204,7 +203,6 @@ public class RedisExecutor<V, R> {
}
exception = new RedisTimeoutException("Command still hasn't been written into connection! " +
"Avoid to use blocking commands in Async/JavaRx/Reactive handlers. " +
"Try to increase nettyThreads setting. Payload size in bytes: " + totalSize
+ ". Node source: " + source + ", connection: " + connectionFuture.getNow()
+ ", command: " + LogHelper.toString(command, params)

@ -28,14 +28,14 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.api.*;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonNodeInitializer;
import org.redisson.client.NettyHook;
import org.redisson.client.codec.Codec;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.connection.*;
import org.redisson.connection.balancer.LoadBalancer;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.io.*;
import java.net.URL;
@ -178,51 +178,24 @@ public class ConfigSupport {
return yamlMapper.writeValueAsString(config);
}
public static ConnectionManager createConnectionManager(Config config, RedissonRxClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config config, RedissonReactiveClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config config, RedissonClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config configCopy, RedissonObjectBuilder objectBuilder) {
public static ConnectionManager createConnectionManager(Config configCopy) {
UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig());
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id, objectBuilder);
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
} else if (configCopy.getSingleServerConfig() != null) {
validate(configCopy.getSingleServerConfig());
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id, objectBuilder);
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
} else if (configCopy.getSentinelServersConfig() != null) {
validate(configCopy.getSentinelServersConfig());
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id, objectBuilder);
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
} else if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id, objectBuilder);
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
} else if (configCopy.getReplicatedServersConfig() != null) {
validate(configCopy.getReplicatedServersConfig());
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id, objectBuilder);
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
} else if (configCopy.getConnectionManager() != null) {
return configCopy.getConnectionManager();
}else {

@ -15,11 +15,10 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import org.redisson.ElementsSubscribeService;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@ -28,17 +27,16 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RedisURI;
import org.redisson.pubsub.PublishSubscribeService;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
@ -51,8 +49,6 @@ public interface ConnectionManager {
String getId();
CommandSyncService getCommandExecutor();
ElementsSubscribeService getElementsSubscribeService();
PublishSubscribeService getSubscribeService();

@ -15,41 +15,6 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import io.netty.util.*;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.config.*;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CountableListener;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
@ -64,11 +29,35 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.*;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.*;
import org.redisson.misc.*;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
*
@ -136,8 +125,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ExecutorService executor;
private final CommandSyncService commandExecutor;
private final Config cfg;
protected final AddressResolverGroup<InetSocketAddress> resolverGroup;
@ -148,15 +135,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
this(config, id, objectBuilder);
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
initTimer(cfg);
initSingleEntry();
}
protected MasterSlaveConnectionManager(Config cfg, UUID id, RedissonObjectBuilder objectBuilder) {
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();
Version.logVersion();
@ -213,7 +200,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg;
this.codec = cfg.getCodec();
this.commandExecutor = new CommandSyncService(this, objectBuilder);
}
protected void closeNodeConnections() {
@ -288,11 +274,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return false;
}
@Override
public CommandSyncService getCommandExecutor() {
return commandExecutor;
}
@Override
public IdleConnectionWatcher getConnectionWatcher() {
return connectionWatcher;

@ -15,10 +15,6 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.ClusterNode;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@ -28,10 +24,14 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.command.CommandSyncService;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
@ -40,10 +40,10 @@ import org.redisson.misc.RedissonPromise;
public class RedisClientEntry implements ClusterNode {
private final RedisClient client;
private final CommandSyncService commandExecutor;
private final CommandAsyncExecutor commandExecutor;
private final NodeType type;
public RedisClientEntry(RedisClient client, CommandSyncService commandExecutor, NodeType type) {
public RedisClientEntry(RedisClient client, CommandAsyncExecutor commandExecutor, NodeType type) {
super();
this.client = client;
this.commandExecutor = commandExecutor;

@ -15,31 +15,25 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.ReplicatedServersConfig;
import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* {@link ConnectionManager} for AWS ElastiCache Replication Groups or Azure Redis Cache. By providing all nodes
@ -55,7 +49,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private AtomicReference<RedisURI> currentMaster = new AtomicReference<>();
private final AtomicReference<RedisURI> currentMaster = new AtomicReference<>();
private ScheduledFuture<?> monitorFuture;
@ -64,8 +58,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
slave
}
public ReplicatedConnectionManager(ReplicatedServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id, objectBuilder);
public ReplicatedConnectionManager(ReplicatedServersConfig cfg, Config config, UUID id) {
super(config, id);
this.config = create(cfg);
initTimer(this.config);

@ -28,7 +28,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
@ -70,8 +69,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private boolean usePassword = false;
private String scheme;
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id, objectBuilder);
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
super(config, id);
if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!");

@ -15,14 +15,9 @@
*/
package org.redisson.connection;
import java.util.UUID;
import org.redisson.config.*;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.util.UUID;
/**
*
@ -31,8 +26,8 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
*/
public class SingleConnectionManager extends MasterSlaveConnectionManager {
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(create(cfg), config, id, objectBuilder);
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
super(create(cfg), config, id);
}
private static MasterSlaveServersConfig create(SingleServerConfig cfg) {

@ -23,7 +23,7 @@ import org.redisson.api.RMap;
import org.redisson.api.executor.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.remote.*;
import org.slf4j.Logger;
@ -58,7 +58,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
private List<TaskSuccessListener> successListeners;
public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}

@ -15,22 +15,22 @@
*/
package org.redisson.executor;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
/**
*
* @author Nikita Koksharov
@ -40,7 +40,7 @@ public class ScheduledTasksService extends TasksService {
private RequestId requestId;
public ScheduledTasksService(Codec codec, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, redissonId, responses);
}

@ -15,16 +15,15 @@
*/
package org.redisson.executor;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.ResponseEntry;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
*
* @author Nikita Koksharov
@ -32,9 +31,9 @@ import org.redisson.remote.ResponseEntry;
*/
public class TasksBatchService extends TasksService {
private CommandBatchService batchCommandService;
private final CommandBatchService batchCommandService;
public TasksBatchService(Codec codec, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
public TasksBatchService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
batchCommandService = new CommandBatchService(commandExecutor);
}

@ -32,7 +32,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CustomObjectInputStream;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.*;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -64,7 +64,7 @@ public class TasksRunnerService implements RemoteExecutorService {
private final Codec codec;
private final String name;
private final CommandExecutor commandExecutor;
private final CommandAsyncExecutor commandExecutor;
private final RedissonClient redisson;
@ -80,7 +80,7 @@ public class TasksRunnerService implements RemoteExecutorService {
private BeanFactory beanFactory;
private ConcurrentMap<String, ResponseEntry> responses;
public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap<String, ResponseEntry> responses) {
public TasksRunnerService(CommandAsyncExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap<String, ResponseEntry> responses) {
this.commandExecutor = commandExecutor;
this.name = name;
this.redisson = redisson;
@ -385,10 +385,10 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "end;"
+ "end;";
commandExecutor.evalWrite(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
commandExecutor.get(commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
script,
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId));
}
}

@ -2939,10 +2939,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return (T) this;
}
if (clazz == CacheReactive.class) {
return (T) ReactiveProxyBuilder.create(new CommandReactiveService(commandExecutor.getConnectionManager()), this, CacheReactive.class);
return (T) ReactiveProxyBuilder.create(new CommandReactiveService(commandExecutor.getConnectionManager(), commandExecutor.getObjectBuilder()), this, CacheReactive.class);
}
if (clazz == CacheRx.class) {
return (T) RxProxyBuilder.create(new CommandRxService(commandExecutor.getConnectionManager()), this, CacheRx.class);
return (T) RxProxyBuilder.create(new CommandRxService(commandExecutor.getConnectionManager(), commandExecutor.getObjectBuilder()), this, CacheRx.class);
}
return null;
}

@ -28,7 +28,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
@ -54,11 +53,9 @@ public class AccessorInterceptor {
private static final Pattern FIELD_PATTERN = Pattern.compile("^(get|set|is)");
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
public AccessorInterceptor(CommandAsyncExecutor commandExecutor, ConnectionManager connectionManager) {
public AccessorInterceptor(CommandAsyncExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
this.connectionManager = connectionManager;
}
@RuntimeType
@ -196,7 +193,7 @@ public class AccessorInterceptor {
set.removeAsync(((RLiveObject) me).getLiveObjectId());
} else {
if (ClassUtils.isAnnotationPresent(field.getType(), REntity.class)
|| connectionManager.isClusterMode()) {
|| commandExecutor.getConnectionManager().isClusterMode()) {
Object value = liveMap.remove(field.getName());
if (value != null) {
RMultimapAsync<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName);

@ -24,7 +24,6 @@ import org.redisson.api.RMap;
import org.redisson.client.RedisException;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.resolver.NamingScheme;
@ -48,22 +47,19 @@ public class LiveObjectInterceptor {
}
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Class<?> originalClass;
private final String idFieldName;
private final Class<?> idFieldType;
private final NamingScheme namingScheme;
private final RedissonLiveObjectService service;
public LiveObjectInterceptor(CommandAsyncExecutor commandExecutor, ConnectionManager connectionManager,
RedissonLiveObjectService service, Class<?> entityClass, String idFieldName) {
public LiveObjectInterceptor(CommandAsyncExecutor commandExecutor, RedissonLiveObjectService service, Class<?> entityClass, String idFieldName) {
this.service = service;
this.commandExecutor = commandExecutor;
this.connectionManager = connectionManager;
this.originalClass = entityClass;
this.idFieldName = idFieldName;
namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(entityClass);
try {
this.idFieldType = ClassUtils.getDeclaredField(originalClass, idFieldName).getType();

@ -15,26 +15,21 @@
*/
package org.redisson.mapreduce;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.redisson.api.RBatch;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMapAsync;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RMapReduceExecutor;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import org.redisson.connection.ConnectionManager;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
/**
*
* @author Nikita Koksharov
@ -53,14 +48,14 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
final Codec objectCodec;
final String objectName;
final Class<?> objectClass;
final CommandAsyncExecutor commandExecutor;
private ConnectionManager connectionManager;
RReducer<KOut, VOut> reducer;
M mapper;
long timeout;
MapReduceExecutor(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
MapReduceExecutor(RObject object, RedissonClient redisson, CommandAsyncExecutor commandExecutor) {
this.objectName = object.getName();
this.objectCodec = object.getCodec();
this.objectClass = object.getClass();
@ -69,7 +64,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
UUID id = UUID.randomUUID();
this.resultMapName = object.getName() + ":result:" + id;
this.executorService = redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME);
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
}
protected void check(Object task) {
@ -87,7 +82,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
@Override
public Map<KOut, VOut> execute() {
return connectionManager.getCommandExecutor().get(executeAsync());
return commandExecutor.get(executeAsync());
}
@Override
@ -120,7 +115,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
@Override
public void execute(String resultMapName) {
connectionManager.getCommandExecutor().get(executeAsync(resultMapName));
commandExecutor.get(executeAsync(resultMapName));
}
@Override
@ -145,7 +140,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
@Override
public <R> R execute(RCollator<KOut, VOut, R> collator) {
return connectionManager.getCommandExecutor().get(executeAsync(collator));
return commandExecutor.get(executeAsync(collator));
}
@Override

@ -15,16 +15,16 @@
*/
package org.redisson.mapreduce;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RCollectionMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.connection.ConnectionManager;
import org.redisson.command.CommandAsyncExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
*
@ -37,8 +37,8 @@ import org.redisson.connection.ConnectionManager;
public class RedissonCollectionMapReduce<VIn, KOut, VOut> extends MapReduceExecutor<RCollectionMapper<VIn, KOut, VOut>, VIn, KOut, VOut>
implements RCollectionMapReduce<VIn, KOut, VOut> {
public RedissonCollectionMapReduce(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
super(object, redisson, connectionManager);
public RedissonCollectionMapReduce(RObject object, RedissonClient redisson, CommandAsyncExecutor commandExecutor) {
super(object, redisson, commandExecutor);
}
@Override

@ -15,16 +15,16 @@
*/
package org.redisson.mapreduce;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.connection.ConnectionManager;
import org.redisson.command.CommandAsyncExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
*
@ -38,8 +38,8 @@ import org.redisson.connection.ConnectionManager;
public class RedissonMapReduce<KIn, VIn, KOut, VOut> extends MapReduceExecutor<RMapper<KIn, VIn, KOut, VOut>, VIn, KOut, VOut>
implements RMapReduce<KIn, VIn, KOut, VOut> {
public RedissonMapReduce(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
super(object, redisson, connectionManager);
public RedissonMapReduce(RObject object, RedissonClient redisson, CommandAsyncExecutor commandExecutor) {
super(object, redisson, commandExecutor);
}
@Override

@ -38,9 +38,9 @@ public class CommandReactiveBatchService extends CommandReactiveService {
private final CommandBatchService batchService;
public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options, RedissonObjectBuilder.ReferenceType.REACTIVE);
public CommandReactiveBatchService(ConnectionManager connectionManager, CommandReactiveExecutor commandExecutor, BatchOptions options) {
super(connectionManager, commandExecutor.getObjectBuilder());
batchService = new CommandBatchService(commandExecutor, options, RedissonObjectBuilder.ReferenceType.REACTIVE);
}
@Override

@ -32,8 +32,8 @@ import reactor.core.publisher.Mono;
*/
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder(), RedissonObjectBuilder.ReferenceType.REACTIVE);
public CommandReactiveService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.REACTIVE);
}
@Override

@ -35,7 +35,7 @@ public class RedissonBatchReactive implements RBatchReactive {
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor, BatchOptions options) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager, options);
this.executorService = new CommandReactiveBatchService(connectionManager, commandExecutor, options);
this.commandExecutor = commandExecutor;
}

@ -25,7 +25,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
@ -47,10 +47,10 @@ public class RedisNode implements RedisClusterMaster, RedisClusterSlave, RedisMa
RedisMasterAsync, RedisSlaveAsync {
final RedisClient client;
final CommandSyncService commandExecutor;
final CommandAsyncExecutor commandExecutor;
private final NodeType type;
public RedisNode(RedisClient client, CommandSyncService commandExecutor, NodeType type) {
public RedisNode(RedisClient client, CommandAsyncExecutor commandExecutor, NodeType type) {
super();
this.client = client;
this.commandExecutor = commandExecutor;

@ -20,6 +20,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.redisnode.BaseRedisNodes;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -41,9 +42,11 @@ import java.util.concurrent.TimeUnit;
public class RedissonBaseNodes implements BaseRedisNodes {
ConnectionManager connectionManager;
CommandAsyncExecutor commandExecutor;
public RedissonBaseNodes(ConnectionManager connectionManager) {
public RedissonBaseNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
this.connectionManager = connectionManager;
this.commandExecutor = commandExecutor;
}
protected <T extends org.redisson.api.redisnode.RedisNode> Collection<T> getNodes(NodeType type) {
@ -52,14 +55,14 @@ public class RedissonBaseNodes implements BaseRedisNodes {
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty()
&& type == NodeType.MASTER) {
RedisNode entry = new RedisNode(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
RedisNode entry = new RedisNode(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
result.add((T) entry);
}
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() != ClientConnectionsEntry.FreezeReason.MANAGER
&& slaveEntry.getNodeType() == type) {
RedisNode entry = new RedisNode(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
RedisNode entry = new RedisNode(slaveEntry.getClient(), commandExecutor, slaveEntry.getNodeType());
result.add((T) entry);
}
}
@ -74,13 +77,13 @@ public class RedissonBaseNodes implements BaseRedisNodes {
if (nodeType == NodeType.MASTER
&& masterSlaveEntry.getAllEntries().isEmpty()
&& RedisURI.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return new RedisNode(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
return new RedisNode(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
}
for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) {
if (RedisURI.compare(entry.getClient().getAddr(), addr)
&& entry.getFreezeReason() != ClientConnectionsEntry.FreezeReason.MANAGER) {
return new RedisNode(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
return new RedisNode(entry.getClient(), commandExecutor, entry.getNodeType());
}
}
}
@ -92,13 +95,13 @@ public class RedissonBaseNodes implements BaseRedisNodes {
List<RedisNode> result = new ArrayList<>();
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty()) {
RedisNode masterEntry = new RedisNode(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
RedisNode masterEntry = new RedisNode(masterSlaveEntry.getClient(), commandExecutor, NodeType.MASTER);
result.add(masterEntry);
}
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() != ClientConnectionsEntry.FreezeReason.MANAGER) {
RedisNode entry = new RedisNode(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
RedisNode entry = new RedisNode(slaveEntry.getClient(), commandExecutor, slaveEntry.getNodeType());
result.add(entry);
}
}

@ -19,6 +19,7 @@ import org.redisson.api.NodeType;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisClusterSlave;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import java.util.Collection;
@ -30,8 +31,8 @@ import java.util.Collection;
*/
public class RedissonClusterNodes extends RedissonBaseNodes implements RedisCluster {
public RedissonClusterNodes(ConnectionManager connectionManager) {
super(connectionManager);
public RedissonClusterNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
super(connectionManager, commandExecutor);
}
@Override

@ -19,6 +19,7 @@ import org.redisson.api.NodeType;
import org.redisson.api.redisnode.RedisMaster;
import org.redisson.api.redisnode.RedisMasterSlave;
import org.redisson.api.redisnode.RedisSlave;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import java.util.Collection;
@ -30,8 +31,8 @@ import java.util.Collection;
*/
public class RedissonMasterSlaveNodes extends RedissonBaseNodes implements RedisMasterSlave {
public RedissonMasterSlaveNodes(ConnectionManager connectionManager) {
super(connectionManager);
public RedissonMasterSlaveNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
super(connectionManager, commandExecutor);
}
@Override

@ -17,6 +17,7 @@ package org.redisson.redisnode;
import org.redisson.api.redisnode.RedisSentinel;
import org.redisson.api.redisnode.RedisSentinelMasterSlave;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.misc.RedisURI;
@ -31,15 +32,14 @@ import java.util.stream.Collectors;
*/
public class RedissonSentinelMasterSlaveNodes extends RedissonMasterSlaveNodes implements RedisSentinelMasterSlave {
public RedissonSentinelMasterSlaveNodes(ConnectionManager connectionManager) {
super(connectionManager);
public RedissonSentinelMasterSlaveNodes(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
super(connectionManager, commandExecutor);
}
@Override
public Collection<RedisSentinel> getSentinels() {
return ((SentinelConnectionManager) connectionManager).getSentinels().stream()
.map(c -> new SentinelRedisNode(c, connectionManager.getCommandExecutor()))
.map(c -> new SentinelRedisNode(c, commandExecutor))
.collect(Collectors.toList());
}
@ -48,7 +48,7 @@ public class RedissonSentinelMasterSlaveNodes extends RedissonMasterSlaveNodes i
RedisURI addr = new RedisURI(address);
return ((SentinelConnectionManager) connectionManager).getSentinels().stream()
.filter(c -> RedisURI.compare(c.getAddr(), addr))
.map(c -> new SentinelRedisNode(c, connectionManager.getCommandExecutor()))
.map(c -> new SentinelRedisNode(c, commandExecutor))
.findFirst().orElse(null);
}
}

@ -18,6 +18,7 @@ package org.redisson.redisnode;
import org.redisson.api.NodeType;
import org.redisson.api.redisnode.RedisMaster;
import org.redisson.api.redisnode.RedisSingle;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager;
import java.util.Collection;
@ -29,8 +30,8 @@ import java.util.Collection;
*/
public class RedissonSingleNode extends RedissonBaseNodes implements RedisSingle {
public RedissonSingleNode(ConnectionManager connectionManager) {
super(connectionManager);
public RedissonSingleNode(ConnectionManager connectionManager, CommandAsyncExecutor commandExecutor) {
super(connectionManager, commandExecutor);
}
@Override

@ -26,7 +26,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -43,9 +43,9 @@ import java.util.concurrent.TimeUnit;
public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
private final RedisClient client;
private final CommandAsyncService commandAsyncService;
private final CommandAsyncExecutor commandAsyncService;
public SentinelRedisNode(RedisClient client, CommandAsyncService commandAsyncService) {
public SentinelRedisNode(RedisClient client, CommandAsyncExecutor commandAsyncService) {
super();
this.client = client;
this.commandAsyncService = commandAsyncService;

@ -21,6 +21,7 @@ import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
@ -38,9 +39,9 @@ public class CommandRxBatchService extends CommandRxService {
private final CommandBatchService batchService;
public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options, RedissonObjectBuilder.ReferenceType.RXJAVA);
public CommandRxBatchService(ConnectionManager connectionManager, CommandAsyncExecutor executor, BatchOptions options) {
super(connectionManager, executor.getObjectBuilder());
batchService = new CommandBatchService(executor, options, RedissonObjectBuilder.ReferenceType.RXJAVA);
}
@Override

@ -34,8 +34,8 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
*/
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {
public CommandRxService(ConnectionManager connectionManager) {
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder(), RedissonObjectBuilder.ReferenceType.RXJAVA);
public CommandRxService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.RXJAVA);
}
@Override

@ -35,7 +35,7 @@ public class RedissonBatchRx implements RBatchRx {
public RedissonBatchRx(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandRxExecutor commandExecutor, BatchOptions options) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandRxBatchService(connectionManager, options);
this.executorService = new CommandRxBatchService(connectionManager, commandExecutor, options);
this.commandExecutor = commandExecutor;
}

@ -540,7 +540,7 @@ public class RedissonTransaction implements RTransaction {
}
private RedissonBatch createBatch() {
return new RedissonBatch(null, commandExecutor.getConnectionManager(),
return new RedissonBatch(null, commandExecutor,
BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC));
}

@ -149,8 +149,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
for (Long timeout : queue) {
long epiry = ((timeout - new Date().getTime()) / 1000);
log.info("Item " + (i++) + " expires in " + epiry + " seconds");
//the Redisson library uses this 5000ms delay in the code
if (epiry > leaseTimeSeconds + 5) {
//the Redisson library uses this 60000*5ms delay in the code
if (epiry > leaseTimeSeconds + 60*5) {
Assert.fail("It would take more than " + leaseTimeSeconds + "s to get the lock!");
}
}
@ -255,9 +255,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
// we're testing interaction of various internal methods, so create a Redisson instance for protected access
Redisson redisson = new Redisson(createConfig());
RedissonFairLock lock = new RedissonFairLock(
redisson.connectionManager.getCommandExecutor(),
"testAcquireFailedTimeoutDrift_Descrete");
RedissonFairLock lock = (RedissonFairLock) redisson.getFairLock("testAcquireFailedTimeoutDrift_Descrete");
// clear out any prior state
lock.delete();
@ -337,7 +335,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Redisson redisson = new Redisson(createConfig());
RedissonFairLock lock = new RedissonFairLock(
redisson.connectionManager.getCommandExecutor(),
redisson.getCommandExecutor(),
"testLockAcquiredTimeoutDrift_Descrete",
100);
@ -395,7 +393,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Redisson redisson = new Redisson(createConfig());
RedissonFairLock lock = new RedissonFairLock(
redisson.connectionManager.getCommandExecutor(),
redisson.getCommandExecutor(),
"testLockAcquiredTimeoutDrift_Descrete");
// clear out any prior state
@ -451,7 +449,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Redisson redisson = new Redisson(createConfig());
RedissonFairLock lock = new RedissonFairLock(
redisson.connectionManager.getCommandExecutor(),
redisson.getCommandExecutor(),
"testAbandonedTimeoutDrift_Descrete",
threadWaitTime);
@ -574,8 +572,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long timeout = queue.get(i);
long epiry = ((timeout - new Date().getTime()) / 1000);
log.info("Item " + i + " expires in " + epiry + " seconds");
// the Redisson library uses this 5000ms delay in the code
Assert.assertFalse("It would take more than " + (leaseTimeSeconds + 5 * (i + 1)) + "s to get the lock!", epiry > leaseTimeSeconds + 5 * (i + 1));
// the Redisson library uses this 60000*5ms delay in the code
Assert.assertFalse("It would take more than " + (leaseTimeSeconds + 60*5 * (i + 1)) + "s to get the lock!", epiry > leaseTimeSeconds + 60*5 * (i + 1));
}
}

Loading…
Cancel
Save