RObject implemented by Redisson objects

pull/6/head
Nikita
parent 173112ccb8
commit 3fa9e736ad

@ -140,7 +140,7 @@ public class Redisson {
RedisConnection<Object, Object> connection = connect(); RedisConnection<Object, Object> connection = connect();
RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub(); RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub();
topic = new RedissonTopic<M>(pubSubConnection, connection, name); topic = new RedissonTopic<M>(this, pubSubConnection, connection, name);
RedissonTopic<M> oldTopic = topicsMap.putIfAbsent(name, topic); RedissonTopic<M> oldTopic = topicsMap.putIfAbsent(name, topic);
if (oldTopic != null) { if (oldTopic != null) {
connection.close(); connection.close();
@ -194,7 +194,7 @@ public class Redisson {
RedisConnection<Object, Object> connection = connect(); RedisConnection<Object, Object> connection = connect();
RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub(); RedisPubSubConnection<Object, Object> pubSubConnection = connectPubSub();
latch = new RedissonCountDownLatch(pubSubConnection, connection, name); latch = new RedissonCountDownLatch(this, pubSubConnection, connection, name);
RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch); RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch);
if (oldLatch != null) { if (oldLatch != null) {
connection.close(); connection.close();
@ -220,6 +220,18 @@ public class Redisson {
if (robject instanceof RedissonLock) { if (robject instanceof RedissonLock) {
locksMap.remove(robject.getName()); locksMap.remove(robject.getName());
} }
if (robject instanceof RedissonSet) {
setsMap.remove(robject.getName());
}
if (robject instanceof RedissonTopic) {
topicsMap.remove(robject.getName());
}
if (robject instanceof RedissonAtomicLong) {
atomicLongsMap.remove(robject.getName());
}
if (robject instanceof RedissonCountDownLatch) {
latchesMap.remove(robject.getName());
}
} }
public void shutdown() { public void shutdown() {

@ -107,4 +107,17 @@ public class RedissonAtomicLong implements RAtomicLong {
return Long.toString(get()); return Long.toString(get());
} }
@Override
public String getName() {
return name;
}
@Override
public void destroy() {
connection.close();
redisson.remove(this);
}
} }

@ -32,6 +32,7 @@ public class RedissonCountDownLatch implements RCountDownLatch {
private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final RedisPubSubConnection<Object, Object> pubSubConnection; private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection; private final RedisConnection<Object, Object> connection;
private final Redisson redisson;
private final String groupName = "redisson_countdownlatch_"; private final String groupName = "redisson_countdownlatch_";
private final String name; private final String name;
@ -42,10 +43,11 @@ public class RedissonCountDownLatch implements RCountDownLatch {
private final ThreadLocalSemaphore msg = new ThreadLocalSemaphore(); private final ThreadLocalSemaphore msg = new ThreadLocalSemaphore();
RedissonCountDownLatch(RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, String name) { RedissonCountDownLatch(Redisson redisson, RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, String name) {
this.connection = connection; this.connection = connection;
this.name = name; this.name = name;
this.pubSubConnection = pubSubConnection; this.pubSubConnection = pubSubConnection;
this.redisson = redisson;
} }
@ -140,4 +142,17 @@ public class RedissonCountDownLatch implements RCountDownLatch {
return connection.setnx(name, count); return connection.setnx(name, count);
} }
@Override
public String getName() {
return name;
}
@Override
public void destroy() {
connection.close();
pubSubConnection.close();
redisson.remove(this);
}
} }

@ -37,11 +37,13 @@ public class RedissonTopic<M> implements RTopic<M> {
private final RedisPubSubConnection<Object, Object> pubSubConnection; private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection; private final RedisConnection<Object, Object> connection;
private final String name; private final String name;
private final Redisson redisson;
RedissonTopic(RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, final String name) { RedissonTopic(Redisson redisson, RedisPubSubConnection<Object, Object> pubSubConnection, RedisConnection<Object, Object> connection, final String name) {
this.pubSubConnection = pubSubConnection; this.pubSubConnection = pubSubConnection;
this.name = name; this.name = name;
this.connection = connection; this.connection = connection;
this.redisson = redisson;
} }
public void subscribe() { public void subscribe() {
@ -86,5 +88,18 @@ public class RedissonTopic<M> implements RTopic<M> {
pubSubConnection.removeListener(list); pubSubConnection.removeListener(list);
} }
@Override
public String getName() {
return name;
}
@Override
public void destroy() {
connection.close();
pubSubConnection.close();
redisson.remove(this);
}
} }

@ -15,7 +15,7 @@
*/ */
package org.redisson.core; package org.redisson.core;
public interface RAtomicLong { public interface RAtomicLong extends RObject {
long getAndDecrement(); long getAndDecrement();

@ -17,7 +17,7 @@ package org.redisson.core;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface RCountDownLatch { public interface RCountDownLatch extends RObject {
void await() throws InterruptedException; void await() throws InterruptedException;

@ -25,7 +25,7 @@ public interface RObject {
String getName(); String getName();
/** /**
* Closes connections and releases resources engaged by this object * Releases resources engaged by this object
* *
*/ */
void destroy(); void destroy();

@ -21,7 +21,7 @@ package org.redisson.core;
* *
* @param <M> the type of message object * @param <M> the type of message object
*/ */
public interface RTopic<M> { public interface RTopic<M> extends RObject {
/** /**
* Publish the message to all subscribers of this topic * Publish the message to all subscribers of this topic

Loading…
Cancel
Save