RSemaphore optimization. #207

pull/365/head
Nikita 9 years ago
parent 80a1ea194c
commit 18e5051c82

@ -25,7 +25,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.core.RSemaphore; import org.redisson.core.RSemaphore;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -41,7 +41,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
final UUID id; final UUID id;
private static final LockPubSub PUBSUB = new LockPubSub(); private static final SemaphorePubSub PUBSUB = new SemaphorePubSub();
final CommandExecutor commandExecutor; final CommandExecutor commandExecutor;
@ -77,7 +77,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return; return;
} }
getEntry().getLatch().acquire(); getEntry().getLatch().acquire(permits);
} }
} finally { } finally {
unsubscribe(future); unsubscribe(future);
@ -130,7 +130,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
// waiting for message // waiting for message
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
getEntry().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time -= elapsed; time -= elapsed;
@ -169,9 +169,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
} }
commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_OBJECT, commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_OBJECT,
"redis.call('incrby', KEYS[1], ARGV[1]); " + "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], ARGV[2]); ", "redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits, LockPubSub.unlockMessage); Arrays.<Object>asList(getName(), getChannelName()), permits);
} }
@Override @Override
@ -201,10 +201,10 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Future<Void> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, Future<Void> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " + "local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then " "if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end;", + "end;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, permits); Arrays.<Object>asList(getName(), getChannelName()), permits);
get(f); get(f);
} }

@ -0,0 +1,34 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import org.redisson.RedissonLockEntry;
import io.netty.util.concurrent.Promise;
public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override
protected RedissonLockEntry createEntry(Promise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
value.getLatch().release(message.intValue());
}
}
Loading…
Cancel
Save