Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 3ef475b1f1

@ -15,12 +15,14 @@
*/
package org.redisson.tomcat;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
public class AttributeMessage implements Serializable {
private String sessionId;

@ -15,12 +15,14 @@
*/
package org.redisson.tomcat;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
public class AttributeMessage implements Serializable {
private String sessionId;

@ -15,12 +15,14 @@
*/
package org.redisson.tomcat;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
public class AttributeMessage implements Serializable {
private String sessionId;

@ -15,12 +15,14 @@
*/
package org.redisson.tomcat;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
public class AttributeMessage implements Serializable {
private String sessionId;

@ -65,7 +65,6 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -568,7 +567,7 @@ public abstract class BaseRemoteService {
}
});
}
private <T> T sync(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
@ -595,20 +594,53 @@ public abstract class BaseRemoteService {
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis());
addAsync(requestQueueName, request, addPromise).sync();
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
} else {
responseFuture = null;
}
RFuture<Boolean> futureAdd = addAsync(requestQueueName, request, addPromise);
futureAdd.await();
if (!futureAdd.isSuccess()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
throw futureAdd.cause();
}
if (!futureAdd.get()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
throw new RedisException("Task hasn't been added");
}
// poll for the ack only if expected
if (optionsCopy.isAckExpected()) {
if (ackFuture != null) {
String ackName = getAckName(requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(),
TimeUnit.MILLISECONDS);
ackFuture.await();
RemoteServiceAck ack = ackFuture.getNow();
if (ack == null) {
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.await();
ack = ackFutureAttempt.getNow();
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after "
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
@ -618,9 +650,9 @@ public abstract class BaseRemoteService {
}
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue
.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (responseFuture != null) {
responseFuture.awaitUninterruptibly();
RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow();
if (response == null) {
throw new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
@ -638,25 +670,6 @@ public abstract class BaseRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName)
throws InterruptedException {
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.sync();
if (ackClientsFuture.getNow()) {
return (RemoteServiceAck) responseQueue.poll();
}
return null;
}
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy,
String ackName, final RequestId requestId) {
final RPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();

@ -363,9 +363,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -527,9 +529,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem);" +
" redis.call('zrem', KEYS[3], lruItem);" +
" redis.call('zrem', lastAccessTimeSetName, lruItem);" +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);" +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);" +
"end; " +
" end;" +
" end" +
" end;" +
@ -596,9 +600,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -672,9 +678,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -804,9 +812,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -940,9 +950,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -1339,9 +1351,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -1389,9 +1403,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -1533,9 +1549,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[6]; " +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
@ -1724,9 +1742,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[2], lruItem);" +
" redis.call('zrem', KEYS[3], lruItem);" +
" redis.call('zrem', lastAccessTimeSetName, lruItem);" +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);" +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);"
+ "end; " +
" end;" +
" end" +
" end;" +

@ -726,10 +726,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public void run(Timeout timeout) throws Exception {
// re-connection hasn't been made
// and connection is still active
if (orignalChannel == connection.getChannel()
&& connection.isActive()) {
return;
}
// if (orignalChannel == connection.getChannel()
// && connection.isActive()) {
// return;
// }
if (details.getAttemptPromise().trySuccess(null)) {
connection.forceFastReconnectAsync();

@ -43,6 +43,7 @@ public class RemoteServiceResponse implements RRemoteServiceResponse, Serializab
this.id = id;
}
@Override
public String getId() {
return id;
}

@ -11,6 +11,15 @@ import org.redisson.api.RBlockingDeque;
public class RedissonBlockingDequeTest extends BaseTest {
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("blocking_deque");
long start = System.currentTimeMillis();
String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isBetween(950L, 1050L);
assertThat(redisTask).isNull();
}
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingDeque<Integer> queue = redisson.getBlockingDeque("queue:pollany");

@ -38,7 +38,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
return redisson.getBlockingQueue("queue");
}
// @Test
@Test
public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException {
RedisProcess runner = new RedisRunner()
.nosave()

@ -5,10 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -16,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
@ -213,6 +218,38 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
}
@Test
public void testConcurrentInvocations() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
RRemoteService remoteService = redisson.getRemoteService();
remoteService.register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class);
List<Future<?>> futures = new ArrayList<>();
int iterations = 1000;
AtomicBoolean bool = new AtomicBoolean();
for (int i = 0; i < iterations; i++) {
futures.add(executorService.submit(() -> {
try {
if (ThreadLocalRandom.current().nextBoolean()) {
service.resultMethod(1L);
} else {
service.methodOverload();
}
} catch (Exception e) {
bool.set(true);
}
}));
}
while (!futures.stream().allMatch(Future::isDone)) {}
assertThat(bool.get()).isFalse();
remoteService.deregister(RemoteInterface.class);
}
@Test
public void testCancelAsync() throws InterruptedException {

Loading…
Cancel
Save