More equal elements distribution for consumers of RBlockingFairQueue

pull/748/head
Nikita 8 years ago
parent cad190152f
commit a913ca9d72

@ -15,24 +15,21 @@
*/
package org.redisson;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -40,40 +37,42 @@ import io.netty.util.internal.PlatformDependent;
*/
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {
private final Set<String> usedIds = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
private final UUID id;
private final AtomicInteger instances = new AtomicInteger();
private final SemaphorePubSub semaphorePubSub;
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(commandExecutor, name);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
instances.incrementAndGet();
}
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(codec, commandExecutor, name);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
instances.incrementAndGet();
}
private String getIdsListName() {
return "{" + getName() + "}:list";
return suffixName(getName(), "list");
}
private String getChannelName() {
return "{" + getName() + "}:" + getCurrentId() + ":channel";
return suffixName(getName(), getCurrentId() + ":channel");
}
private RedissonLockEntry getEntry() {
return semaphorePubSub.getEntry(getName() + ":" + getCurrentId());
return semaphorePubSub.getEntry(getName());
}
private RFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager());
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
}
private void unsubscribe(RFuture<RedissonLockEntry> future) {
semaphorePubSub.unsubscribe(future.getNow(), getName() + ":" + getCurrentId(), getChannelName(), commandExecutor.getConnectionManager());
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
}
@Override
@ -96,7 +95,7 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
"end; " +
"end; "
+ "if found == false then "
+ "redis.call('rpush', KEYS[2], ARGV[1]); "
+ "redis.call('lpush', KEYS[2], ARGV[1]); "
+ "end; "
+ "local value = redis.call('lindex', KEYS[2], 0); "
+ "local size = redis.call('llen', KEYS[2]); "
@ -114,9 +113,7 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
}
private String getCurrentId() {
String currentId = id + "-" + Thread.currentThread().getId();
usedIds.add(currentId);
return currentId;
return id.toString();
}
@ -143,11 +140,13 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
@Override
public void destroy() {
commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID_WITH_VALUES,
"for i = 1, #ARGV, 1 do "
+ "redis.call('lrem', KEYS[1], 0, ARGV[i]);"
+"end; ",
Collections.<Object>singletonList(getIdsListName()), usedIds.toArray());
if (instances.decrementAndGet() == 0) {
get(commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID_WITH_VALUES,
"for i = 1, #ARGV, 1 do "
+ "redis.call('lrem', KEYS[1], 0, ARGV[i]);"
+"end; ",
Collections.<Object>singletonList(getIdsListName()), getCurrentId()));
}
}
// @Override
@ -287,7 +286,11 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
return null;
}
getEntry().getLatch().acquire(1);
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime <= 0 || !getEntry().getLatch().tryAcquire(remainTime, TimeUnit.MILLISECONDS)) {
return null;
}
}
} finally {
unsubscribe(future);

@ -57,6 +57,13 @@ public abstract class RedissonObject implements RObject {
}
return prefix + ":{" + getName() + "}";
}
protected String suffixName(String name, String suffix) {
if (getName().contains("{")) {
return name + ":" + suffix;
}
return "{" + getName() + "}:" + suffix;
}
protected <V> V get(RFuture<V> future) {
return commandExecutor.get(future);

@ -2,35 +2,36 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
public class RedissonBlockingFairQueueTest extends BaseTest {
@Test
public void testFairness() throws InterruptedException {
int size = 2000;
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
int size = 1000;
RBlockingQueue<String> queue = redisson.getBlockingFairQueue("test");
CountDownLatch latch = new CountDownLatch(size);
AtomicInteger t1Counter = new AtomicInteger();
AtomicInteger t2Counter = new AtomicInteger();
AtomicInteger t3Counter = new AtomicInteger();
AtomicInteger t4Counter = new AtomicInteger();
RedissonClient redisson1 = createInstance();
RBlockingFairQueue<String> queue1 = redisson1.getBlockingFairQueue("test");
Thread t1 = new Thread("test-thread1") {
public void run() {
RBlockingFairQueue<String> queue = redisson.getBlockingFairQueue("test");
while (true) {
try {
String a = queue.poll(1, TimeUnit.SECONDS);
String a = queue1.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
@ -39,35 +40,35 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
} catch (InterruptedException e) {
}
}
queue.destroy();
};
};
RedissonClient redisson2 = createInstance();
RBlockingFairQueue<String> queue2 = redisson2.getBlockingFairQueue("test");
Thread t2 = new Thread("test-thread2") {
public void run() {
RBlockingFairQueue<String> queue = redisson.getBlockingFairQueue("test");
while (true) {
try {
String a = queue.poll(1, TimeUnit.SECONDS);
String a = queue2.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
Thread.sleep(5);
Thread.sleep(50);
latch.countDown();
t2Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
queue.destroy();
};
};
RBlockingFairQueue<String> queue34 = redisson.getBlockingFairQueue("test");
RedissonClient redisson3 = createInstance();
RBlockingFairQueue<String> queue3 = redisson3.getBlockingFairQueue("test");
Thread t3 = new Thread("test-thread3") {
public void run() {
while (true) {
try {
String a = queue34.poll(1, TimeUnit.SECONDS);
String a = queue3.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
@ -80,11 +81,13 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
};
};
RedissonClient redisson4 = createInstance();
RBlockingFairQueue<String> queue4 = redisson4.getBlockingFairQueue("test");
Thread t4 = new Thread("test-thread4") {
public void run() {
while (true) {
try {
String a = queue34.poll(1, TimeUnit.SECONDS);
String a = queue4.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
@ -96,9 +99,6 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
};
};
queue34.destroy();
for (int i = 0; i < size; i++) {
queue.add("" + i);
}
@ -112,8 +112,23 @@ public class RedissonBlockingFairQueueTest extends BaseTest {
t2.join();
t3.join();
t4.join();
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
queue1.destroy();
queue2.destroy();
queue3.destroy();
queue4.destroy();
redisson1.shutdown();
redisson2.shutdown();
redisson3.shutdown();
redisson4.shutdown();
assertThat(t1Counter.get()).isEqualTo(250);
assertThat(t2Counter.get()).isEqualTo(250);
assertThat(t3Counter.get()).isEqualTo(250);
assertThat(t4Counter.get()).isEqualTo(250);
System.out.println("t1: " + t1Counter.get());
System.out.println("t2: " + t2Counter.get());
System.out.println("t3: " + t3Counter.get());

Loading…
Cancel
Save