refactoring

pull/3487/head
Nikita Koksharov 4 years ago
parent 63fe4d5620
commit d10ff3c772

@ -61,10 +61,7 @@ public class Redisson implements RedissonClient {
this.config = config; this.config = config;
Config configCopy = new Config(config); Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy); connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
if (config.isReferenceEnabled()) {
connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor()); writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor());
} }

@ -50,11 +50,7 @@ public class RedissonReactive implements RedissonReactiveClient {
this.config = config; this.config = config;
Config configCopy = new Config(config); Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy); connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
if (config.isReferenceEnabled()) {
this.connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
commandExecutor = new CommandReactiveService(connectionManager); commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor); evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor);

@ -48,10 +48,7 @@ public class RedissonRx implements RedissonRxClient {
this.config = config; this.config = config;
Config configCopy = new Config(config); Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy); connectionManager = ConfigSupport.createConnectionManager(configCopy, this);
if (config.isReferenceEnabled()) {
connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
commandExecutor = new CommandRxService(connectionManager); commandExecutor = new CommandRxService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor); evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor);

@ -37,6 +37,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry; import org.redisson.connection.SingleEntry;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
@ -76,8 +77,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>(); private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id); super(config, id, objectBuilder);
if (cfg.getNodeAddresses().isEmpty()) { if (cfg.getNodeAddresses().isEmpty()) {
throw new IllegalArgumentException("At least one cluster node should be defined!"); throw new IllegalArgumentException("At least one cluster node should be defined!");

@ -18,9 +18,6 @@ package org.redisson.command;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -43,12 +40,6 @@ public interface CommandAsyncExecutor {
ConnectionManager getConnectionManager(); ConnectionManager getConnectionManager();
void enableRedissonReferenceSupport(RedissonClient redisson);
void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive);
void enableRedissonReferenceSupport(RedissonRxClient redissonReactive);
<V> RedisException convertException(RFuture<V> future); <V> RedisException convertException(RFuture<V> future);
void syncSubscription(RFuture<?> future); void syncSubscription(RFuture<?> future);

@ -22,9 +22,6 @@ import io.netty.util.ReferenceCountUtil;
import org.redisson.RedissonReference; import org.redisson.RedissonReference;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.cache.LRUCacheMap; import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
@ -34,8 +31,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
@ -65,10 +60,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class); static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
protected RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder objectBuilder;
public CommandAsyncService(ConnectionManager connectionManager) { public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
} }
@Override @Override
@ -76,28 +72,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return connectionManager; return connectionManager;
} }
@Override
public void enableRedissonReferenceSupport(RedissonClient redisson) {
enableRedissonReferenceSupport(redisson.getConfig(), redisson, null, null);
}
@Override
public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
enableRedissonReferenceSupport(redissonReactive.getConfig(), null, redissonReactive, null);
}
@Override
public void enableRedissonReferenceSupport(RedissonRxClient redissonRx) {
enableRedissonReferenceSupport(redissonRx.getConfig(), null, null, redissonRx);
}
private void enableRedissonReferenceSupport(Config config, RedissonClient redisson, RedissonReactiveClient redissonReactive, RedissonRxClient redissonRx) {
Codec codec = config.getCodec();
objectBuilder = new RedissonObjectBuilder(config, redisson, redissonReactive, redissonRx);
ReferenceCodecProvider codecProvider = objectBuilder.getReferenceCodecProvider();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
private boolean isRedissonReferenceSupportEnabled() { private boolean isRedissonReferenceSupportEnabled() {
return objectBuilder != null; return objectBuilder != null;
} }

@ -127,9 +127,8 @@ public class CommandBatchService extends CommandAsyncService {
} }
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder) { public CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder) {
super(connectionManager); super(connectionManager, objectBuilder);
this.options = options; this.options = options;
this.objectBuilder = objectBuilder;
} }
public BatchOptions getOptions() { public BatchOptions getOptions() {

@ -21,6 +21,7 @@ import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,8 +34,8 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());
public CommandSyncService(ConnectionManager connectionManager) { public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager); super(connectionManager, objectBuilder);
} }
@Override @Override

@ -15,33 +15,6 @@
*/ */
package org.redisson.config; package org.redisson.config;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
import java.util.Scanner;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonNodeInitializer;
import org.redisson.client.NettyHook;
import org.redisson.client.codec.Codec;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.connection.AddressResolverGroupFactory;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.connection.balancer.LoadBalancer;
import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonInclude.Include;
@ -49,11 +22,27 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.ser.FilterProvider; import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.api.*;
import org.redisson.client.NettyHook;
import org.redisson.client.codec.Codec;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.connection.*;
import org.redisson.connection.balancer.LoadBalancer;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.io.*;
import java.net.URL;
import java.util.Scanner;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* *
@ -188,25 +177,52 @@ public class ConfigSupport {
public String toYAML(Config config) throws IOException { public String toYAML(Config config) throws IOException {
return yamlMapper.writeValueAsString(config); return yamlMapper.writeValueAsString(config);
} }
public static ConnectionManager createConnectionManager(Config configCopy) { public static ConnectionManager createConnectionManager(Config config, RedissonRxClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config config, RedissonReactiveClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config config, RedissonClient redisson) {
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(redisson);
}
return createConnectionManager(config, objectBuilder);
}
public static ConnectionManager createConnectionManager(Config configCopy, RedissonObjectBuilder objectBuilder) {
UUID id = UUID.randomUUID(); UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) { if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig()); validate(configCopy.getMasterSlaveServersConfig());
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id); return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id, objectBuilder);
} else if (configCopy.getSingleServerConfig() != null) { } else if (configCopy.getSingleServerConfig() != null) {
validate(configCopy.getSingleServerConfig()); validate(configCopy.getSingleServerConfig());
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id); return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id, objectBuilder);
} else if (configCopy.getSentinelServersConfig() != null) { } else if (configCopy.getSentinelServersConfig() != null) {
validate(configCopy.getSentinelServersConfig()); validate(configCopy.getSentinelServersConfig());
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id); return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id, objectBuilder);
} else if (configCopy.getClusterServersConfig() != null) { } else if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig()); validate(configCopy.getClusterServersConfig());
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id); return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id, objectBuilder);
} else if (configCopy.getReplicatedServersConfig() != null) { } else if (configCopy.getReplicatedServersConfig() != null) {
validate(configCopy.getReplicatedServersConfig()); validate(configCopy.getReplicatedServersConfig());
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id); return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id, objectBuilder);
} else if (configCopy.getConnectionManager() != null) { } else if (configCopy.getConnectionManager() != null) {
return configCopy.getConnectionManager(); return configCopy.getConnectionManager();
}else { }else {

@ -40,6 +40,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService; import org.redisson.command.CommandSyncService;
import org.redisson.config.*; import org.redisson.config.*;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CountableListener; import org.redisson.misc.CountableListener;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
@ -147,15 +148,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>(); private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
this(config, id); this(config, id, objectBuilder);
this.config = cfg; this.config = cfg;
initTimer(cfg); initTimer(cfg);
initSingleEntry(); initSingleEntry();
} }
protected MasterSlaveConnectionManager(Config cfg, UUID id) { protected MasterSlaveConnectionManager(Config cfg, UUID id, RedissonObjectBuilder objectBuilder) {
this.id = id.toString(); this.id = id.toString();
Version.logVersion(); Version.logVersion();
@ -212,7 +213,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg; this.cfg = cfg;
this.codec = cfg.getCodec(); this.codec = cfg.getCodec();
this.commandExecutor = new CommandSyncService(this); this.commandExecutor = new CommandSyncService(this, objectBuilder);
} }
protected void closeNodeConnections() { protected void closeNodeConnections() {

@ -34,6 +34,7 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.ReplicatedServersConfig; import org.redisson.config.ReplicatedServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -63,8 +64,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
slave slave
} }
public ReplicatedConnectionManager(ReplicatedServersConfig cfg, Config config, UUID id) { public ReplicatedConnectionManager(ReplicatedServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id); super(config, id, objectBuilder);
this.config = create(cfg); this.config = create(cfg);
initTimer(this.config); initTimer(this.config);

@ -28,6 +28,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.*; import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CountableListener; import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
@ -69,8 +70,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private boolean usePassword = false; private boolean usePassword = false;
private String scheme; private String scheme;
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) { public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(config, id); super(config, id, objectBuilder);
if (cfg.getMasterName() == null) { if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!"); throw new IllegalArgumentException("masterName parameter is not defined!");

@ -22,6 +22,7 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig; import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
import org.redisson.liveobject.core.RedissonObjectBuilder;
/** /**
* *
@ -30,8 +31,8 @@ import org.redisson.config.SubscriptionMode;
*/ */
public class SingleConnectionManager extends MasterSlaveConnectionManager { public class SingleConnectionManager extends MasterSlaveConnectionManager {
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) { public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id, RedissonObjectBuilder objectBuilder) {
super(create(cfg), config, id); super(create(cfg), config, id, objectBuilder);
} }
private static MasterSlaveServersConfig create(SingleServerConfig cfg) { private static MasterSlaveServersConfig create(SingleServerConfig cfg) {

@ -62,9 +62,9 @@ public class RedissonObjectBuilder {
} }
private final Config config; private final Config config;
private final RedissonClient redisson; private RedissonClient redisson;
private final RedissonReactiveClient redissonReactive; private RedissonReactiveClient redissonReactive;
private final RedissonRxClient redissonRx; private RedissonRxClient redissonRx;
public static class CodecMethodRef { public static class CodecMethodRef {
@ -81,15 +81,46 @@ public class RedissonObjectBuilder {
private final ReferenceCodecProvider codecProvider = new DefaultReferenceCodecProvider(); private final ReferenceCodecProvider codecProvider = new DefaultReferenceCodecProvider();
public RedissonObjectBuilder(Config config, public RedissonObjectBuilder(RedissonClient redisson) {
super();
this.config = redisson.getConfig();
this.redisson = redisson;
Codec codec = config.getCodec();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
public RedissonObjectBuilder(RedissonReactiveClient redissonReactive) {
super();
this.config = redissonReactive.getConfig();
this.redissonReactive = redissonReactive;
Codec codec = config.getCodec();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
public RedissonObjectBuilder(RedissonRxClient redissonRx) {
super();
this.config = redissonRx.getConfig();
this.redissonRx = redissonRx;
Codec codec = config.getCodec();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
public RedissonObjectBuilder(Config config,
RedissonClient redisson, RedissonReactiveClient redissonReactive, RedissonRxClient redissonRx) { RedissonClient redisson, RedissonReactiveClient redissonReactive, RedissonRxClient redissonRx) {
super(); super();
this.config = config; this.config = config;
this.redisson = redisson; this.redisson = redisson;
this.redissonReactive = redissonReactive; this.redissonReactive = redissonReactive;
this.redissonRx = redissonRx; this.redissonRx = redissonRx;
Codec codec = config.getCodec();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
} }
public ReferenceCodecProvider getReferenceCodecProvider() { public ReferenceCodecProvider getReferenceCodecProvider() {
return codecProvider; return codecProvider;
} }

@ -15,22 +15,19 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.concurrent.Callable;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult; import org.redisson.api.BatchResult;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov

@ -32,8 +32,7 @@ import reactor.core.publisher.Mono;
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor { public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
public CommandReactiveService(ConnectionManager connectionManager) { public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager); super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder());
objectBuilder = connectionManager.getCommandExecutor().getObjectBuilder();
} }
@Override @Override

@ -15,21 +15,18 @@
*/ */
package org.redisson.rx; package org.redisson.rx;
import java.util.concurrent.Callable; import io.reactivex.rxjava3.core.Flowable;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult; import org.redisson.api.BatchResult;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import io.reactivex.rxjava3.core.Flowable; import java.util.concurrent.Callable;
/** /**
* *

@ -34,8 +34,7 @@ import io.reactivex.rxjava3.processors.ReplayProcessor;
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor { public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {
public CommandRxService(ConnectionManager connectionManager) { public CommandRxService(ConnectionManager connectionManager) {
super(connectionManager); super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder());
objectBuilder = connectionManager.getCommandExecutor().getObjectBuilder();
} }
@Override @Override

Loading…
Cancel
Save