RemoteService timeout handling fixed. #434

pull/499/head
Nikita 9 years ago
parent 5e2cbe6a2f
commit ac3278d94b

@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.core.MessageListener;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RRemoteService;
@ -97,7 +99,7 @@ public class RedissonRemoteService implements RRemoteService {
long clients = topic.publish(response);
if (clients == 0) {
log.error("None of clients has not received a response for: {}", request);
log.error("None of clients has not received a response: {} for request: {}", response, request);
}
subscribe(remoteInterface, requestQueue);
@ -119,7 +121,8 @@ public class RedissonRemoteService implements RRemoteService {
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args));
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args);
requestQueue.add(request);
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
@ -136,7 +139,10 @@ public class RedissonRemoteService implements RRemoteService {
if (timeout == -1) {
latch.await();
} else {
latch.await(timeout, timeUnit);
if (!latch.await(timeout, timeUnit)) {
topic.removeListener(listenerId);
throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request);
}
}
topic.removeListener(listenerId);
RemoteServiceResponse msg = response.get();

@ -38,5 +38,10 @@ public class RemoteServiceResponse {
public Object getResult() {
return result;
}
@Override
public String toString() {
return "RemoteServiceResponse [result=" + result + ", error=" + error + "]";
}
}

@ -493,8 +493,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
connectionManager.getShutdownPromise().removeListener(listener);
if (!future.isCancelled()) {
connectionManager.getShutdownPromise().removeListener(listener);
return;
}
// cancel handling for commands from skipTimeout collection

@ -2,9 +2,12 @@ package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisTimeoutException;
import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RedissonRemoteServiceTest extends BaseTest {
@ -18,6 +21,8 @@ public class RedissonRemoteServiceTest extends BaseTest {
void errorMethodWithCause();
void timeoutMethod() throws InterruptedException;
}
public class RemoteImpl implements RemoteInterface {
@ -46,8 +51,29 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
}
@Override
public void timeoutMethod() throws InterruptedException {
Thread.sleep(2000);
}
}
@Test(expected = RedisTimeoutException.class)
public void testTimeout() throws InterruptedException {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS);
try {
ri.timeoutMethod();
} finally {
r1.shutdown();
r2.shutdown();
}
}
@Test
public void testInvocations() {
@ -74,7 +100,6 @@ public class RedissonRemoteServiceTest extends BaseTest {
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
r1.shutdown();
r2.shutdown();

Loading…
Cancel
Save