diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java index 322eff42c..7d5f08a1d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java @@ -15,24 +15,21 @@ */ package org.redisson; -import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE; - import java.util.Arrays; import java.util.Collections; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RBlockingFairQueue; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.pubsub.SemaphorePubSub; -import io.netty.util.internal.PlatformDependent; - /** * * @author Nikita Koksharov @@ -40,40 +37,42 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonBlockingFairQueue extends RedissonBlockingQueue implements RBlockingFairQueue { - private final Set usedIds = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); private final UUID id; + private final AtomicInteger instances = new AtomicInteger(); private final SemaphorePubSub semaphorePubSub; protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) { super(commandExecutor, name); this.semaphorePubSub = semaphorePubSub; this.id = id; + instances.incrementAndGet(); } protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) { super(codec, commandExecutor, name); this.semaphorePubSub = semaphorePubSub; this.id = id; + instances.incrementAndGet(); } private String getIdsListName() { - return "{" + getName() + "}:list"; + return suffixName(getName(), "list"); } private String getChannelName() { - return "{" + getName() + "}:" + getCurrentId() + ":channel"; + return suffixName(getName(), getCurrentId() + ":channel"); } private RedissonLockEntry getEntry() { - return semaphorePubSub.getEntry(getName() + ":" + getCurrentId()); + return semaphorePubSub.getEntry(getName()); } private RFuture subscribe() { - return semaphorePubSub.subscribe(getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager()); + return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager()); } private void unsubscribe(RFuture future) { - semaphorePubSub.unsubscribe(future.getNow(), getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager()); + semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager()); } @Override @@ -96,7 +95,7 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple "end; " + "end; " + "if found == false then " - + "redis.call('rpush', KEYS[2], ARGV[1]); " + + "redis.call('lpush', KEYS[2], ARGV[1]); " + "end; " + "local value = redis.call('lindex', KEYS[2], 0); " + "local size = redis.call('llen', KEYS[2]); " @@ -114,9 +113,7 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple } private String getCurrentId() { - String currentId = id + "-" + Thread.currentThread().getId(); - usedIds.add(currentId); - return currentId; + return id.toString(); } @@ -143,11 +140,13 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple @Override public void destroy() { - commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID_WITH_VALUES, - "for i = 1, #ARGV, 1 do " - + "redis.call('lrem', KEYS[1], 0, ARGV[i]);" - +"end; ", - Collections.singletonList(getIdsListName()), usedIds.toArray()); + if (instances.decrementAndGet() == 0) { + get(commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID_WITH_VALUES, + "for i = 1, #ARGV, 1 do " + + "redis.call('lrem', KEYS[1], 0, ARGV[i]);" + +"end; ", + Collections.singletonList(getIdsListName()), getCurrentId())); + } } // @Override @@ -287,7 +286,11 @@ public class RedissonBlockingFairQueue extends RedissonBlockingQueue imple return null; } - getEntry().getLatch().acquire(1); + long spentTime = System.currentTimeMillis() - startTime; + long remainTime = unit.toMillis(timeout) - spentTime; + if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) { + return null; + } } } finally { unsubscribe(future); diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index af7dc40a0..e649dac07 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -57,6 +57,13 @@ public abstract class RedissonObject implements RObject { } return prefix + ":{" + getName() + "}"; } + + protected String suffixName(String name, String suffix) { + if (getName().contains("{")) { + return name + ":" + suffix; + } + return "{" + getName() + "}:" + suffix; + } protected V get(RFuture future) { return commandExecutor.get(future); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java index a00e00154..740379338 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingFairQueueTest.java @@ -2,35 +2,36 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; import org.junit.Test; import org.redisson.api.RBlockingFairQueue; import org.redisson.api.RBlockingQueue; +import org.redisson.api.RedissonClient; public class RedissonBlockingFairQueueTest extends BaseTest { @Test public void testFairness() throws InterruptedException { - int size = 2000; - RBlockingQueue queue = redisson.getBlockingFairQueue("test"); + int size = 1000; + RBlockingQueue queue = redisson.getBlockingFairQueue("test"); + CountDownLatch latch = new CountDownLatch(size); AtomicInteger t1Counter = new AtomicInteger(); AtomicInteger t2Counter = new AtomicInteger(); AtomicInteger t3Counter = new AtomicInteger(); AtomicInteger t4Counter = new AtomicInteger(); + + RedissonClient redisson1 = createInstance(); + RBlockingFairQueue queue1 = redisson1.getBlockingFairQueue("test"); Thread t1 = new Thread("test-thread1") { public void run() { - RBlockingFairQueue queue = redisson.getBlockingFairQueue("test"); while (true) { try { - String a = queue.poll(1, TimeUnit.SECONDS); + String a = queue1.poll(1, TimeUnit.SECONDS); if (a == null) { break; } @@ -39,35 +40,35 @@ public class RedissonBlockingFairQueueTest extends BaseTest { } catch (InterruptedException e) { } } - queue.destroy(); }; }; + RedissonClient redisson2 = createInstance(); + RBlockingFairQueue queue2 = redisson2.getBlockingFairQueue("test"); Thread t2 = new Thread("test-thread2") { public void run() { - RBlockingFairQueue queue = redisson.getBlockingFairQueue("test"); while (true) { try { - String a = queue.poll(1, TimeUnit.SECONDS); + String a = queue2.poll(1, TimeUnit.SECONDS); if (a == null) { break; } - Thread.sleep(5); + Thread.sleep(50); latch.countDown(); t2Counter.incrementAndGet(); } catch (InterruptedException e) { } } - queue.destroy(); }; }; - RBlockingFairQueue queue34 = redisson.getBlockingFairQueue("test"); + RedissonClient redisson3 = createInstance(); + RBlockingFairQueue queue3 = redisson3.getBlockingFairQueue("test"); Thread t3 = new Thread("test-thread3") { public void run() { while (true) { try { - String a = queue34.poll(1, TimeUnit.SECONDS); + String a = queue3.poll(1, TimeUnit.SECONDS); if (a == null) { break; } @@ -80,11 +81,13 @@ public class RedissonBlockingFairQueueTest extends BaseTest { }; }; + RedissonClient redisson4 = createInstance(); + RBlockingFairQueue queue4 = redisson4.getBlockingFairQueue("test"); Thread t4 = new Thread("test-thread4") { public void run() { while (true) { try { - String a = queue34.poll(1, TimeUnit.SECONDS); + String a = queue4.poll(1, TimeUnit.SECONDS); if (a == null) { break; } @@ -96,9 +99,6 @@ public class RedissonBlockingFairQueueTest extends BaseTest { }; }; - queue34.destroy(); - - for (int i = 0; i < size; i++) { queue.add("" + i); } @@ -112,8 +112,23 @@ public class RedissonBlockingFairQueueTest extends BaseTest { t2.join(); t3.join(); t4.join(); - + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + queue1.destroy(); + queue2.destroy(); + queue3.destroy(); + queue4.destroy(); + redisson1.shutdown(); + redisson2.shutdown(); + redisson3.shutdown(); + redisson4.shutdown(); + + assertThat(t1Counter.get()).isEqualTo(250); + assertThat(t2Counter.get()).isEqualTo(250); + assertThat(t3Counter.get()).isEqualTo(250); + assertThat(t4Counter.get()).isEqualTo(250); + System.out.println("t1: " + t1Counter.get()); System.out.println("t2: " + t2Counter.get()); System.out.println("t3: " + t3Counter.get());