From 18e5051c825c281fa99a5616abb030ae1e142a86 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 17 Dec 2015 15:24:13 +0300 Subject: [PATCH] RSemaphore optimization. #207 --- .../java/org/redisson/RedissonSemaphore.java | 18 +++++----- .../org/redisson/pubsub/SemaphorePubSub.java | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/redisson/pubsub/SemaphorePubSub.java diff --git a/src/main/java/org/redisson/RedissonSemaphore.java b/src/main/java/org/redisson/RedissonSemaphore.java index 23511809d..e29a4e0a0 100644 --- a/src/main/java/org/redisson/RedissonSemaphore.java +++ b/src/main/java/org/redisson/RedissonSemaphore.java @@ -25,7 +25,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.core.RSemaphore; -import org.redisson.pubsub.LockPubSub; +import org.redisson.pubsub.SemaphorePubSub; import io.netty.util.concurrent.Future; @@ -41,7 +41,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { final UUID id; - private static final LockPubSub PUBSUB = new LockPubSub(); + private static final SemaphorePubSub PUBSUB = new SemaphorePubSub(); final CommandExecutor commandExecutor; @@ -77,7 +77,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { return; } - getEntry().getLatch().acquire(); + getEntry().getLatch().acquire(permits); } } finally { unsubscribe(future); @@ -130,7 +130,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { // waiting for message long current = System.currentTimeMillis(); - getEntry().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); + getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS); long elapsed = System.currentTimeMillis() - current; time -= elapsed; @@ -169,9 +169,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_OBJECT, - "redis.call('incrby', KEYS[1], ARGV[1]); " + - "redis.call('publish', KEYS[2], ARGV[2]); ", - Arrays.asList(getName(), getChannelName()), permits, LockPubSub.unlockMessage); + "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + + "redis.call('publish', KEYS[2], value); ", + Arrays.asList(getName(), getChannelName()), permits); } @Override @@ -201,10 +201,10 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('get', KEYS[1]); " + "if (value == false or value == 0) then " - + "redis.call('set', KEYS[1], ARGV[2]); " + + "redis.call('set', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "end;", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, permits); + Arrays.asList(getName(), getChannelName()), permits); get(f); } diff --git a/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/src/main/java/org/redisson/pubsub/SemaphorePubSub.java new file mode 100644 index 000000000..08054649d --- /dev/null +++ b/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -0,0 +1,34 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.pubsub; + +import org.redisson.RedissonLockEntry; + +import io.netty.util.concurrent.Promise; + +public class SemaphorePubSub extends PublishSubscribe { + + @Override + protected RedissonLockEntry createEntry(Promise newPromise) { + return new RedissonLockEntry(newPromise); + } + + @Override + protected void onMessage(RedissonLockEntry value, Long message) { + value.getLatch().release(message.intValue()); + } + +}