+sdown -sdown sentinel handling. lock expire experimental support.

pull/38/head
Nikita 11 years ago
parent 5cb99f2c7a
commit c9ed11750c

@ -561,7 +561,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
}
public Future<Long> publish(K channel, V message) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(channel).addValue(message);
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(channel.toString()).addValue(message);
return dispatch(PUBLISH, new IntegerOutput<K, V>(codec), args);
}

@ -180,7 +180,7 @@ public class RedisClient {
return connection;
} catch (Throwable e) {
throw new RedisConnectionException("Unable to connect", e);
throw new RedisConnectionException("Unable to connect " + addr, e);
}
}

@ -4,6 +4,7 @@ package com.lambdaworks.redis.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import java.util.concurrent.BlockingQueue;
@ -59,6 +60,8 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
ctx.write(buf, promise);
}

@ -15,6 +15,9 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A netty {@link ChannelHandler} responsible for monitoring the channel and
* reconnecting when the connection is lost.
@ -23,6 +26,9 @@ import java.util.concurrent.TimeUnit;
*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{
private final Logger log = LoggerFactory.getLogger(getClass());
private Bootstrap bootstrap;
private Channel channel;
private ChannelGroup channels;
@ -84,31 +90,35 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{
}
private void doReConnect(final EventLoop loop, final CommandHandler<?, ?> handler, final RedisAsyncConnection<?, ?> connection, final int attempts) {
if (reconnect) {
ChannelFuture connect;
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection);
}
}).connect();
}
connect.addListener(new GenericFutureListener<ChannelFuture>() {
if (!reconnect) {
return;
}
log.debug("trying to reconnect {}", bootstrap);
ChannelFuture connect;
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
int timeout = 2 << attempts;
loop.schedule(new Runnable() {
@Override
public void run() {
doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection);
}
});
}).connect();
}
connect.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
int timeout = 2 << attempts;
loop.schedule(new Runnable() {
@Override
public void run() {
doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
}
});
}
@Override

@ -19,8 +19,8 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
enum Type { message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe }
private Type type;
private K channel;
private K pattern;
private String channel;
private String pattern;
private long count;
public PubSubOutput(RedisCodec<K, V> codec) {
@ -31,11 +31,11 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
return type;
}
public K channel() {
public String channel() {
return channel;
}
public K pattern() {
public String pattern() {
return pattern;
}
@ -54,23 +54,28 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
switch (type) {
case pmessage:
if (pattern == null) {
pattern = codec.decodeKey(bytes);
pattern = decodeAscii(bytes);
break;
}
case message:
if (channel == null) {
channel = codec.decodeKey(bytes);
channel = decodeAscii(bytes);
break;
}
output = codec.decodeValue(bytes);
if (channel.startsWith("__keyspace@")
|| channel.startsWith("__keyevent@")) {
output = (V)decodeAscii(bytes);
} else {
output = codec.decodeValue(bytes);
}
break;
case psubscribe:
case punsubscribe:
pattern = codec.decodeKey(bytes);
pattern = decodeAscii(bytes);
break;
case subscribe:
case unsubscribe:
channel = codec.decodeKey(bytes);
channel = decodeAscii(bytes);
break;
}
}

@ -10,28 +10,28 @@ package com.lambdaworks.redis.pubsub;
*
* @author Will Glozer
*/
public class RedisPubSubAdapter<K, V> implements RedisPubSubListener<K, V> {
public class RedisPubSubAdapter<V> implements RedisPubSubListener<V> {
@Override
public void message(K channel, V message) {
public void message(String channel, V message) {
}
@Override
public void message(K pattern, K channel, V message) {
public void message(String pattern, String channel, V message) {
}
@Override
public void subscribed(K channel, long count) {
public void subscribed(String channel, long count) {
}
@Override
public void psubscribed(K pattern, long count) {
public void psubscribed(String pattern, long count) {
}
@Override
public void unsubscribed(K channel, long count) {
public void unsubscribed(String channel, long count) {
}
@Override
public void punsubscribed(K pattern, long count) {
public void punsubscribed(String pattern, long count) {
}
}

@ -39,9 +39,9 @@ import com.lambdaworks.redis.protocol.CommandArgs;
* @author Will Glozer
*/
public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
private final Queue<RedisPubSubListener<K, V>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<K, V>>();
private Set<K> channels;
private Set<K> patterns;
private final Queue<RedisPubSubListener<V>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<V>>();
private Set<String> channels;
private Set<String> patterns;
/**
* Initialize a new connection.
@ -54,8 +54,8 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
*/
public RedisPubSubConnection(RedisClient client, BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
super(client, queue, codec, timeout, unit, eventLoopGroup);
channels = new HashSet<K>();
patterns = new HashSet<K>();
channels = new HashSet<String>();
patterns = new HashSet<String>();
}
/**
@ -63,36 +63,32 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
*
* @param listener Listener.
*/
public void addListener(RedisPubSubListener<K, V> listener) {
public void addListener(RedisPubSubListener<V> listener) {
listeners.add(listener);
}
public Queue<RedisPubSubListener<K, V>> getListeners() {
return listeners;
}
/**
* Remove an existing listener.
*
* @param listener Listener.
*/
public void removeListener(RedisPubSubListener<K, V> listener) {
public void removeListener(RedisPubSubListener<V> listener) {
listeners.remove(listener);
}
public void psubscribe(K... patterns) {
public void psubscribe(String... patterns) {
dispatch(PSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void punsubscribe(K... patterns) {
public void punsubscribe(String... patterns) {
dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void subscribe(K... channels) {
public void subscribe(String... channels) {
dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
public void unsubscribe(K... channels) {
public void unsubscribe(String... channels) {
dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
@ -101,7 +97,7 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
super.channelActive(ctx);
if (channels.size() > 0) {
subscribe(toArray(channels));
subscribe(channels.toArray(new String[channels.size()]));
channels.clear();
}
@ -115,7 +111,7 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PubSubOutput<K, V> output = (PubSubOutput<K, V>) msg;
for (RedisPubSubListener<K, V> listener : listeners) {
for (RedisPubSubListener<V> listener : listeners) {
switch (output.type()) {
case message:
listener.message(output.channel(), output.get());
@ -143,9 +139,11 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
}
}
private CommandArgs<K, V> args(K... keys) {
private CommandArgs<K, V> args(String... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKeys(keys);
for (String key : keys) {
args.add(key.toString());
}
return args;
}

@ -9,14 +9,14 @@ package com.lambdaworks.redis.pubsub;
*
* @author Will Glozer
*/
public interface RedisPubSubListener<K, V> {
public interface RedisPubSubListener<V> {
/**
* Message received from a channel subscription.
*
* @param channel Channel.
* @param message Message.
*/
void message(K channel, V message);
void message(String channel, V message);
/**
* Message received from a pattern subscription.
@ -25,7 +25,7 @@ public interface RedisPubSubListener<K, V> {
* @param channel Channel.
* @param message Message.
*/
void message(K pattern, K channel, V message);
void message(String pattern, String channel, V message);
/**
* Subscribed to a channel.
@ -33,7 +33,7 @@ public interface RedisPubSubListener<K, V> {
* @param channel Channel
* @param count Subscription count.
*/
void subscribed(K channel, long count);
void subscribed(String channel, long count);
/**
* Subscribed to a pattern.
@ -41,7 +41,7 @@ public interface RedisPubSubListener<K, V> {
* @param pattern Pattern.
* @param count Subscription count.
*/
void psubscribed(K pattern, long count);
void psubscribed(String pattern, long count);
/**
* Unsubscribed from a channel.
@ -49,7 +49,7 @@ public interface RedisPubSubListener<K, V> {
* @param channel Channel
* @param count Subscription count.
*/
void unsubscribed(K channel, long count);
void unsubscribed(String channel, long count);
/**
* Unsubscribed from a pattern.
@ -57,5 +57,5 @@ public interface RedisPubSubListener<K, V> {
* @param pattern Channel
* @param count Subscription count.
*/
void punsubscribed(K pattern, long count);
void punsubscribed(String pattern, long count);
}

@ -26,7 +26,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
* @param <K>
* @param <V>
*/
public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K, V> {
public class RedisPubSubTopicListenerWrapper<V> extends RedisPubSubAdapter<V> {
private final MessageListener<V> listener;
private final String name;
@ -42,7 +42,7 @@ public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K,
}
@Override
public void message(K channel, V message) {
public void message(String channel, V message) {
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(message);

@ -190,7 +190,7 @@ public class Redisson {
* @return distributed "count down latch"
*/
public RCountDownLatch getCountDownLatch(String name) {
return new RedissonCountDownLatch(connectionManager, name);
return new RedissonCountDownLatch(connectionManager, name, id);
}
/**

@ -18,12 +18,12 @@ package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RCountDownLatch;
import com.lambdaworks.redis.RedisConnection;
@ -46,11 +46,12 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private static final Integer newCountMessage = 1;
private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = new ConcurrentHashMap<String, RedissonCountDownLatchEntry>();
private final UUID id;
private PubSubConnectionEntry pubSubEntry;
RedissonCountDownLatch(ConnectionManager connectionManager, String name) {
RedissonCountDownLatch(ConnectionManager connectionManager, String name, UUID id) {
super(connectionManager, name);
this.id = id;
}
private Future<Boolean> subscribe() {
@ -62,7 +63,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
Promise<Boolean> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getName(), value);
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
@ -71,7 +72,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return oldPromise;
}
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
RedisPubSubAdapter<Integer> listener = new RedisPubSubAdapter<Integer>() {
@Override
public void subscribed(String channel, long count) {
@ -95,19 +96,19 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
};
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
connectionManager.subscribe(listener, getChannelName());
return newPromise;
}
private void release() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getName(), entry, newEntry)) {
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return;
}
}
@ -115,11 +116,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getName(), entry, newEntry)) {
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
}
} else {
@ -135,7 +136,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
while (getCountInner() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.getLatch().await();
}
@ -161,7 +162,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
long current = System.currentTimeMillis();
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.getLatch().await(time, TimeUnit.MILLISECONDS);
}
@ -207,6 +208,10 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
private String getEntryName() {
return id + getName();
}
private String getChannelName() {
return groupName + getName();
}
@ -282,7 +287,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
} finally {
close();
ENTRIES.remove(getName());
ENTRIES.remove(getEntryName());
}
}
@ -292,11 +297,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null
&& entry.isFree()
&& ENTRIES.remove(getName(), entry)) {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
&& ENTRIES.remove(getEntryName(), entry)) {
connectionManager.unsubscribe(getChannelName());
}
}
}, 15, TimeUnit.SECONDS);

@ -19,6 +19,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.Serializable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -26,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RLock;
import com.lambdaworks.redis.RedisConnection;
@ -110,8 +110,6 @@ public class RedissonLock extends RedissonObject implements RLock {
private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = new ConcurrentHashMap<String, RedissonLockEntry>();
private PubSubConnectionEntry pubSubEntry;
RedissonLock(ConnectionManager connectionManager, String name, UUID id) {
super(connectionManager, name);
this.id = id;
@ -119,25 +117,29 @@ public class RedissonLock extends RedissonObject implements RLock {
private void release() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release();
if (ENTRIES.replace(getName(), entry, newEntry)) {
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return;
}
}
}
private String getEntryName() {
return id + getName();
}
private Promise<Boolean> aquire() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getName(), entry, newEntry)) {
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
}
} else {
@ -155,7 +157,7 @@ public class RedissonLock extends RedissonObject implements RLock {
Promise<Boolean> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getName(), value);
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
@ -164,9 +166,10 @@ public class RedissonLock extends RedissonObject implements RLock {
return oldPromise;
}
// init();
value.getLatch().acquireUninterruptibly();
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
RedisPubSubAdapter<Object> listener = new RedisPubSubAdapter<Object>() {
@Override
public void subscribed(String channel, long count) {
@ -176,7 +179,7 @@ public class RedissonLock extends RedissonObject implements RLock {
}
@Override
public void message(String channel, Integer message) {
public void message(String channel, Object message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
}
@ -184,10 +187,43 @@ public class RedissonLock extends RedissonObject implements RLock {
};
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
connectionManager.subscribe(listener, getChannelName());
RedisPubSubAdapter<Object> expireListener = new RedisPubSubAdapter<Object>() {
@Override
public void message(String channel, Object message) {
if (getExpireChannelName().equals(channel)
&& "expired".equals(message)) {
forceUnlock();
}
}
};
connectionManager.subscribe(expireListener, getExpireChannelName());
return newPromise;
}
private String getExpireChannelName() {
return "__keyspace@0__:\"" + getName() + "\"";
}
/**
* Turning on the notify-keyspace-events for Keyevent events from Expired keys
*
*/
private void init() {
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
try {
if (!conn.configSet("notify-keyspace-events", "KEx").equals("OK")) {
throw new IllegalStateException();
}
} finally {
connectionManager.releaseWrite(conn);
}
}
@Override
public void lock() {
try {
@ -198,10 +234,6 @@ public class RedissonLock extends RedissonObject implements RLock {
}
}
private String getKeyName() {
return "redisson__lock__" + getName();
}
private String getChannelName() {
return "redisson__lock__channel__" + getName();
}
@ -210,7 +242,7 @@ public class RedissonLock extends RedissonObject implements RLock {
public void lockInterruptibly() throws InterruptedException {
while (!tryLock()) {
// waiting for message
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.getLatch().acquire();
}
@ -235,12 +267,12 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try {
Boolean res = connection.setnx(getKeyName(), currentLock);
Boolean res = connection.setnx(getName(), currentLock);
if (!res) {
LockValue lock = (LockValue) connection.get(getKeyName());
LockValue lock = (LockValue) connection.get(getName());
if (lock != null && lock.equals(currentLock)) {
lock.incCounter();
connection.set(getKeyName(), lock);
connection.set(getName(), lock);
return true;
}
}
@ -265,7 +297,7 @@ public class RedissonLock extends RedissonObject implements RLock {
}
long current = System.currentTimeMillis();
// waiting for message
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
@ -286,12 +318,12 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
LockValue lock = (LockValue) connection.get(getName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock != null && lock.equals(currentLock)) {
if (lock.getCounter() > 1) {
lock.decCounter();
connection.set(getKeyName(), lock);
connection.set(getName(), lock);
} else {
unlock(connection);
}
@ -312,9 +344,10 @@ public class RedissonLock extends RedissonObject implements RLock {
int counter = 0;
while (counter < 5) {
connection.multi();
connection.del(getKeyName());
connection.del(getName());
connection.publish(getChannelName(), unlockMessage);
if (connection.exec().size() == 2) {
List<Object> res = connection.exec();
if (res.size() == 2) {
return;
}
counter++;
@ -337,10 +370,7 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
if (lock != null) {
unlock(connection);
}
unlock(connection);
} finally {
connectionManager.releaseWrite(connection);
}
@ -357,7 +387,7 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
LockValue lock = (LockValue) connection.get(getName());
return lock != null;
} finally {
connectionManager.releaseRead(connection);
@ -375,7 +405,7 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
LockValue lock = (LockValue) connection.get(getName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
return lock != null && lock.equals(currentLock);
} finally {
@ -394,7 +424,7 @@ public class RedissonLock extends RedissonObject implements RLock {
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
LockValue lock = (LockValue) connection.get(getName());
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock != null && lock.equals(currentLock)) {
return lock.getCounter();
@ -411,7 +441,7 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void delete() {
forceUnlock();
ENTRIES.remove(getName());
ENTRIES.remove(getEntryName());
}
public void close() {
@ -420,11 +450,12 @@ public class RedissonLock extends RedissonObject implements RLock {
connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
RedissonLockEntry entry = ENTRIES.get(getName());
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry != null
&& entry.isFree()
&& ENTRIES.remove(getName(), entry)) {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
&& ENTRIES.remove(getEntryName(), entry)) {
connectionManager.unsubscribe(getChannelName());
// connectionManager.unsubscribe(getExpireChannelName());
}
}
}, 15, TimeUnit.SECONDS);

@ -50,15 +50,15 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
@Override
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
RedisPubSubTopicListenerWrapper<M> pubSubListener = new RedisPubSubTopicListenerWrapper<M>(listener, getName());
return addListener(pubSubListener);
}
private int addListener(RedisPubSubTopicListenerWrapper<String, M> pubSubListener) {
private int addListener(RedisPubSubTopicListenerWrapper<M> pubSubListener) {
PubSubConnectionEntry entry = connectionManager.subscribe(getName());
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(pubSubListener);
entry.addListener(getName(), pubSubListener);
return pubSubListener.hashCode();
}
}
@ -74,8 +74,8 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
}
synchronized (entry) {
if (entry.isActive()) {
entry.removeListener(listenerId);
connectionManager.unsubscribe(entry, getName());
entry.removeListener(getName(), listenerId);
connectionManager.unsubscribe(getName());
return;
}
}

@ -52,6 +52,30 @@ abstract class BaseLoadBalancer implements LoadBalancer {
clientsEmpty.open();
}
public void unfreeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (ConnectionEntry connectionEntry : clients) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
connectionEntry.setFreezed(false);
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
public Queue<RedisPubSubConnection> freeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (ConnectionEntry connectionEntry : clients) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
connectionEntry.setFreezed(true);
return connectionEntry.getSubscribeConnections();
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
public Queue<RedisPubSubConnection> remove(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (Iterator<ConnectionEntry> iterator = clients.iterator(); iterator.hasNext();) {
@ -60,8 +84,8 @@ abstract class BaseLoadBalancer implements LoadBalancer {
continue;
}
log.info("slave {} removed", entry.getClient().getAddr());
iterator.remove();
log.info("slave {} removed", entry.getClient().getAddr());
if (clients.isEmpty()) {
clientsEmpty.close();
}
@ -93,7 +117,8 @@ abstract class BaseLoadBalancer implements LoadBalancer {
int index = getIndex(clientsCopy);
ConnectionEntry entry = clientsCopy.get(index);
if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()) {
if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()
|| entry.isFreezed()) {
clientsCopy.remove(index);
} else {
try {
@ -134,7 +159,8 @@ abstract class BaseLoadBalancer implements LoadBalancer {
int index = getIndex(clientsCopy);
ConnectionEntry entry = clientsCopy.get(index);
if (!entry.getConnectionsSemaphore().tryAcquire()) {
if (!entry.getConnectionsSemaphore().tryAcquire()
|| entry.isFreezed()) {
clientsCopy.remove(index);
} else {
RedisConnection conn = entry.getConnections().poll();

@ -25,6 +25,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class ConnectionEntry {
private volatile boolean freezed;
private final RedisClient client;
private final Semaphore subscribeConnectionsSemaphore;
@ -47,6 +48,14 @@ public class ConnectionEntry {
return client;
}
public boolean isFreezed() {
return freezed;
}
public void setFreezed(boolean freezed) {
this.freezed = freezed;
}
public void shutdown() {
connectionsSemaphore.acquireUninterruptibly(poolSize);
client.shutdown();

@ -16,28 +16,10 @@
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
/**
*
@ -47,8 +29,6 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
//TODO ping support
public interface ConnectionManager {
void changeMaster(String host, int port);
<T> FutureListener<T> createReleaseWriteListener(final RedisConnection conn);
<T> FutureListener<T> createReleaseReadListener(final RedisConnection conn);
@ -61,9 +41,9 @@ public interface ConnectionManager {
<K, V> PubSubConnectionEntry subscribe(String channelName);
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName);
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName);
void unsubscribe(PubSubConnectionEntry entry, String channelName);
void unsubscribe(String channelName);
void releaseWrite(RedisConnection сonnection);

@ -23,6 +23,10 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public interface LoadBalancer {
void unfreeze(String host, int port);
Queue<RedisPubSubConnection> freeze(String host, int port);
void init(RedisCodec codec, String password);
void add(ConnectionEntry entry);

@ -21,10 +21,9 @@ import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
@ -63,7 +62,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
private LoadBalancer balancer;
protected LoadBalancer balancer;
private final List<RedisClient> slaveClients = new ArrayList<RedisClient>();
protected volatile RedisClient masterClient;
@ -105,7 +104,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.codec = new RedisCodecWrapper(cfg.getCodec());
}
public void changeMaster(String host, int port) {
protected void slaveDown(String host, int port) {
Queue<RedisPubSubConnection> connections = balancer.freeze(host, port);
reattachListeners(connections);
}
protected void slaveUp(String host, int port) {
balancer.unfreeze(host, port);
}
/**
* Remove slave with <code>host:port</code> from slaves list.
* Re-attach pub/sub listeners from it to other slave.
* Shutdown old master client.
*
*/
protected void changeMaster(String host, int port) {
RedisClient oldMaster = masterClient;
masterClient = new RedisClient(group, host, port);
Queue<RedisPubSubConnection> connections = balancer.remove(host, port);
@ -116,22 +130,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private void reattachListeners(Queue<RedisPubSubConnection> connections) {
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : connections) {
if (!mapEntry.getValue().getConnection().equals(redisPubSubConnection)) {
PubSubConnectionEntry entry = mapEntry.getValue();
String channelName = mapEntry.getKey();
if (!entry.getConnection().equals(redisPubSubConnection)) {
continue;
}
PubSubConnectionEntry entry = mapEntry.getValue();
String channelName = mapEntry.getKey();
synchronized (entry) {
entry.close();
unsubscribeEntry(entry, channelName);
unsubscribe(channelName);
List<RedisPubSubListener> listeners = entry.getListeners(channelName);
Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(mapEntry.getKey());
PubSubConnectionEntry newEntry = subscribe(channelName);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(redisPubSubListener);
newEntry.addListener(channelName, redisPubSubListener);
}
}
}
@ -224,7 +239,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
@ -267,16 +282,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
public void unsubscribe(String channelName) {
PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return;
}
unsubscribeEntry(entry, channelName);
}
private void unsubscribeEntry(PubSubConnectionEntry entry, String channelName) {
name2PubSubConnection.remove(channelName);
entry.unsubscribe(channelName);
if (entry.tryClose()) {
returnSubscribeConnection(entry);

@ -15,12 +15,14 @@
*/
package org.redisson.connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.RedisPubSubTopicListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +40,7 @@ public class PubSubConnectionEntry {
private final Semaphore subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
@ -46,7 +49,32 @@ public class PubSubConnectionEntry {
this.subscribedChannelsAmount = new Semaphore(subscriptionsPerConnection);
}
public void addListener(RedisPubSubListener listener) {
public Collection<RedisPubSubListener> getListeners(String channelName) {
Collection<RedisPubSubListener> result = channelListeners.get(channelName);
if (result == null) {
return Collections.emptyList();
}
return result;
}
public void addListener(String channelName, RedisPubSubListener listener) {
Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
if (queue == null) {
queue = new ConcurrentLinkedQueue<RedisPubSubListener>();
Queue<RedisPubSubListener> oldQueue = channelListeners.putIfAbsent(channelName, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}
synchronized (queue) {
if (channelListeners.get(channelName) != queue) {
addListener(channelName, listener);
return;
}
queue.add(listener);
}
conn.addListener(listener);
}
@ -58,44 +86,24 @@ public class PubSubConnectionEntry {
status = Status.INACTIVE;
}
public List<RedisPubSubListener> getListeners(String channelName) {
List<RedisPubSubListener> result = new ArrayList<RedisPubSubListener>();
Queue<RedisPubSubListener> queue = conn.getListeners();
for (RedisPubSubListener listener : queue) {
if (!(listener instanceof RedisPubSubTopicListenerWrapper)) {
continue;
}
RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener;
if (entry.getName().equals(channelName)) {
result.add(entry);
}
}
return result;
}
// TODO optimize
public boolean hasListeners(String channelName) {
return !getListeners(channelName).isEmpty();
}
// TODO optimize
public void removeListener(int listenerId) {
Queue<RedisPubSubListener> queue = conn.getListeners();
for (RedisPubSubListener listener : queue) {
if (!(listener instanceof RedisPubSubTopicListenerWrapper)) {
continue;
}
RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener;
if (entry.hashCode() == listenerId) {
removeListener(entry);
public void removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channelName);
for (RedisPubSubListener listener : listeners) {
if (listener.hashCode() == listenerId) {
removeListener(channelName, listener);
break;
}
}
}
public void removeListener(RedisPubSubListener listener) {
public void removeListener(String channelName, RedisPubSubListener listener) {
Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
synchronized (queue) {
if (queue.remove(listener)) {
channelListeners.remove(channelName, new ConcurrentLinkedQueue<RedisPubSubListener>());
}
}
conn.removeListener(listener);
}
@ -110,12 +118,12 @@ public class PubSubConnectionEntry {
public void subscribe(final String channelName) {
conn.addListener(new RedisPubSubAdapter() {
@Override
public void subscribed(Object channel, long count) {
public void subscribed(String channel, long count) {
log.debug("subscribed to '{}' channel", channelName);
}
@Override
public void unsubscribed(Object channel, long count) {
public void unsubscribed(String channel, long count) {
log.debug("unsubscribed from '{}' channel", channelName);
}
});
@ -123,7 +131,7 @@ public class PubSubConnectionEntry {
}
public void subscribe(RedisPubSubAdapter listener, Object channel) {
public void subscribe(RedisPubSubAdapter listener, String channel) {
conn.addListener(listener);
conn.subscribe(channel);
}

@ -1,6 +1,7 @@
package org.redisson;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@ -21,9 +22,34 @@ public class RedissonLockTest extends BaseConcurrentTest {
@After
public void after() {
redisson.shutdown();
try {
redisson.flushdb();
} finally {
redisson.shutdown();
}
}
// @Test
public void testExpire() throws InterruptedException {
RLock lock = redisson.getLock("lock");
lock.lock();
// lock.expire(2, TimeUnit.SECONDS);
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
public void run() {
RLock lock1 = redisson.getLock("lock");
lock1.lock();
lock1.unlock();
latch.countDown();
};
}.start();
latch.await();
lock.unlock();
}
@Test
public void testGetHoldCount() {
RLock lock = redisson.getLock("lock");
@ -156,9 +182,14 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Override
public void run(Redisson redisson) {
Lock lock = redisson.getLock("testConcurrency_SingleInstance");
System.out.println("lock1 " + Thread.currentThread().getId());
lock.lock();
System.out.println("lock2 "+ Thread.currentThread().getId());
lockedCounter.set(lockedCounter.get() + 1);
System.out.println("lockedCounter " + lockedCounter);
System.out.println("unlock1 "+ Thread.currentThread().getId());
lock.unlock();
System.out.println("unlock2 "+ Thread.currentThread().getId());
}
});

Loading…
Cancel
Save