Fixed - Config can not handle underscore in host #722

pull/748/head
Nikita 8 years ago
parent 4260a3d38c
commit 1319607059

@ -16,7 +16,7 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -31,7 +31,7 @@ import org.redisson.client.handler.ConnectionWatchdog;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -70,15 +70,15 @@ public class RedisClient {
private boolean hasOwnGroup;
public RedisClient(String address) {
this(URIBuilder.create(address));
this(URLBuilder.create(address));
}
public RedisClient(URI address) {
public RedisClient(URL address) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
hasOwnGroup = true;
}
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URL address) {
this(timer, executor, group, address.getHost(), address.getPort());
}

@ -15,7 +15,7 @@
*/
package org.redisson.cluster;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -66,13 +66,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<URI, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final Map<URL, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private ScheduledFuture<?> monitorFuture;
private volatile URI lastClusterNode;
private volatile URL lastClusterNode;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);
@ -84,7 +84,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
for (URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
@ -158,7 +158,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URL addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return newSucceededFuture(connection);
@ -308,22 +308,22 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URI> iterator) {
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URL> iterator) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<URI> nodesIterator = iterator;
Iterator<URL> nodesIterator = iterator;
if (nodesIterator == null) {
List<URI> nodes = new ArrayList<URI>();
List<URI> slaves = new ArrayList<URI>();
List<URL> nodes = new ArrayList<URL>();
List<URL> slaves = new ArrayList<URL>();
for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses());
Set<URL> partitionSlaves = new HashSet<URL>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
}
@ -339,7 +339,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URL> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg, null);
@ -348,7 +348,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!getShutdownLatch().acquire()) {
return;
}
final URI uri = iterator.next();
final URL uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connect(cfg, uri);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
@ -366,7 +366,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
});
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) {
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URL> iterator, final URL uri) {
RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override
@ -415,7 +415,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
// should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
Set<URL> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive?
upDownSlaves(entry, currentPart, newPart, addedSlaves);
@ -424,20 +424,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URI> addedSlaves) {
Set<URI> aliveSlaves = new HashSet<URI>(currentPart.getFailedSlaveAddresses());
private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URL> addedSlaves) {
Set<URL> aliveSlaves = new HashSet<URL>(currentPart.getFailedSlaveAddresses());
aliveSlaves.removeAll(addedSlaves);
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) {
for (URL uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
Set<URI> failedSlaves = new HashSet<URI>(newPart.getFailedSlaveAddresses());
Set<URL> failedSlaves = new HashSet<URL>(newPart.getFailedSlaveAddresses());
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) {
for (URL uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
@ -445,11 +445,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private Set<URI> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
private Set<URL> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URL> removedSlaves = new HashSet<URL>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses());
for (URI uri : removedSlaves) {
for (URL uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
@ -457,9 +457,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
Set<URL> addedSlaves = new HashSet<URL>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (final URI uri : addedSlaves) {
for (final URL uri : addedSlaves) {
RFuture<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
@ -516,8 +516,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
URL newUri = newMasterPart.getMasterAddress();
URL oldUri = currentPart.getMasterAddress();
changeMaster(slot, newUri.getHost(), newUri.getPort());
@ -720,7 +720,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public URI getLastClusterNode() {
public URL getLastClusterNode() {
return lastClusterNode;
}

@ -16,10 +16,11 @@
package org.redisson.cluster;
import java.net.URI;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
@ -33,7 +34,7 @@ public class ClusterNodeInfo {
private final String nodeInfo;
private String nodeId;
private URI address;
private URL address;
private final Set<Flag> flags = new HashSet<Flag>();
private String slaveOf;
@ -50,11 +51,11 @@ public class ClusterNodeInfo {
this.nodeId = nodeId;
}
public URI getAddress() {
public URL getAddress() {
return address;
}
public void setAddress(String address) {
this.address = URIBuilder.create(address);
this.address = URLBuilder.create(address);
}
public void addSlotRange(ClusterSlotRange range) {

@ -16,20 +16,25 @@
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ClusterPartition {
private final String nodeId;
private boolean masterFail;
private URI masterAddress;
private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>();
private URL masterAddress;
private final Set<URL> slaveAddresses = new HashSet<URL>();
private final Set<URL> failedSlaves = new HashSet<URL>();
private final Set<Integer> slots = new HashSet<Integer>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
@ -85,33 +90,33 @@ public class ClusterPartition {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
}
public URI getMasterAddress() {
public URL getMasterAddress() {
return masterAddress;
}
public void setMasterAddress(String masterAddress) {
setMasterAddress(URIBuilder.create(masterAddress));
setMasterAddress(URLBuilder.create(masterAddress));
}
public void setMasterAddress(URI masterAddress) {
public void setMasterAddress(URL masterAddress) {
this.masterAddress = masterAddress;
}
public void addFailedSlaveAddress(URI address) {
public void addFailedSlaveAddress(URL address) {
failedSlaves.add(address);
}
public Set<URI> getFailedSlaveAddresses() {
public Set<URL> getFailedSlaveAddresses() {
return Collections.unmodifiableSet(failedSlaves);
}
public void removeFailedSlaveAddress(URI uri) {
public void removeFailedSlaveAddress(URL uri) {
failedSlaves.remove(uri);
}
public void addSlaveAddress(URI address) {
public void addSlaveAddress(URL address) {
slaveAddresses.add(address);
}
public Set<URI> getSlaveAddresses() {
public Set<URL> getSlaveAddresses() {
return Collections.unmodifiableSet(slaveAddresses);
}
public void removeSlaveAddress(URI uri) {
public void removeSlaveAddress(URL uri) {
slaveAddresses.remove(uri);
failedSlaves.remove(uri);
}

@ -15,18 +15,23 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> {
/**
* Redis cluster node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
private List<URL> nodeAddresses = new ArrayList<URL>();
/**
* Redis cluster scan interval in milliseconds
@ -50,14 +55,14 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
*/
public ClusterServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
nodeAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
public List<URL> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
void setNodeAddresses(List<URL> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}

@ -47,7 +47,13 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ConfigSupport {
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "class")
@ -111,6 +117,10 @@ public class ConfigSupport {
private final ObjectMapper jsonMapper = createMapper(null);
private final ObjectMapper yamlMapper = createMapper(new YAMLFactory());
public ConfigSupport() {
URLBuilder.init();
}
public <T> T fromJSON(String content, Class<T> configType) throws IOException {
return jsonMapper.readValue(content, configType);
}

@ -15,24 +15,25 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
* Configuration for an AWS ElastiCache replication group. A replication group is composed
* of a single master endpoint and multiple read slaves.
*
* @author Steve Ungerer
* @author Nikita Koksharov
*/
public class ElasticacheServersConfig extends BaseMasterSlaveServersConfig<ElasticacheServersConfig> {
/**
* Replication group node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
private List<URL> nodeAddresses = new ArrayList<URL>();
/**
* Replication group scan interval in milliseconds
@ -62,14 +63,14 @@ public class ElasticacheServersConfig extends BaseMasterSlaveServersConfig<Elast
*/
public ElasticacheServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
nodeAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
public List<URL> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
void setNodeAddresses(List<URL> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}

@ -15,25 +15,28 @@
*/
package org.redisson.config;
import java.net.URI;
import java.util.Collections;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<MasterSlaveServersConfig> {
/**
* Redis slave servers addresses
*/
private Set<URI> slaveAddresses = new HashSet<URI>();
private Set<URL> slaveAddresses = new HashSet<URL>();
/**
* Redis master server address
*/
private List<URI> masterAddress;
private URL masterAddress;
/**
* Database index used for Redis connection
@ -59,19 +62,19 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
*/
public MasterSlaveServersConfig setMasterAddress(String masterAddress) {
if (masterAddress != null) {
this.masterAddress = Collections.singletonList(URIBuilder.create(masterAddress));
this.masterAddress = URLBuilder.create(masterAddress);
}
return this;
}
public URI getMasterAddress() {
public URL getMasterAddress() {
if (masterAddress != null) {
return masterAddress.get(0);
return masterAddress;
}
return null;
}
public void setMasterAddress(URI masterAddress) {
public void setMasterAddress(URL masterAddress) {
if (masterAddress != null) {
this.masterAddress = Collections.singletonList(masterAddress);
this.masterAddress = masterAddress;
}
}
@ -83,18 +86,18 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
*/
public MasterSlaveServersConfig addSlaveAddress(String ... addresses) {
for (String address : addresses) {
slaveAddresses.add(URIBuilder.create(address));
slaveAddresses.add(URLBuilder.create(address));
}
return this;
}
public MasterSlaveServersConfig addSlaveAddress(URI slaveAddress) {
public MasterSlaveServersConfig addSlaveAddress(URL slaveAddress) {
slaveAddresses.add(slaveAddress);
return this;
}
public Set<URI> getSlaveAddresses() {
public Set<URL> getSlaveAddresses() {
return slaveAddresses;
}
public void setSlaveAddresses(Set<URI> readAddresses) {
public void setSlaveAddresses(Set<URL> readAddresses) {
this.slaveAddresses = readAddresses;
}

@ -15,15 +15,20 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class SentinelServersConfig extends BaseMasterSlaveServersConfig<SentinelServersConfig> {
private List<URI> sentinelAddresses = new ArrayList<URI>();
private List<URL> sentinelAddresses = new ArrayList<URL>();
private String masterName;
@ -64,14 +69,14 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
*/
public SentinelServersConfig addSentinelAddress(String ... addresses) {
for (String address : addresses) {
sentinelAddresses.add(URIBuilder.create(address));
sentinelAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getSentinelAddresses() {
public List<URL> getSentinelAddresses() {
return sentinelAddresses;
}
void setSentinelAddresses(List<URI> sentinelAddresses) {
void setSentinelAddresses(List<URL> sentinelAddresses) {
this.sentinelAddresses = sentinelAddresses;
}

@ -15,11 +15,9 @@
*/
package org.redisson.config;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.net.URL;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
@ -32,7 +30,7 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
* Redis server address
*
*/
private List<URI> address;
private URL address;
/**
* Minimum idle subscription connection amount
@ -129,19 +127,19 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
*/
public SingleServerConfig setAddress(String address) {
if (address != null) {
this.address = Collections.singletonList(URIBuilder.create(address));
this.address = URLBuilder.create(address);
}
return this;
}
public URI getAddress() {
public URL getAddress() {
if (address != null) {
return address.get(0);
return address;
}
return null;
}
void setAddress(URI address) {
void setAddress(URL address) {
if (address != null) {
this.address = Collections.singletonList(address);
this.address = address;
}
}

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -47,7 +47,7 @@ public interface ConnectionManager {
ExecutorService getExecutor();
URI getLastClusterNode();
URL getLastClusterNode();
boolean isClusterMode();

@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -55,9 +55,9 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private AtomicReference<URI> currentMaster = new AtomicReference<URI>();
private AtomicReference<URL> currentMaster = new AtomicReference<URL>();
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private final Map<URL, RedisConnection> nodeConnections = new HashMap<URL, RedisConnection>();
private ScheduledFuture<?> monitorFuture;
@ -72,7 +72,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg);
initTimer(this.config);
for (URI addr : cfg.getNodeAddresses()) {
for (URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
@ -110,7 +110,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
return res;
}
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URI addr) {
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URL addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return newSucceededFuture(connection);
@ -158,11 +158,11 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
final URI master = currentMaster.get();
final URL master = currentMaster.get();
log.debug("Current master: {}", master);
final AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
for (final URI addr : cfg.getNodeAddresses()) {
for (final URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -280,7 +280,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URL>emptySet());
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
@ -835,7 +835,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return executor;
}
public URI getLastClusterNode() {
public URL getLastClusterNode() {
return null;
}
}

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -37,7 +37,6 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,7 +79,7 @@ public class MasterSlaveEntry {
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
public List<RFuture<Void>> initSlaveBalancer(Collection<URL> disconnectedNodes) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& config.getReadMode() == ReadMode.SLAVE
&& disconnectedNodes.size() < config.getSlaveAddresses().size();
@ -88,7 +87,7 @@ public class MasterSlaveEntry {
List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();
RFuture<Void> f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
for (URI address : config.getSlaveAddresses()) {
for (URL address : config.getSlaveAddresses()) {
f = addSlave(address.getHost(), address.getPort(), disconnectedNodes.contains(address), NodeType.SLAVE);
result.add(f);
}

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -41,7 +41,7 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,7 +62,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private final Set<URL> disconnectedSlaves = new HashSet<URL>();
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config);
@ -70,7 +70,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveServersConfig c = create(cfg);
initTimer(c);
for (URI addr : cfg.getSentinelAddresses()) {
for (URL addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
try {
RedisConnection connection = client.connect();
@ -104,7 +104,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.info("slave: {} added", host);
if (flags.contains("s_down") || flags.contains("disconnected")) {
URI url = URIBuilder.create(host);
URL url = URLBuilder.create(host);
disconnectedSlaves.add(url);
log.warn("slave: {} is down", host);
}
@ -123,7 +123,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
init(c);
List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) {
for (URL addr : cfg.getSentinelAddresses()) {
RFuture<RedisPubSubConnection> future = registerSentinel(cfg, addr, c);
connectionFutures.add(future);
}
@ -146,7 +146,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return entry;
}
private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URL addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) {
@ -208,12 +208,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[3];
String addr = ip + ":" + port;
URI uri = URIBuilder.create(addr);
URL uri = URLBuilder.create(addr);
registerSentinel(cfg, uri, c);
}
}
protected void onSlaveAdded(URI addr, String msg) {
protected void onSlaveAdded(URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
@ -250,7 +250,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onNodeDown(URI sentinelAddr, String msg) {
private void onNodeDown(URL sentinelAddr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -298,7 +298,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onNodeUp(URI addr, String msg) {
private void onNodeUp(URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -337,7 +337,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
private void onMasterChange(SentinelServersConfig cfg, URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {

@ -16,7 +16,7 @@
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -28,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import io.netty.util.internal.PlatformDependent;
@ -78,7 +78,7 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
*/
public WeightedRoundRobinBalancer(Map<String, Integer> weights, int defaultWeight) {
for (Entry<String, Integer> entry : weights.entrySet()) {
URI uri = URIBuilder.create(entry.getKey());
URL uri = URLBuilder.create(entry.getKey());
InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
if (entry.getValue() <= 0) {
throw new IllegalArgumentException("Weight can't be less than or equal zero");

@ -1,37 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.net.URI;
public class URIBuilder {
public static URI create(String uri) {
String[] parts = uri.split(":");
if (parts.length-1 >= 3) {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
uri = "[" + uri.replace(":" + port, "") + "]:" + newPort;
} else {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
uri = uri.replace(":" + port, "") + ":" + newPort;
}
return URI.create("//" + uri);
}
}

@ -0,0 +1,89 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
/**
*
* @author Nikita Koksharov
*
*/
public class URLBuilder {
private static volatile boolean init = false;
static {
init();
}
public static void init() {
if (init) {
return;
}
init = true;
URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if ("redis".equals(protocol)) {
return new URLStreamHandler() {
@Override
protected URLConnection openConnection(URL u) throws IOException {
throw new UnsupportedOperationException();
};
@Override
protected boolean equals(URL u1, URL u2) {
return u1.toString().equals(u2.toString());
}
@Override
protected int hashCode(URL u) {
return u.toString().hashCode();
}
};
}
return null;
}
});
}
public static URL create(String url) {
try {
String[] parts = url.split(":");
if (parts.length-1 >= 3) {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new URL("redis://[" + host + "]:" + newPort);
} else {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new URL("redis://" + host + ":" + newPort);
}
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}
}

@ -1,5 +1,6 @@
package org.redisson;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
@ -40,7 +41,7 @@ public class RedissonTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@Test
public void testSmallPool() throws InterruptedException {
Config config = new Config();
@ -311,23 +312,40 @@ public class RedissonTest {
}
@Test
public void testSingleConfig() throws IOException {
public void testSingleConfigJSON() throws IOException {
RedissonClient r = BaseTest.createInstance();
String t = r.getConfig().toJSON();
Config c = Config.fromJSON(t);
assertThat(c.toJSON()).isEqualTo(t);
}
@Test
public void testSingleConfigYAML() throws IOException {
RedissonClient r = BaseTest.createInstance();
String t = r.getConfig().toYAML();
Config c = Config.fromYAML(t);
assertThat(c.toYAML()).isEqualTo(t);
}
@Test
public void testMasterSlaveConfig() throws IOException {
public void testMasterSlaveConfigJSON() throws IOException {
Config c2 = new Config();
c2.useMasterSlaveServers().setMasterAddress("123.1.1.1:1231").addSlaveAddress("82.12.47.12:1028");
String t = c2.toJSON();
Config c = Config.fromJSON(t);
assertThat(c.toJSON()).isEqualTo(t);
}
@Test
public void testMasterSlaveConfigYAML() throws IOException {
Config c2 = new Config();
c2.useMasterSlaveServers().setMasterAddress("123.1.1.1:1231").addSlaveAddress("82.12.47.12:1028");
String t = c2.toYAML();
Config c = Config.fromYAML(t);
assertThat(c.toYAML()).isEqualTo(t);
}
// @Test
public void testCluster() {
NodesGroup<ClusterNode> nodes = redisson.getClusterNodesGroup();

@ -28,6 +28,7 @@ public class ClusterNodesDecoderTest {
buf.writeBytes(src);
List<ClusterNodeInfo> nodes = decoder.decode(buf, null);
ClusterNodeInfo node = nodes.get(0);
Assert.assertEquals("192.168.234.129", node.getAddress().getHost());
Assert.assertEquals(7001, node.getAddress().getPort());
}

Loading…
Cancel
Save