Useless Redisson.latchesMap removed

pull/38/head
Nikita 11 years ago
parent 40321c8ba9
commit 6250f41c55

@ -60,7 +60,6 @@ public class Redisson {
};
private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ReferenceMap<String, RedissonCountDownLatch>(ReferenceType.STRONG, ReferenceType.WEAK, listener);
private final ConcurrentMap<String, RedissonLock> locksMap = new ReferenceMap<String, RedissonLock>(ReferenceType.STRONG, ReferenceType.WEAK, listener);
private final ConnectionManager connectionManager;
@ -217,16 +216,7 @@ public class Redisson {
* @return distributed "count down latch"
*/
public RCountDownLatch getCountDownLatch(String name) {
RedissonCountDownLatch latch = latchesMap.get(name);
if (latch == null) {
latch = new RedissonCountDownLatch(connectionManager, name);
RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch);
if (oldLatch != null) {
latch = oldLatch;
}
}
return latch;
return new RedissonCountDownLatch(connectionManager, name);
}
/**

@ -18,13 +18,13 @@ package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RCountDownLatch;
import org.redisson.misc.ReclosableLatch;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
@ -45,9 +45,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private static final Integer zeroCountMessage = 0;
private static final Integer newCountMessage = 1;
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
private final ReclosableLatch msg = new ReclosableLatch();
private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = new ConcurrentHashMap<String, RedissonCountDownLatchEntry>();
private PubSubConnectionEntry pubSubEntry;
@ -56,22 +54,28 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
private Future<Boolean> subscribe() {
Promise<Boolean> p = promise.get();
if (p != null) {
return p;
Promise<Boolean> promise = aquire();
if (promise != null) {
return promise;
}
final Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) {
return promise.get();
Promise<Boolean> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
}
}
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
newPromise.setSuccess(true);
value.getPromise().setSuccess(true);
}
}
@ -81,10 +85,10 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return;
}
if (message.equals(zeroCountMessage)) {
msg.open();
value.getLatch().open();
}
if (message.equals(newCountMessage)) {
msg.close();
value.getLatch().close();
}
}
@ -94,60 +98,101 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return newPromise;
}
public void await() throws InterruptedException {
subscribe().await();
private void release() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getName(), entry, newEntry)) {
return;
}
}
}
private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
if (entry != null) {
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getName(), entry, newEntry)) {
return newEntry.getPromise();
}
} else {
return null;
}
}
}
while (getCount() > 0) {
// waiting for open state
msg.await();
public void await() throws InterruptedException {
Future<Boolean> promise = subscribe();
try {
promise.await();
while (getCount() > 0) {
// waiting for open state
ENTRIES.get(getName()).getLatch().await();
}
} finally {
close();
}
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
if (!subscribe().await(time, unit)) {
return false;
}
time = unit.toMillis(time);
while (getCount() > 0) {
if (time <= 0) {
Future<Boolean> promise = subscribe();
try {
if (!promise.await(time, unit)) {
return false;
}
long current = System.currentTimeMillis();
// waiting for open state
msg.await(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
time = unit.toMillis(time);
while (getCount() > 0) {
if (time <= 0) {
return false;
}
long current = System.currentTimeMillis();
// waiting for open state
ENTRIES.get(getName()).getLatch().await(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
}
return true;
} finally {
close();
}
return true;
}
@Override
public void countDown() {
subscribe().awaitUninterruptibly();
if (getCount() <= 0) {
return;
}
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
Future<Boolean> promise = subscribe();
try {
Long val = connection.decr(getName());
if (val == 0) {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), zeroCountMessage);
if (connection.exec().size() != 2) {
throw new IllegalStateException();
promise.awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
try {
Long val = connection.decr(getName());
if (val == 0) {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), zeroCountMessage);
if (connection.exec().size() != 2) {
throw new IllegalStateException();
}
} else if (val < 0) {
connection.del(getName());
}
} else if (val < 0) {
connection.del(getName());
} finally {
connectionManager.releaseWrite(connection);
}
} finally {
connectionManager.releaseWrite(connection);
close();
}
}
@ -157,43 +202,65 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public long getCount() {
subscribe().awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionReadOp();
Future<Boolean> promise = subscribe();
try {
Number val = (Number) connection.get(getName());
if (val == null) {
return 0;
promise.awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionReadOp();
try {
Number val = (Number) connection.get(getName());
if (val == null) {
return 0;
}
return val.longValue();
} finally {
connectionManager.releaseRead(connection);
}
return val.longValue();
} finally {
connectionManager.releaseRead(connection);
close();
}
}
@Override
public boolean trySetCount(long count) {
subscribe().awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
Future<Boolean> promise = subscribe();
try {
connection.watch(getName());
Long oldValue = (Long) connection.get(getName());
if (oldValue != null) {
connection.unwatch();
return false;
promise.awaitUninterruptibly();
RedisConnection<String, Object> connection = connectionManager.connectionWriteOp();
try {
connection.watch(getName());
Long oldValue = (Long) connection.get(getName());
if (oldValue != null) {
connection.unwatch();
return false;
}
connection.multi();
connection.set(getName(), count);
connection.publish(getChannelName(), newCountMessage);
return connection.exec().size() == 2;
} finally {
connectionManager.releaseWrite(connection);
}
connection.multi();
connection.set(getName(), count);
connection.publish(getChannelName(), newCountMessage);
return connection.exec().size() == 2;
} finally {
connectionManager.releaseWrite(connection);
close();
}
}
public void close() {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
release();
connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
RedissonCountDownLatchEntry entry = ENTRIES.get(getName());
if (entry.isFree()
&& ENTRIES.remove(getName(), entry)) {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
}
}
}, 15, TimeUnit.SECONDS);
}
}

@ -0,0 +1,83 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.Promise;
import org.redisson.misc.ReclosableLatch;
public class RedissonCountDownLatchEntry {
private int counter;
private final ReclosableLatch latch;
private final Promise<Boolean> promise;
public RedissonCountDownLatchEntry(RedissonCountDownLatchEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}
public RedissonCountDownLatchEntry(Promise<Boolean> promise) {
super();
this.latch = new ReclosableLatch();
this.promise = promise;
}
public boolean isFree() {
return counter == 0;
}
public void aquire() {
counter++;
}
public void release() {
counter--;
}
public Promise<Boolean> getPromise() {
return promise;
}
public ReclosableLatch getLatch() {
return latch;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedissonCountDownLatchEntry other = (RedissonCountDownLatchEntry) obj;
if (counter != other.counter)
return false;
return true;
}
}
Loading…
Cancel
Save