unix sockets support. #173

pull/139/merge
Nikita 10 years ago
parent a39cc41363
commit e923b269a2

@ -89,6 +89,11 @@
</profiles>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.27.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>

@ -12,6 +12,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -61,9 +62,14 @@ public class RedisClient {
* @param port Server port.
*/
public RedisClient(EventLoopGroup group, String host, int port, int timeout) {
this(group, NioSocketChannel.class, host, port, timeout);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host,
int port, int timeout2) {
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
setDefaultTimeout(timeout, TimeUnit.MILLISECONDS);

@ -46,6 +46,8 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
try {
if (!input.isReadable()) return;
System.out.println("in: " + input.toString(CharsetUtil.UTF_8));
buffer.discardReadBytes();
buffer.writeBytes(input);
@ -60,7 +62,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
ctx.write(buf, promise);
}

@ -31,7 +31,7 @@ public class Config {
private MasterSlaveServersConfig masterSlaveServersConfig;
private SingleServerConfig singleServerConfig;
private ClusterServersConfig clusterServersConfig;
/**
@ -44,10 +44,14 @@ public class Config {
*/
private RedissonCodec codec;
private boolean useLinuxNativeEpoll;
public Config() {
}
Config(Config oldConf) {
setUseLinuxNativeEpoll(oldConf.isUseLinuxNativeEpoll());
if (oldConf.getCodec() == null) {
// use it by default
oldConf.setCodec(new JsonJacksonCodec());
@ -100,7 +104,7 @@ public class Config {
void setClusterServersConfig(ClusterServersConfig clusterServersConfig) {
this.clusterServersConfig = clusterServersConfig;
}
public SingleServerConfig useSingleServer() {
checkClusterServersConfig();
checkMasterSlaveServersConfig();
@ -168,7 +172,7 @@ public class Config {
throw new IllegalStateException("cluster servers config already used!");
}
}
private void checkSentinelServersConfig() {
if (sentinelServersConfig != null) {
throw new IllegalStateException("sentinel servers config already used!");
@ -187,4 +191,13 @@ public class Config {
}
}
public boolean isUseLinuxNativeEpoll() {
return useLinuxNativeEpoll;
}
public Config setUseLinuxNativeEpoll(boolean useLinuxNativeEpoll) {
this.useLinuxNativeEpoll = useLinuxNativeEpoll;
return this;
}
}

@ -54,7 +54,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
init(config);
for (URI addr : cfg.getNodeAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort());
try {
RedisAsyncConnection<String, String> connection = client.connectAsync();
String nodesValue = get(connection.clusterNodes());
@ -87,7 +87,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
c.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(codec, group, c);
SingleEntry entry = new SingleEntry(codec, this, c);
entries.put(partition.getEndSlot(), entry);
lastPartitions.put(partition.getEndSlot(), partition);
}
@ -98,7 +98,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void run() {
try {
for (URI addr : cfg.getNodeAddresses()) {
final RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout());
final RedisClient client = createClient(addr.getHost(), addr.getPort());
try {
RedisAsyncConnection<String, String> connection = client.connectAsync();
String nodesValue = get(connection.clusterNodes());

@ -22,6 +22,7 @@ import org.redisson.async.AsyncOperation;
import org.redisson.async.SyncInterruptedOperation;
import org.redisson.async.SyncOperation;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
@ -33,16 +34,18 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
//TODO ping support
public interface ConnectionManager {
RedisClient createClient(String host, int port);
<V> V get(Future<V> future);
<V, R> R read(String key, SyncOperation<V, R> operation);
<V, R> R read(SyncOperation<V, R> operation);
<V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(SyncInterruptedOperation<V, R> operation) throws InterruptedException;
<V, R> R write(String key, SyncOperation<V, R> operation);
<V, R> R write(SyncOperation<V, R> operation);

@ -17,6 +17,10 @@ package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@ -46,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.RedisException;
@ -71,6 +76,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected EventLoopGroup group;
protected Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
protected MasterSlaveServersConfig config;
@ -92,15 +99,25 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void init(MasterSlaveServersConfig config) {
this.config = config;
MasterSlaveEntry entry = new MasterSlaveEntry(codec, group, config);
MasterSlaveEntry entry = new MasterSlaveEntry(codec, this, config);
entries.put(Integer.MAX_VALUE, entry);
}
protected void init(Config cfg) {
this.group = new NioEventLoopGroup(cfg.getThreads());
if (cfg.isUseLinuxNativeEpoll()) {
this.group = new EpollEventLoopGroup(cfg.getThreads());
this.socketChannelClass = EpollSocketChannel.class;
} else {
this.group = new NioEventLoopGroup(cfg.getThreads());
this.socketChannelClass = NioSocketChannel.class;
}
this.codec = new RedisCodecWrapper(cfg.getCodec());
}
public RedisClient createClient(String host, int port) {
return new RedisClient(group, socketChannelClass, host, port, config.getTimeout());
}
public <T> FutureListener<T> createReleaseWriteListener(final int slot,
final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() {
@ -258,7 +275,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
});
}
public <V, R> R write(String key, SyncInterruptedOperation<V, R> operation) throws InterruptedException {
int slot = calcSlot(key);
return write(slot, operation, 0);

@ -15,8 +15,6 @@
*/
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -47,11 +45,11 @@ public class MasterSlaveEntry {
final MasterSlaveServersConfig config;
final RedisCodec codec;
final EventLoopGroup group;
final ConnectionManager connectionManager;
public MasterSlaveEntry(RedisCodec codec, EventLoopGroup group, MasterSlaveServersConfig config) {
public MasterSlaveEntry(RedisCodec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.codec = codec;
this.group = group;
this.connectionManager = connectionManager;
this.config = config;
slaveBalancer = config.getLoadBalancer();
@ -60,7 +58,7 @@ public class MasterSlaveEntry {
List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses());
addresses.add(config.getMasterAddress());
for (URI address : addresses) {
RedisClient client = new RedisClient(group, address.getHost(), address.getPort(), config.getTimeout());
RedisClient client = connectionManager.createClient(address.getHost(), address.getPort());
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
config.getSlaveConnectionPoolSize(),
config.getSlaveSubscriptionConnectionPoolSize());
@ -74,7 +72,7 @@ public class MasterSlaveEntry {
}
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = new RedisClient(group, host, port, config.getTimeout());
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new ConnectionEntry(masterClient, config.getMasterConnectionPoolSize());
}
@ -85,7 +83,7 @@ public class MasterSlaveEntry {
public void addSlave(String host, int port) {
slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
RedisClient client = new RedisClient(group, host, port, config.getTimeout());
RedisClient client = connectionManager.createClient(host, port);
slaveBalancer.add(new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));

@ -58,7 +58,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final Set<String> addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort());
RedisAsyncConnection<String, String> connection = client.connectAsync();
// TODO async
@ -93,7 +93,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final Set<String> freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (final URI addr : cfg.getSentinelAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout());
RedisClient client = createClient(addr.getHost(), addr.getPort());
sentinels.add(client);
RedisPubSubConnection<String, String> pubsub = client.connectPubSub();

@ -41,7 +41,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
protected void init(MasterSlaveServersConfig config) {
this.config = config;
SingleEntry entry = new SingleEntry(codec, group, config);
SingleEntry entry = new SingleEntry(codec, this, config);
entries.put(Integer.MAX_VALUE, entry);
}

@ -15,8 +15,6 @@
*/
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import org.redisson.MasterSlaveServersConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,13 +29,13 @@ public class SingleEntry extends MasterSlaveEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
public SingleEntry(RedisCodec codec, EventLoopGroup group, MasterSlaveServersConfig config) {
super(codec, group, config);
public SingleEntry(RedisCodec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(codec, connectionManager, config);
}
@Override
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = new RedisClient(group, host, port, config.getTimeout());
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize());
}

Loading…
Cancel
Save