From ad796f34b14f8ff5e37317a471929f62ddecdb36 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 15 Aug 2016 10:52:48 +0300 Subject: [PATCH] RBoundedBlockingQueue.clear implemented --- .../redisson/RedissonBoundedBlockingQueue.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index de3944a11..4b1fc9747 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -36,7 +36,6 @@ import org.redisson.pubsub.SemaphorePubSub; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; /** *

Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}. @@ -338,6 +337,20 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return get(trySetCapacityAsync(capacity)); } + @Override + public void clear() { + String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); + commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local len = redis.call('llen', KEYS[1]); " + + "if len > 0 then " + + "redis.call('del', KEYS[1]); " + + "local value = redis.call('incrby', KEYS[2], len); " + + "redis.call('publish', KEYS[3], value); " + + "end; ", + Arrays.asList(getName(), getSemaphoreName(), channelName)); + + } + @Override public RFuture deleteAsync() { return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getSemaphoreName());