RBoundedBlockingQueue.clear implemented

pull/583/head
Nikita 9 years ago
parent d112b5370f
commit ad796f34b1

@ -36,7 +36,6 @@ import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/** /**
* <p>Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}. * <p>Distributed and concurrent implementation of bounded {@link java.util.concurrent.BlockingQueue}.
@ -338,6 +337,20 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return get(trySetCapacityAsync(capacity)); return get(trySetCapacityAsync(capacity));
} }
@Override
public void clear() {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local len = redis.call('llen', KEYS[1]); " +
"if len > 0 then "
+ "redis.call('del', KEYS[1]); "
+ "local value = redis.call('incrby', KEYS[2], len); " +
"redis.call('publish', KEYS[3], value); "
+ "end; ",
Arrays.<Object>asList(getName(), getSemaphoreName(), channelName));
}
@Override @Override
public RFuture<Boolean> deleteAsync() { public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getSemaphoreName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getSemaphoreName());

Loading…
Cancel
Save