From 3fa9e736ad87750b2de3a6e44c74ee3a8d7fb09b Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 6 Jan 2014 14:17:05 +0400 Subject: [PATCH] RObject implemented by Redisson objects --- src/main/java/org/redisson/Redisson.java | 16 ++++++++++++++-- .../java/org/redisson/RedissonAtomicLong.java | 13 +++++++++++++ .../org/redisson/RedissonCountDownLatch.java | 17 ++++++++++++++++- src/main/java/org/redisson/RedissonTopic.java | 17 ++++++++++++++++- .../java/org/redisson/core/RAtomicLong.java | 2 +- .../java/org/redisson/core/RCountDownLatch.java | 2 +- src/main/java/org/redisson/core/RObject.java | 2 +- src/main/java/org/redisson/core/RTopic.java | 2 +- 8 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 10a0c7bab..a83b19788 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -140,7 +140,7 @@ public class Redisson { RedisConnection connection = connect(); RedisPubSubConnection pubSubConnection = connectPubSub(); - topic = new RedissonTopic(pubSubConnection, connection, name); + topic = new RedissonTopic(this, pubSubConnection, connection, name); RedissonTopic oldTopic = topicsMap.putIfAbsent(name, topic); if (oldTopic != null) { connection.close(); @@ -194,7 +194,7 @@ public class Redisson { RedisConnection connection = connect(); RedisPubSubConnection pubSubConnection = connectPubSub(); - latch = new RedissonCountDownLatch(pubSubConnection, connection, name); + latch = new RedissonCountDownLatch(this, pubSubConnection, connection, name); RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch); if (oldLatch != null) { connection.close(); @@ -220,6 +220,18 @@ public class Redisson { if (robject instanceof RedissonLock) { locksMap.remove(robject.getName()); } + if (robject instanceof RedissonSet) { + setsMap.remove(robject.getName()); + } + if (robject instanceof RedissonTopic) { + topicsMap.remove(robject.getName()); + } + if (robject instanceof RedissonAtomicLong) { + atomicLongsMap.remove(robject.getName()); + } + if (robject instanceof RedissonCountDownLatch) { + latchesMap.remove(robject.getName()); + } } public void shutdown() { diff --git a/src/main/java/org/redisson/RedissonAtomicLong.java b/src/main/java/org/redisson/RedissonAtomicLong.java index be06e1ad8..483fca911 100644 --- a/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/src/main/java/org/redisson/RedissonAtomicLong.java @@ -107,4 +107,17 @@ public class RedissonAtomicLong implements RAtomicLong { return Long.toString(get()); } + + @Override + public String getName() { + return name; + } + + + @Override + public void destroy() { + connection.close(); + redisson.remove(this); + } + } diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 4083f50ac..39e5446a0 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -32,6 +32,7 @@ public class RedissonCountDownLatch implements RCountDownLatch { private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final RedisPubSubConnection pubSubConnection; private final RedisConnection connection; + private final Redisson redisson; private final String groupName = "redisson_countdownlatch_"; private final String name; @@ -42,10 +43,11 @@ public class RedissonCountDownLatch implements RCountDownLatch { private final ThreadLocalSemaphore msg = new ThreadLocalSemaphore(); - RedissonCountDownLatch(RedisPubSubConnection pubSubConnection, RedisConnection connection, String name) { + RedissonCountDownLatch(Redisson redisson, RedisPubSubConnection pubSubConnection, RedisConnection connection, String name) { this.connection = connection; this.name = name; this.pubSubConnection = pubSubConnection; + this.redisson = redisson; } @@ -140,4 +142,17 @@ public class RedissonCountDownLatch implements RCountDownLatch { return connection.setnx(name, count); } + @Override + public String getName() { + return name; + } + + @Override + public void destroy() { + connection.close(); + pubSubConnection.close(); + + redisson.remove(this); + } + } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index bdc894159..60ac70c7f 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -37,11 +37,13 @@ public class RedissonTopic implements RTopic { private final RedisPubSubConnection pubSubConnection; private final RedisConnection connection; private final String name; + private final Redisson redisson; - RedissonTopic(RedisPubSubConnection pubSubConnection, RedisConnection connection, final String name) { + RedissonTopic(Redisson redisson, RedisPubSubConnection pubSubConnection, RedisConnection connection, final String name) { this.pubSubConnection = pubSubConnection; this.name = name; this.connection = connection; + this.redisson = redisson; } public void subscribe() { @@ -86,5 +88,18 @@ public class RedissonTopic implements RTopic { pubSubConnection.removeListener(list); } + @Override + public String getName() { + return name; + } + + @Override + public void destroy() { + connection.close(); + pubSubConnection.close(); + + redisson.remove(this); + } + } diff --git a/src/main/java/org/redisson/core/RAtomicLong.java b/src/main/java/org/redisson/core/RAtomicLong.java index af99add24..1ac2b8bb8 100644 --- a/src/main/java/org/redisson/core/RAtomicLong.java +++ b/src/main/java/org/redisson/core/RAtomicLong.java @@ -15,7 +15,7 @@ */ package org.redisson.core; -public interface RAtomicLong { +public interface RAtomicLong extends RObject { long getAndDecrement(); diff --git a/src/main/java/org/redisson/core/RCountDownLatch.java b/src/main/java/org/redisson/core/RCountDownLatch.java index 3cc163f02..6fb752e7a 100644 --- a/src/main/java/org/redisson/core/RCountDownLatch.java +++ b/src/main/java/org/redisson/core/RCountDownLatch.java @@ -17,7 +17,7 @@ package org.redisson.core; import java.util.concurrent.TimeUnit; -public interface RCountDownLatch { +public interface RCountDownLatch extends RObject { void await() throws InterruptedException; diff --git a/src/main/java/org/redisson/core/RObject.java b/src/main/java/org/redisson/core/RObject.java index ac0585856..6e8ad0297 100644 --- a/src/main/java/org/redisson/core/RObject.java +++ b/src/main/java/org/redisson/core/RObject.java @@ -25,7 +25,7 @@ public interface RObject { String getName(); /** - * Closes connections and releases resources engaged by this object + * Releases resources engaged by this object * */ void destroy(); diff --git a/src/main/java/org/redisson/core/RTopic.java b/src/main/java/org/redisson/core/RTopic.java index 101dc47bd..ae882a3bd 100644 --- a/src/main/java/org/redisson/core/RTopic.java +++ b/src/main/java/org/redisson/core/RTopic.java @@ -21,7 +21,7 @@ package org.redisson.core; * * @param the type of message object */ -public interface RTopic { +public interface RTopic extends RObject { /** * Publish the message to all subscribers of this topic